You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/20 17:24:43 UTC
[2/8] incubator-usergrid git commit: First pass at upgrading to java
8 and latest RX java
First pass at upgrading to java 8 and latest RX java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/282e2271
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/282e2271
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/282e2271
Branch: refs/heads/USERGRID-486
Commit: 282e22712890cdda0439a5694810cff632526d7b
Parents: 72ec19d
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 19 18:00:57 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 19 18:00:57 2015 -0600
----------------------------------------------------------------------
stack/core/pom.xml | 26 +--
.../corepersistence/CpEntityManager.java | 2 +-
.../corepersistence/CpEntityManagerFactory.java | 7 +-
.../corepersistence/CpRelationManager.java | 24 +--
.../usergrid/corepersistence/CpWalker.java | 77 +++-----
.../events/EntityVersionDeletedHandler.java | 72 +++----
.../migration/EntityTypeMappingMigration.java | 41 ++--
.../migration/EntityTypeMappingMigrationIT.java | 2 +-
.../impl/EntityCollectionManagerImpl.java | 10 +-
.../collection/impl/EntityDeletedTask.java | 20 +-
.../impl/EntityVersionCleanupTask.java | 27 +--
.../impl/EntityVersionCreatedTask.java | 26 +--
.../MvccEntitySerializationStrategyImpl.java | 89 +++------
.../MvccEntitySerializationStrategyV3Impl.java | 91 +++------
.../migration/MvccEntityDataMigrationImpl.java | 169 +++++++---------
.../persistence/collection/rx/ParallelTest.java | 10 +-
...ctMvccEntityDataMigrationV1ToV3ImplTest.java | 2 +-
stack/corepersistence/common/pom.xml | 15 +-
.../astyanax/MultiKeyColumnNameIterator.java | 4 +-
.../MultiKeyColumnNameIteratorTest.java | 187 ++++++++----------
.../astyanax/MultiRowColumnIteratorTest.java | 50 ++---
.../graph/impl/GraphManagerImpl.java | 6 +-
.../graph/impl/stage/EdgeMetaRepairImpl.java | 2 +
.../impl/stage/NodeDeleteListenerImpl.java | 2 +-
.../impl/migration/EdgeDataMigrationImpl.java | 87 ++++-----
.../graph/GraphManagerShardConsistencyIT.java | 2 +-
.../usergrid/persistence/graph/SimpleTest.java | 12 +-
.../migration/EdgeDataMigrationImplTest.java | 2 +-
stack/corepersistence/pom.xml | 8 +-
.../index/impl/IndexLoadTestsIT.java | 105 ++++------
stack/pom.xml | 8 +-
.../management/importer/ImportServiceImpl.java | 34 ++--
.../impl/ApplicationQueueManagerImpl.java | 195 +++++++++----------
.../setup/ConcurrentProcessSingleton.java | 16 +-
34 files changed, 604 insertions(+), 826 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 971ee62..119a52b 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -130,15 +130,7 @@
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.3.2</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
+
<!-- <plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -481,17 +473,11 @@
<version>${metrics.version}</version>
</dependency>
- <dependency>
- <groupId>com.netflix.rxjava</groupId>
- <artifactId>rxjava-core</artifactId>
- <version>${rx.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.netflix.rxjava</groupId>
- <artifactId>rxjava-math</artifactId>
- <version>${rx.version}</version>
- </dependency>
+ <dependency>
+ <groupId>io.reactivex</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>${rx.version}</version>
+ </dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 789e640..9cffdaf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -1097,7 +1097,7 @@ public class CpEntityManager implements EntityManager {
} );
//TODO: does this call and others like it need a graphite reporter?
- cpEntity = ecm.write( cpEntity ).toBlockingObservable().last();
+ cpEntity = ecm.write( cpEntity ).toBlocking().last();
logger.debug( "Wrote {}:{} version {}", new Object[] {
cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion()
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index f76b9fc..83c3d85 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -451,7 +451,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
fromEntityId, edgeType, Long.MAX_VALUE,
SearchByEdgeType.Order.DESCENDING, null ));
- Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
+ //TODO This is wrong, and will result in OOM if there are too many applications. This needs to stream properly with a buffer
+ Iterator<Edge> iter = edges.toBlocking().getIterator();
while ( iter.hasNext() ) {
Edge edge = iter.next();
@@ -469,7 +470,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
org.apache.usergrid.persistence.model.entity.Entity e =
managerCache.getEntityCollectionManager( collScope ).load( targetId )
- .toBlockingObservable().lastOrDefault(null);
+ .toBlocking().lastOrDefault(null);
if ( e == null ) {
logger.warn("Applicaion {} in index but not found in collections", targetId );
@@ -624,7 +625,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
public long performEntityCount() {
//TODO, this really needs to be a task that writes this data somewhere since this will get
//progressively slower as the system expands
- return (Long) getAllEntitiesObservable().longCount().toBlocking().last();
+ return (Long) getAllEntitiesObservable().countLong().toBlocking().last();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 2eeee28..c4e970d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -280,7 +280,7 @@ public class CpRelationManager implements RelationManager {
Observable<String> types= gm.getEdgeTypesFromSource(
new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ));
- Iterator<String> iter = types.toBlockingObservable().getIterator();
+ Iterator<String> iter = types.toBlocking().getIterator();
while ( iter.hasNext() ) {
indexes.add( iter.next() );
}
@@ -346,7 +346,7 @@ public class CpRelationManager implements RelationManager {
Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ));
- Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
+ Iterator<Edge> iter = edges.toBlocking().getIterator();
while ( iter.hasNext() ) {
Edge edge = iter.next();
@@ -383,7 +383,7 @@ public class CpRelationManager implements RelationManager {
final GraphManager gm = managerCache.getGraphManager( applicationScope );
Iterator<String> edgeTypesToTarget = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
- cpHeadEntity.getId(), null, null) ).toBlockingObservable().getIterator();
+ cpHeadEntity.getId(), null, null) ).toBlocking().getIterator();
logger.debug("updateContainingCollectionsAndCollections(): "
+ "Searched for edges to target {}:{}\n in scope {}\n found: {}",
@@ -484,7 +484,7 @@ public class CpRelationManager implements RelationManager {
SearchByEdgeType.Order.DESCENDING,
null ) );
- return edges.toBlockingObservable().firstOrDefault( null ) != null;
+ return edges.toBlocking().firstOrDefault( null ) != null;
}
@@ -511,7 +511,7 @@ public class CpRelationManager implements RelationManager {
SearchByEdgeType.Order.DESCENDING,
null ) );
- return edges.toBlockingObservable().firstOrDefault( null ) != null;
+ return edges.toBlocking().firstOrDefault( null ) != null;
}
@@ -528,7 +528,7 @@ public class CpRelationManager implements RelationManager {
SearchByEdgeType.Order.DESCENDING,
null ) ); // last
- Iterator<Edge> iterator = edgesToTarget.toBlockingObservable().getIterator();
+ Iterator<Edge> iterator = edgesToTarget.toBlocking().getIterator();
int count = 0;
while ( iterator.hasNext() ) {
iterator.next();
@@ -569,7 +569,7 @@ public class CpRelationManager implements RelationManager {
Observable<String> str = gm.getEdgeTypesFromSource(
new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );
- Iterator<String> iter = str.toBlockingObservable().getIterator();
+ Iterator<String> iter = str.toBlocking().getIterator();
while ( iter.hasNext() ) {
String edgeType = iter.next();
indexes.add( CpNamingUtils.getCollectionName( edgeType ) );
@@ -692,7 +692,7 @@ public class CpRelationManager implements RelationManager {
// create graph edge connection from head entity to member entity
Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, memberEntity.getId(), uuidHash );
GraphManager gm = managerCache.getGraphManager( applicationScope );
- gm.writeEdge( edge ).toBlockingObservable().last();
+ gm.writeEdge( edge ).toBlocking().last();
logger.debug( "Wrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}",
new Object[] {
@@ -855,7 +855,7 @@ public class CpRelationManager implements RelationManager {
cpHeadEntity.getId(),
CpNamingUtils.getEdgeTypeFromCollectionName( collName ),
memberEntity.getId(), UUIDUtils.getUUIDLong( memberEntity.getId().getUuid() ) );
- gm.deleteEdge( collectionToItemEdge ).toBlockingObservable().last();
+ gm.deleteEdge( collectionToItemEdge ).toBlocking().last();
// remove edge from item to collection
Edge itemToCollectionEdge = new SimpleEdge(
@@ -865,7 +865,7 @@ public class CpRelationManager implements RelationManager {
cpHeadEntity.getId(),
UUIDUtils.getUUIDLong( cpHeadEntity.getId().getUuid() ) );
- gm.deleteEdge( itemToCollectionEdge ).toBlockingObservable().last();
+ gm.deleteEdge( itemToCollectionEdge ).toBlocking().last();
// special handling for roles collection of a group
if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
@@ -1058,7 +1058,7 @@ public class CpRelationManager implements RelationManager {
cpHeadEntity.getId(), edgeType, targetEntity.getId(), System.currentTimeMillis() );
GraphManager gm = managerCache.getGraphManager( applicationScope );
- gm.writeEdge( edge ).toBlockingObservable().last();
+ gm.writeEdge( edge ).toBlocking().last();
EntityIndex ei = managerCache.getEntityIndex( applicationScope );
EntityIndexBatch batch = ei.createBatch();
@@ -1290,7 +1290,7 @@ public class CpRelationManager implements RelationManager {
System.currentTimeMillis() );
GraphManager gm = managerCache.getGraphManager( applicationScope );
- gm.deleteEdge( edge ).toBlockingObservable().last();
+ gm.deleteEdge( edge ).toBlocking().last();
final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
final EntityIndexBatch batch = ei.createBatch();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index 4b902d8..332d5a8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -104,53 +104,38 @@ public class CpWalker {
Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
new SimpleSearchEdgeType( applicationId, edgeType, null ) );
- edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
- @Override
- public Observable<Edge> call( final String edgeType ) {
-
- logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId );
-
- return gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
- applicationId, edgeType, Long.MAX_VALUE, order , null ) );
-
- }
-
- } ).parallel( new Func1<Observable<Edge>, Observable<Edge>>() {
-
- @Override
- public Observable<Edge> call( final Observable<Edge> edgeObservable ) { // process edges in parallel
- return edgeObservable.doOnNext( new Action1<Edge>() { // visit and update then entity
-
- @Override
- public void call( Edge edge ) {
-
- logger.info( "Re-indexing edge {}", edge );
-
- EntityRef targetNodeEntityRef = new SimpleEntityRef(
- edge.getTargetNode().getType(),
- edge.getTargetNode().getUuid() );
-
- Entity entity;
- try {
- entity = em.get( targetNodeEntityRef );
- }
- catch ( Exception ex ) {
- logger.error( "Error getting sourceEntity {}:{}, continuing",
- targetNodeEntityRef.getType(),
- targetNodeEntityRef.getUuid() );
- return;
- }
- if(entity == null){
- return;
- }
- String collName = CpNamingUtils.getCollectionName( edge.getType() );
- visitor.visitCollectionEntry( em, collName, entity );
- }
- } );
- }
- }, Schedulers.io() )
+ edgeTypes.flatMap( emittedEdgeType -> {
+
+ logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId );
+
+ return gm.loadEdgesFromSource(
+ new SimpleSearchByEdgeType( applicationId, emittedEdgeType, Long.MAX_VALUE, order, null ) );
+ } ).flatMap( edge -> {
+ //run each edge through it's own scheduler, up to 100 at a time
+ return Observable.just( edge ).doOnNext( edgeValue -> {
+ logger.info( "Re-indexing edge {}", edgeValue );
+
+ EntityRef targetNodeEntityRef =
+ new SimpleEntityRef( edgeValue.getTargetNode().getType(), edgeValue.getTargetNode().getUuid() );
+
+ Entity entity;
+ try {
+ entity = em.get( targetNodeEntityRef );
+ }
+ catch ( Exception ex ) {
+ logger.error( "Error getting sourceEntity {}:{}, continuing", targetNodeEntityRef.getType(),
+ targetNodeEntityRef.getUuid() );
+ return;
+ }
+ if ( entity == null ) {
+ return;
+ }
+ String collName = CpNamingUtils.getCollectionName( edgeValue.getType() );
+ visitor.visitCollectionEntry( em, collName, entity );
+ } ).subscribeOn( Schedulers.io() );
+ }, 100 );
// wait for it to complete
- .toBlocking().lastOrDefault( null ); // end foreach on edges
+ edgeTypes.toBlocking().lastOrDefault( null ); // end foreach on edges
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index c45949b..23f5a32 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -17,53 +17,48 @@
*/
package org.apache.usergrid.corepersistence.events;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
-import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.IndexScope;
import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Action2;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
+
+import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
/**
- * Remove Entity index when specific version of Entity is deleted.
- * TODO: do we need this? Don't our version-created and entity-deleted handlers take care of this?
- * If we do need it then it should be wired in via GuiceModule in the corepersistence package.
+ * Remove Entity index when specific version of Entity is deleted. TODO: do we need this? Don't our version-created and
+ * entity-deleted handlers take care of this? If we do need it then it should be wired in via GuiceModule in the
+ * corepersistence package.
*/
@Singleton
public class EntityVersionDeletedHandler implements EntityVersionDeleted {
- private static final Logger logger = LoggerFactory.getLogger(EntityVersionDeletedHandler.class );
-
-
+ private static final Logger logger = LoggerFactory.getLogger( EntityVersionDeletedHandler.class );
private final EntityManagerFactory emf;
+
@Inject
public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
-
@Override
public void versionDeleted( final CollectionScope scope, final Id entityId,
final List<MvccLogEntry> entityVersions ) {
@@ -71,40 +66,33 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
// This check is for testing purposes and for a test that to be able to dynamically turn
// off and on delete previous versions so that it can test clean-up on read.
- if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
+ if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" ) ) {
return;
}
- if(logger.isDebugEnabled()) {
- logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + "scope\n name: {}\n owner: {}\n app: {}",
- new Object[] {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} "
+ + "scope\n name: {}\n owner: {}\n app: {}", new Object[] {
entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getName(), scope.getOwner(),
scope.getApplication()
} );
}
- CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
+ CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
final EntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
- final IndexScope indexScope = new IndexScopeImpl(
- new SimpleId(scope.getOwner().getUuid(), scope.getOwner().getType()),
- scope.getName()
- );
-
- Observable.from( entityVersions )
- .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccLogEntry>() {
- @Override
- public void call( final EntityIndexBatch entityIndexBatch, final MvccLogEntry mvccLogEntry ) {
- entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
- }
- } ).doOnNext( new Action1<EntityIndexBatch>() {
- @Override
- public void call( final EntityIndexBatch entityIndexBatch ) {
+ final IndexScope indexScope =
+ new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ),
+ scope.getName() );
+
+ //create our batch, and then collect all of them into a single batch
+ Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( entityIndexBatch, mvccLogEntry ) -> {
+ entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
+ } )
+ //after our batch is collected, execute it
+ .doOnNext( entityIndexBatch -> {
entityIndexBatch.execute();
- }
- } ).toBlocking().last();
+ } ).toBlocking().last();
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 40ad236..6531d16 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -37,6 +37,7 @@ import com.google.inject.Inject;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
+import rx.schedulers.Schedulers;
/**
@@ -63,36 +64,26 @@ public class EntityTypeMappingMigration implements DataMigration<EntityIdScope>
final AtomicLong atomicLong = new AtomicLong();
- allEntitiesInSystemObservable.getData()
- //process the entities in parallel
- .parallel( new Func1<Observable<EntityIdScope>, Observable<EntityIdScope>>() {
+ //migrate up to 100 types simultaneously
+ allEntitiesInSystemObservable.getData().flatMap( entityIdScope -> {
+ return Observable.just( entityIdScope ).doOnNext( entityIdScopeObservable -> {
+ final MapScope ms = CpNamingUtils
+ .getEntityTypeMapScope( entityIdScope.getCollectionScope().getApplication() );
+ final MapManager mapManager = managerCache.getMapManager( ms );
- @Override
- public Observable<EntityIdScope> call( final Observable<EntityIdScope> entityIdScopeObservable ) {
+ final UUID entityUuid = entityIdScope.getId().getUuid();
+ final String entityType = entityIdScope.getId().getType();
- //for each entity observable, get the map scope and write it to the map
- return entityIdScopeObservable.doOnNext( new Action1<EntityIdScope>() {
- @Override
- public void call( final EntityIdScope entityIdScope ) {
- final MapScope ms = CpNamingUtils
- .getEntityTypeMapScope( entityIdScope.getCollectionScope().getApplication() );
+ mapManager.putString( entityUuid.toString(), entityType );
- final MapManager mapManager = managerCache.getMapManager( ms );
+ if ( atomicLong.incrementAndGet() % 100 == 0 ) {
+ observer.update( getMaxVersion(),
+ String.format( "Updated %d entities", atomicLong.get() ) );
+ }
- final UUID entityUuid = entityIdScope.getId().getUuid();
- final String entityType = entityIdScope.getId().getType();
-
- mapManager.putString( entityUuid.toString(), entityType );
-
- if ( atomicLong.incrementAndGet() % 100 == 0 ) {
- observer.update( getMaxVersion(),
- String.format( "Updated %d entities", atomicLong.get() ) );
- }
- }
- } );
- }
- } ).count().toBlocking().last();
+ } ).subscribeOn( Schedulers.io() );
+ }, 100 ).count().toBlocking().last();
return getMaxVersion();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index be7cee4..88f56c8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -77,7 +77,7 @@ public class EntityTypeMappingMigrationIT {
final MapScope mapScope2 = new MapScopeImpl(applicationId, CpNamingUtils.TYPES_BY_UUID_MAP );
- final Observable<EntityIdScope> scopes = Observable.from(idScope1, idScope2);
+ final Observable<EntityIdScope> scopes = Observable.just(idScope1, idScope2);
final TestMigrationDataProvider<EntityIdScope> migrationDataProvider = new TestMigrationDataProvider<>();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index f565fab..70b5a3a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -235,7 +235,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
final Timer.Context timer = deleteTimer.time();
- Observable<Id> o = Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId))
+ Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( collectionScope, entityId ) )
.map(markStart)
.doOnNext( markCommit )
.map(new Func1<CollectionIoEvent<MvccEntity>, Id>() {
@@ -284,7 +284,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
return Observable.empty();
}
- return Observable.from(entity.getEntity().get());
+ return Observable.just( entity.getEntity().get() );
}
})
.doOnNext( new Action1<Entity>() {
@@ -449,19 +449,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
WriteStart writeState ) {
- return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
+ return Observable.just( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
@Override
public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
Observable<CollectionIoEvent<MvccEntity>> unique =
- Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
+ Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
.doOnNext( writeVerifyUnique );
// optimistic verification
Observable<CollectionIoEvent<MvccEntity>> optimistic =
- Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
+ Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
.doOnNext( writeOptimisticVerify );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 5472645..7620907 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -127,22 +127,10 @@ public class EntityDeletedTask implements Task<Void> {
LOG.debug( "Started firing {} listeners", listenerSize );
- //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
- Observable.from(listeners)
- .parallel( new Func1<Observable<EntityDeleted>, Observable<EntityDeleted>>() {
-
- @Override
- public Observable<EntityDeleted> call(
- final Observable<EntityDeleted> entityVersionDeletedObservable ) {
-
- return entityVersionDeletedObservable.doOnNext( new Action1<EntityDeleted>() {
- @Override
- public void call( final EntityDeleted listener ) {
- listener.deleted(collectionScope, entityId, version);
- }
- } );
- }
- }, Schedulers.io() ).toBlocking().last();
+ //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
+ Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
+ listener.deleted( collectionScope, entityId, version );
+ } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
LOG.debug( "Finished firing {} listeners", listenerSize );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index b245528..1a7b86b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -159,7 +159,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
throw new RuntimeException( "Unable to execute batch mutation", e );
}
}
- } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
+ } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
//start calling the listeners for remove log entries
@@ -201,7 +201,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
throw new RuntimeException( "Unable to execute batch mutation", e );
}
}
- } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
+ } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
//wait or this to complete
final Long removedCount = uniqueValueCleanup.last();
@@ -232,21 +232,14 @@ public class EntityVersionCleanupTask implements Task<Void> {
logger.debug( "Started firing {} listeners", listenerSize );
//if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
- Observable.from( listeners )
- .parallel( new Func1<Observable<EntityVersionDeleted>, Observable<EntityVersionDeleted>>() {
-
- @Override
- public Observable<EntityVersionDeleted> call(
- final Observable<EntityVersionDeleted> entityVersionDeletedObservable ) {
-
- return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
- @Override
- public void call( final EntityVersionDeleted listener ) {
- listener.versionDeleted( scope, entityId, versions );
- }
- } );
- }
- }, Schedulers.io() ).toBlocking().last();
+
+
+ //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
+ Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
+ listener.versionDeleted( scope, entityId, versions );
+ } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
+
+
logger.debug( "Finished firing {} listeners", listenerSize );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
index 7d3beb1..16a6e77 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
@@ -64,7 +64,7 @@ public class EntityVersionCreatedTask implements Task<Void> {
@Override
public Void rejected() {
- // Our task was rejected meaning our queue was full.
+ // Our task was rejected meaning our queue was full.
// We need this operation to run, so we'll run it in our current thread
try {
call();
@@ -76,7 +76,7 @@ public class EntityVersionCreatedTask implements Task<Void> {
return null;
}
-
+
@Override
public Void call() throws Exception {
@@ -100,22 +100,12 @@ public class EntityVersionCreatedTask implements Task<Void> {
logger.debug( "Started firing {} listeners", listenerSize );
- //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
- Observable.from(listeners).parallel(
- new Func1<Observable<EntityVersionCreated>, Observable<EntityVersionCreated>>() {
-
- @Override
- public Observable<EntityVersionCreated> call(
- final Observable<EntityVersionCreated> entityVersionCreatedObservable ) {
-
- return entityVersionCreatedObservable.doOnNext( new Action1<EntityVersionCreated>() {
- @Override
- public void call( final EntityVersionCreated listener ) {
- listener.versionCreated(collectionScope,entity);
- }
- } );
- }
- }, Schedulers.io() ).toBlocking().last();
+
+ Observable.from( listeners )
+ .flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
+ listener.versionCreated( collectionScope, entity );
+ } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
+
logger.debug( "Finished firing {} listeners", listenerSize );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index e1445e3..ad1d91a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -176,77 +176,52 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS
}
- final EntitySetImpl entitySetResults = Observable.from( rowKeys )
- //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary)
- .buffer(entitiesPerRequest )
- .parallel( new Func1<Observable<List<ScopedRowKey
- <CollectionPrefixedKey<Id>>>>, Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>>() {
+ final EntitySetImpl entitySetResults = Observable.from( rowKeys )
+ //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary)
+ .buffer( entitiesPerRequest ).flatMap( listObservable -> {
- @Override
- public Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> call(
- final Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>> listObservable ) {
-
-
- //here, we execute our query then emit the items either in parallel, or on the current thread if we have more than 1 request
- return listObservable.map( new Func1<List<ScopedRowKey<CollectionPrefixedKey<Id>>>,
- Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>() {
-
+ //here, we execute our query then emit the items either in parallel, or on the current thread
+ // if we have more than 1 request
+ return Observable.just( listObservable ).map( scopedRowKeys -> {
- @Override
- public Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> call(
- final List<ScopedRowKey<CollectionPrefixedKey<Id>>> scopedRowKeys ) {
-
- try {
- return keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys )
- .withColumnRange( maxVersion, null, false,
- 1 ).execute().getResult();
- }
- catch ( ConnectionException e ) {
- throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra",
- e );
- }
+ try {
+ return keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys )
+ .withColumnRange( maxVersion, null, false, 1 ).execute().getResult();
}
- } );
-
-
-
- }
- }, scheduler )
-
- //reduce all the output into a single Entity set
- .reduce( new EntitySetImpl( entityIds.size() ),
- new Func2<EntitySetImpl, Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>, EntitySetImpl>() {
- @Override
- public EntitySetImpl call( final EntitySetImpl entitySet,
- final Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> rows ) {
-
- final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns = rows.iterator();
+ catch ( ConnectionException e ) {
+ throw new CollectionRuntimeException( null, collectionScope,
+ "An error occurred connecting to cassandra", e );
+ }
+ } ).subscribeOn( scheduler );
+ }, 10 )
- while ( latestEntityColumns.hasNext() ) {
- final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next();
+ .reduce( new EntitySetImpl( entityIds.size() ), ( entitySet, rows ) -> {
+ final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns =
+ rows.iterator();
- final ColumnList<UUID> columns = row.getColumns();
+ while ( latestEntityColumns.hasNext() ) {
+ final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next();
- if ( columns.size() == 0 ) {
- continue;
- }
+ final ColumnList<UUID> columns = row.getColumns();
- final Id entityId = row.getKey().getKey().getSubKey();
+ if ( columns.size() == 0 ) {
+ continue;
+ }
- final Column<UUID> column = columns.getColumnByIndex( 0 );
+ final Id entityId = row.getKey().getKey().getSubKey();
- final MvccEntity parsedEntity =
- new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column );
+ final Column<UUID> column = columns.getColumnByIndex( 0 );
- entitySet.addEntity( parsedEntity );
- }
+ final MvccEntity parsedEntity =
+ new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column );
+ entitySet.addEntity( parsedEntity );
+ }
- return entitySet;
- }
- } ).toBlocking().last();
+ return entitySet;
+ } ).toBlocking().last();
return entitySetResults;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index a5046f6..de959b5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -185,82 +185,55 @@ public class MvccEntitySerializationStrategyV3Impl implements MvccEntitySerializ
final EntitySetImpl entitySetResults = Observable.from( rowKeys )
- //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary)
- .buffer( entitiesPerRequest ).parallel(
- new Func1<Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>>,
- Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>>>() {
+ //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary)
+ .buffer( entitiesPerRequest ).flatMap( listObservable -> {
- @Override
- public Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> call(
- final Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>> listObservable ) {
+ //here, we execute our query then emit the items either in parallel, or on the current thread
+ // if we have more than 1 request
+ return Observable.just( listObservable ).map( scopedRowKeys -> {
- //here, we execute our query then emit
- // the items either in parallel, or on
- // the current thread if we have more
- // than 1 request
- return listObservable
- .map( new Func1<List<ScopedRowKey<CollectionPrefixedKey<Id>>>,
- Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>>() {
+ try {
+ return keyspace.prepareQuery( CF_ENTITY_DATA ).getKeySlice( rowKeys )
+ .withColumnSlice( COL_VALUE ).execute().getResult();
+ }
+ catch ( ConnectionException e ) {
+ throw new CollectionRuntimeException( null, collectionScope,
+ "An error occurred connecting to cassandra", e );
+ }
+ } ).subscribeOn( scheduler );
+ }, 10 )
+ .reduce( new EntitySetImpl( entityIds.size() ), ( entitySet, rows ) -> {
+ final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> latestEntityColumns =
+ rows.iterator();
- @Override
- public Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> call(
- final List<ScopedRowKey<CollectionPrefixedKey<Id>>> scopedRowKeys
- ) {
+ while ( latestEntityColumns.hasNext() ) {
+ final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> row = latestEntityColumns.next();
- try {
- return keyspace.prepareQuery( CF_ENTITY_DATA )
- .getKeySlice( rowKeys )
- .withColumnSlice( COL_VALUE )
- .execute().getResult();
- }
- catch ( ConnectionException e ) {
- throw new CollectionRuntimeException( null, collectionScope,
- "An error occurred connecting to cassandra", e );
- }
- }
- } );
- }
- }, scheduler )
+ final ColumnList<Boolean> columns = row.getColumns();
- //reduce all the output into a single Entity set
- .reduce( new EntitySetImpl( entityIds.size() ),
- new Func2<EntitySetImpl, Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>, EntitySetImpl>() {
- @Override
- public EntitySetImpl call( final EntitySetImpl entitySet,
- final Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> rows
- ) {
+ if ( columns.size() == 0 ) {
+ continue;
+ }
- final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> latestEntityColumns =
- rows.iterator();
+ final Id entityId = row.getKey().getKey().getSubKey();
- while ( latestEntityColumns.hasNext() ) {
- final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> row =
- latestEntityColumns.next();
+ final Column<Boolean> column = columns.getColumnByIndex( 0 );
- final ColumnList<Boolean> columns = row.getColumns();
+ final MvccEntity parsedEntity =
+ new MvccColumnParser( entityId, entitySerializer ).parseColumn( column );
- if ( columns.size() == 0 ) {
- continue;
- }
- final Id entityId = row.getKey().getKey().getSubKey();
+ entitySet.addEntity( parsedEntity );
+ }
- final Column<Boolean> column = columns.getColumnByIndex( 0 );
- final MvccEntity parsedEntity =
- new MvccColumnParser( entityId, entitySerializer ).parseColumn( column );
+ return entitySet;
+ } ).toBlocking().last();
- entitySet.addEntity( parsedEntity );
- }
-
-
- return entitySet;
- }
- } ).toBlocking().last();
return entitySetResults;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index f87b5fd..6982857 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -124,130 +124,107 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
final Observable<List<EntityToSaveMessage>> migrated =
- migrationDataProvider.getData().subscribeOn( Schedulers.io() ).parallel(
- new Func1<Observable<EntityIdScope>, Observable<List<EntityToSaveMessage>>>() {
+ migrationDataProvider.getData().subscribeOn( Schedulers.io() ).flatMap( entityToSaveList -> Observable.just( entityToSaveList ).flatMap( entityIdScope -> {
+ //load the entity
+ final CollectionScope currentScope = entityIdScope.getCollectionScope();
- //process the ids in parallel
- @Override
- public Observable<List<EntityToSaveMessage>> call(
- final Observable<EntityIdScope> entityIdScopeObservable ) {
-
-
- return entityIdScopeObservable.flatMap(
- new Func1<EntityIdScope, Observable<EntityToSaveMessage>>() {
-
-
- @Override
- public Observable<EntityToSaveMessage> call( final EntityIdScope entityIdScope ) {
- //load the entity
- final CollectionScope currentScope = entityIdScope.getCollectionScope();
+ //for each element in our
+ // history, we need to copy it
+ // to v2.
+ // Note that
+ // this migration
+ //won't support anything beyond V2
+ final Iterator<MvccEntity> allVersions =
+ migration.from.loadAscendingHistory( currentScope, entityIdScope.getId(), startTime, 100 );
- //for each element in our
- // history, we need to copy it
- // to v2.
- // Note that
- // this migration
- //won't support anything beyond V2
-
- final Iterator<MvccEntity> allVersions = migration.from
- .loadAscendingHistory( currentScope, entityIdScope.getId(), startTime, 100 );
+ //emit all the entity versions
+ return Observable.create( new Observable.OnSubscribe<EntityToSaveMessage>() {
+ @Override
+ public void call( final Subscriber<? super
+ EntityToSaveMessage> subscriber ) {
- //emit all the entity versions
- return Observable.create( new Observable.OnSubscribe<EntityToSaveMessage>() {
- @Override
- public void call( final Subscriber<? super
- EntityToSaveMessage> subscriber ) {
+ while ( allVersions.hasNext() ) {
+ final EntityToSaveMessage message =
+ new EntityToSaveMessage( currentScope, allVersions.next() );
+ subscriber.onNext( message );
+ }
- while ( allVersions.hasNext() ) {
- final EntityToSaveMessage message = new EntityToSaveMessage( currentScope, allVersions.next() );
- subscriber.onNext( message );
- }
+ subscriber.onCompleted();
+ }
+ } ).buffer( 100 ).doOnNext( entities -> {
- subscriber.onCompleted();
- }
- } );
- }
- } )
- //buffer 10 versions
- .buffer( 100 ).doOnNext( new Action1<List<EntityToSaveMessage>>() {
- @Override
- public void call( final List<EntityToSaveMessage> entities ) {
+ final MutationBatch totalBatch = keyspace.prepareMutationBatch();
- final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+ atomicLong.addAndGet( entities.size() );
- atomicLong.addAndGet( entities.size() );
+ List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList( entities.size() );
- List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList(entities.size());
+ for ( EntityToSaveMessage message : entities ) {
+ final MutationBatch entityRewrite = migration.to.write( message.scope, message.entity );
- for ( EntityToSaveMessage message : entities ) {
- final MutationBatch entityRewrite =
- migration.to.write( message.scope, message.entity );
+ //add to
+ // the
+ // total
+ // batch
+ totalBatch.mergeShallow( entityRewrite );
- //add to
- // the
- // total
- // batch
- totalBatch.mergeShallow( entityRewrite );
+ //write
+ // the
+ // unique values
- //write
- // the
- // unique values
+ if ( !message.entity.getEntity().isPresent() ) {
+ return;
+ }
- if ( !message.entity.getEntity().isPresent() ) {
- return;
- }
+ final Entity entity = message.entity.getEntity().get();
- final Entity entity = message.entity.getEntity().get();
+ final Id entityId = entity.getId();
- final Id entityId = entity.getId();
+ final UUID version = message.entity.getVersion();
- final UUID version = message.entity.getVersion();
+ // re-write the unique
+ // values
+ // but this
+ // time with
+ // no TTL so that cleanup can clean up
+ // older values
+ for ( Field field : EntityUtils.getUniqueFields( message.entity.getEntity().get() ) ) {
- // re-write the unique
- // values
- // but this
- // time with
- // no TTL so that cleanup can clean up
- // older values
- for ( Field field : EntityUtils
- .getUniqueFields( message.entity.getEntity().get() ) ) {
+ UniqueValue written = new UniqueValueImpl( field, entityId, version );
- UniqueValue written = new UniqueValueImpl( field, entityId, version );
+ MutationBatch mb = uniqueValueSerializationStrategy.write( message.scope, written );
- MutationBatch mb =
- uniqueValueSerializationStrategy.write( message.scope, written );
+ // merge into our
+ // existing mutation
+ // batch
+ totalBatch.mergeShallow( mb );
+ }
- // merge into our
- // existing mutation
- // batch
- totalBatch.mergeShallow( mb );
- }
+ final EntityVersionCleanupTask task = entityVersionCleanupFactory
+ .getCleanupTask( message.scope, message.entity.getId(), version, false );
- final EntityVersionCleanupTask task = entityVersionCleanupFactory.getCleanupTask( message.scope, message.entity.getId(), version, false );
+ entityVersionCleanupTasks.add( task );
+ }
- entityVersionCleanupTasks.add( task );
- }
+ executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
- executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
+ //now run our cleanup task
- //now run our cleanup task
+ for ( EntityVersionCleanupTask entityVersionCleanupTask : entityVersionCleanupTasks ) {
+ try {
+ entityVersionCleanupTask.call();
+ }
+ catch ( Exception e ) {
+ LOGGER.error( "Unable to run cleanup task", e );
+ }
+ }
+ } ).subscribeOn( Schedulers.io() );
- for(EntityVersionCleanupTask entityVersionCleanupTask: entityVersionCleanupTasks){
- try {
- entityVersionCleanupTask.call();
- }
- catch ( Exception e ) {
- LOGGER.error( "Unable to run cleanup task", e );
- }
- }
- }
- } );
- }
- } );
+ }, 10) );
migrated.toBlocking().lastOrDefault(null);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
index 2d416a4..a49e533 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -64,7 +64,7 @@ public class ParallelTest {
final int expected = size - 1;
- // QUESTION Using this thread blocks indefinitely. The execution of the Hystrix command
+ // QUESTION Using this thread blocks indefinitely. The execution of the Hystrix command
// happens on the computation Thread if this is used
// final Scheduler scheduler = Schedulers.threadPoolForComputation();
@@ -90,7 +90,7 @@ public class ParallelTest {
* non blocking?
*/
- final Observable<String> observable = Observable.from( input ).observeOn( Schedulers.io() );
+ final Observable<String> observable = Observable.just( input ).observeOn( Schedulers.io() );
Observable<Integer> thing = observable.flatMap( new Func1<String, Observable<Integer>>() {
@@ -99,7 +99,7 @@ public class ParallelTest {
public Observable<Integer> call( final String s ) {
List<Observable<Integer>> functions = new ArrayList<Observable<Integer>>();
- logger.info( "Creating new set of observables in thread {}",
+ logger.info( "Creating new set of observables in thread {}",
Thread.currentThread().getName() );
for ( int i = 0; i < size; i++ ) {
@@ -107,13 +107,13 @@ public class ParallelTest {
final int index = i;
- // create a new observable and execute the function on it.
+ // create a new observable and execute the function on it.
// These should happen in parallel when a subscription occurs
/**
* QUESTION: Should this again be the process thread, not the I/O
*/
- Observable<String> newObservable = Observable.from( input ).subscribeOn( Schedulers.io() );
+ Observable<String> newObservable = Observable.just( input ).subscribeOn( Schedulers.io() );
Observable<Integer> transformed = newObservable.map( new Func1<String, Integer>() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
index 747ea7b..9938caf 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
@@ -119,7 +119,7 @@ public abstract class AbstractMvccEntityDataMigrationV1ToV3ImplTest implements D
assertEquals( "Same entity", entity2, returned2 );
final Observable<EntityIdScope> entityIdScope =
- Observable.from( new EntityIdScope( scope, entity1.getId() ), new EntityIdScope( scope, entity2.getId() ) );
+ Observable.just( new EntityIdScope( scope, entity1.getId() ), new EntityIdScope( scope, entity2.getId() ) );
final MigrationDataProvider<EntityIdScope> migrationProvider = new MigrationDataProvider<EntityIdScope>() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/pom.xml b/stack/corepersistence/common/pom.xml
index 82df1d8..2be1c1a 100644
--- a/stack/corepersistence/common/pom.xml
+++ b/stack/corepersistence/common/pom.xml
@@ -101,15 +101,16 @@
<!-- RX java -->
+ <dependency>
+ <groupId>io.reactivex</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>${rx.version}</version>
+ </dependency>
+
<dependency>
- <groupId>com.netflix.rxjava</groupId>
- <artifactId>rxjava-core</artifactId>
- <version>${rx.version}</version>
- </dependency>
- <dependency>
- <groupId>com.netflix.rxjava</groupId>
+ <groupId>io.reactivex</groupId>
<artifactId>rxjava-math</artifactId>
- <version>${rx.version}</version>
+ <version>1.0.0</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
index 15f9aab..23661ee 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
@@ -77,7 +77,9 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
for ( ColumnNameIterator<C, T> columnNameIterator : columnNameIterators ) {
- observables[i] = Observable.from( columnNameIterator, Schedulers.io() );
+
+
+ observables[i] = Observable.from( columnNameIterator ).subscribeOn( Schedulers.io() );
i++;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
index f4f6f9c..3c56763 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -52,6 +52,7 @@ import com.netflix.astyanax.util.RangeBuilder;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
+import rx.schedulers.Schedulers;
import static org.junit.Assert.assertEquals;
@@ -125,100 +126,95 @@ public class MultiKeyColumnNameIteratorTest {
final long maxValue = 10000;
+
/**
* Write to both rows in parallel
*/
Observable.from( new String[] { rowKey1, rowKey2, rowKey3 } )
- .parallel( new Func1<Observable<String>, Observable<String>>() {
- @Override
- public Observable<String> call( final Observable<String> stringObservable ) {
- return stringObservable.doOnNext( new Action1<String>() {
- @Override
- public void call( final String key ) {
-
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- for ( long i = 0; i < maxValue; i++ ) {
- batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
-
- if ( i % 1000 == 0 ) {
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
- }
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
+ //perform a flatmap
+ .flatMap( stringObservable -> Observable.just( stringObservable ).doOnNext( key -> {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ for ( long i = 0; i < maxValue; i++ ) {
+ batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+
+ if ( i % 1000 == 0 ) {
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( e );
}
- } );
+ }
}
- } ).toBlocking().last();
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( e );
+ }
+ } ).subscribeOn( Schedulers.io() ) ).toBlocking().last();
- //create 3 iterators
- ColumnNameIterator<Long, Long> row1Iterator = createIterator( rowKey1, false );
- ColumnNameIterator<Long, Long> row2Iterator = createIterator( rowKey2, false );
- ColumnNameIterator<Long, Long> row3Iterator = createIterator( rowKey3, false );
- final Comparator<Long> ascendingComparator = new Comparator<Long>() {
- @Override
- public int compare( final Long o1, final Long o2 ) {
- return Long.compare( o1, o2 );
- }
- };
+ //create 3 iterators
- /**
- * Again, arbitrary buffer size to attempt we buffer at some point
- */
- final MultiKeyColumnNameIterator<Long, Long> ascendingItr =
+ ColumnNameIterator<Long, Long> row1Iterator = createIterator( rowKey1, false );
+ ColumnNameIterator<Long, Long> row2Iterator = createIterator( rowKey2, false );
+ ColumnNameIterator<Long, Long> row3Iterator = createIterator( rowKey3, false );
+
+ final Comparator<Long> ascendingComparator = new Comparator<Long>() {
+
+ @Override
+ public int compare( final Long o1, final Long o2 ) {
+ return Long.compare( o1, o2 );
+ }
+ };
+
+ /**
+ * Again, arbitrary buffer size to attempt we buffer at some point
+ */
+ final MultiKeyColumnNameIterator<Long, Long> ascendingItr =
new MultiKeyColumnNameIterator<>( Arrays.asList( row1Iterator, row2Iterator, row3Iterator ),
- ascendingComparator, 900 );
+ ascendingComparator, 900 );
- //ensure we have to make several trips, purposefully set to a nonsensical value to ensure we make all the
- // trips required
+ //ensure we have to make several trips, purposefully set to a nonsensical value to ensure we make all the
+ // trips required
- for ( long i = 0; i < maxValue; i++ ) {
- assertEquals( i, ascendingItr.next().longValue() );
- }
+ for ( long i = 0; i < maxValue; i++ ) {
+ assertEquals( i, ascendingItr.next().longValue() );
+ }
- //now test it in reverse
+ //now test it in reverse
- ColumnNameIterator<Long, Long> row1IteratorDesc = createIterator( rowKey1, true );
- ColumnNameIterator<Long, Long> row2IteratorDesc = createIterator( rowKey2, true );
- ColumnNameIterator<Long, Long> row3IteratorDesc = createIterator( rowKey3, true );
+ ColumnNameIterator<Long, Long> row1IteratorDesc = createIterator( rowKey1, true );
+ ColumnNameIterator<Long, Long> row2IteratorDesc = createIterator( rowKey2, true );
+ ColumnNameIterator<Long, Long> row3IteratorDesc = createIterator( rowKey3, true );
- final Comparator<Long> descendingComparator = new Comparator<Long>() {
+ final Comparator<Long> descendingComparator = new Comparator<Long>() {
- @Override
- public int compare( final Long o1, final Long o2 ) {
- return ascendingComparator.compare( o1, o2 ) * -1;
- }
- };
+ @Override
+ public int compare( final Long o1, final Long o2 ) {
+ return ascendingComparator.compare( o1, o2 ) * -1;
+ }
+ };
- /**
- * Again, arbitrary buffer size to attempt we buffer at some point
- */
- final MultiKeyColumnNameIterator<Long, Long> descendingItr =
+ /**
+ * Again, arbitrary buffer size to attempt we buffer at some point
+ */
+ final MultiKeyColumnNameIterator<Long, Long> descendingItr =
new MultiKeyColumnNameIterator<>( Arrays.asList( row1IteratorDesc, row2IteratorDesc, row3IteratorDesc ),
- descendingComparator, 900 );
+ descendingComparator, 900 );
- for ( long i = maxValue - 1; i > -1; i-- ) {
- assertEquals( i, descendingItr.next().longValue() );
+ for ( long i = maxValue - 1; i > -1; i-- ) {
+ assertEquals( i, descendingItr.next().longValue() );
+ }
}
- }
@Test
@@ -233,39 +229,28 @@ public class MultiKeyColumnNameIteratorTest {
/**
* Write to both rows in parallel
*/
- Observable.just( rowKey1 )
- .parallel( new Func1<Observable<String>, Observable<String>>() {
- @Override
- public Observable<String> call( final Observable<String> stringObservable ) {
- return stringObservable.doOnNext( new Action1<String>() {
- @Override
- public void call( final String key ) {
-
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- for ( long i = 0; i < maxValue; i++ ) {
- batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
-
- if ( i % 1000 == 0 ) {
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
- }
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
- }
- } );
- }
- } ).toBlocking().last();
+ Observable.just( rowKey1 ).flatMap( rowKey -> Observable.just( rowKey ).doOnNext( key -> {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ for ( long i = 0; i < maxValue; i++ ) {
+ batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+
+ if ( i % 1000 == 0 ) {
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( e );
+ }
+ }
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( e );
+ }} ).subscribeOn( Schedulers.io() ) ).toBlocking().last();
//create 3 iterators
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index c32b820..d88ebe5 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -54,6 +54,7 @@ import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.functions.Func1;
+import rx.schedulers.Schedulers;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -373,38 +374,29 @@ public class MultiRowColumnIteratorTest {
/**
* Write to both rows in parallel
*/
- Observable.just( rowKey1 ).parallel( new Func1<Observable<String>, Observable<String>>() {
- @Override
- public Observable<String> call( final Observable<String> stringObservable ) {
- return stringObservable.doOnNext( new Action1<String>() {
- @Override
- public void call( final String key ) {
-
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- for ( long i = 0; i < maxValue; i++ ) {
- batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
-
- if ( i % 1000 == 0 ) {
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
- }
- }
+ Observable.just( rowKey1 ).flatMap( rowKey -> Observable.just( rowKey ).doOnNext( key -> {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
+ for ( long i = 0; i < maxValue; i++ ) {
+ batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+
+ if ( i % 1000 == 0 ) {
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( e );
}
- } );
+ }
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( e );
}
- } ).toBlocking().last();
+ } ).subscribeOn( Schedulers.io() ) ).toBlocking().last();
//create 3 iterators