You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/20 17:51:50 UTC
[04/12] incubator-usergrid git commit: First pass at upgrading to
Java 8 and latest RxJava
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 26d06ad..ef258f4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -188,7 +188,7 @@ public class GraphManagerImpl implements GraphManager {
final Timer.Context timer = writeEdgeTimer.time();
final Meter meter = writeEdgeMeter;
- return Observable.from( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
+ return Observable.just( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
@Override
public Edge call( final MarkedEdge edge ) {
@@ -234,7 +234,7 @@ public class GraphManagerImpl implements GraphManager {
final Timer.Context timer = deleteEdgeTimer.time();
final Meter meter = deleteEdgeMeter;
- return Observable.from(markedEdge).map(new Func1<MarkedEdge, Edge>() {
+ return Observable.just(markedEdge).map(new Func1<MarkedEdge, Edge>() {
@Override
public Edge call(final MarkedEdge edge) {
@@ -281,7 +281,7 @@ public class GraphManagerImpl implements GraphManager {
public Observable<Id> deleteNode( final Id node, final long timestamp ) {
final Timer.Context timer = deleteNodeTimer.time();
final Meter meter = deleteNodeMeter;
- return Observable.from( node ).map( new Func1<Id, Id>() {
+ return Observable.just( node ).map( new Func1<Id, Id>() {
@Override
public Id call( final Id id ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index ab141f7..bfaeaaa 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -176,6 +176,8 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
* Sum up the total number of edges we had, then execute the mutation if we have
* anything to do
*/
+
+
return MathObservable.sumInteger( Observable.merge( checks ) )
.doOnNext( new Action1<Integer>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index e8c224e..6236a16 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -103,7 +103,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
public Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) {
- return Observable.from( node )
+ return Observable.just( node )
//delete source and targets in parallel and merge them into a single observable
.flatMap( new Func1<Id, Observable<Integer>>() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
index 2d9b47f..ecb9a9b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
+import rx.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -75,61 +76,49 @@ public class EdgeDataMigrationImpl implements DataMigration<GraphNode> {
}
-
-
@Override
- public int migrate( final int currentVersion, final MigrationDataProvider<GraphNode> migrationDataProvider,
- final ProgressObserver observer ) {
+ public int migrate( final int currentVersion, final MigrationDataProvider<GraphNode> migrationDataProvider,
+ final ProgressObserver observer ) {
final AtomicLong counter = new AtomicLong();
- final MigrationRelationship<EdgeMetadataSerialization>
- migration = allVersions.getMigrationRelationship( currentVersion );
-
- final Observable<List<Edge>> observable = migrationDataProvider.getData().flatMap( new Func1<GraphNode,
- Observable<List<Edge>>>() {
- @Override
- public Observable<List<Edge>> call( final GraphNode graphNode ) {
- final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
-
- //get edges from the source
- return edgesFromSourceObservable.edgesFromSource( gm, graphNode.entryNode ).buffer( 1000 ).parallel( new Func1<Observable<List<Edge>>, Observable<List<Edge>>>() {
- @Override
- public Observable<List<Edge>> call( final Observable<List<Edge>> listObservable ) {
- return listObservable.doOnNext( new Action1<List<Edge>>() {
- @Override
- public void call( List<Edge> edges ) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- for ( Edge edge : edges ) {
- logger.info( "Migrating meta for edge {}", edge );
- final MutationBatch edgeBatch =
- migration.to.writeEdge( graphNode.applicationScope, edge );
- batch.mergeShallow( edgeBatch );
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to perform migration", e );
- }
-
- //update the observer so the admin can see it
- final long newCount = counter.addAndGet( edges.size() );
-
- observer.update( migration.to.getImplementationVersion(),
- String.format( "Currently running. Rewritten %d edge types",
- newCount ) );
- }
- } );
- } } );
- }} );
-
- observable.longCount().toBlocking().last();
+ final MigrationRelationship<EdgeMetadataSerialization> migration =
+ allVersions.getMigrationRelationship( currentVersion );
- return migration.to.getImplementationVersion();
+ final Observable<List<Edge>> observable = migrationDataProvider.getData().flatMap( graphNode -> {
+ final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
+
+ //get edges from the source
+ return edgesFromSourceObservable.edgesFromSource( gm, graphNode.entryNode ).buffer( 1000 )
+ .doOnNext( edges -> {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ for ( Edge edge : edges ) {
+ logger.info( "Migrating meta for edge {}", edge );
+ final MutationBatch edgeBatch =
+ migration.to.writeEdge( graphNode.applicationScope, edge );
+ batch.mergeShallow( edgeBatch );
+ }
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to perform migration", e );
+ }
+
+ //update the observer so the admin can see it
+ final long newCount = counter.addAndGet( edges.size() );
+
+ observer.update( migration.to.getImplementationVersion(), String
+ .format( "Currently running. Rewritten %d edge types",
+ newCount ) );
+ } ).subscribeOn( Schedulers.io() );
+ }, 10 );
+
+ observable.countLong().toBlocking().last();
+
+ return migration.to.getImplementationVersion();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 6d30d22..3bbf3e4 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -516,7 +516,7 @@ public class GraphManagerShardConsistencyIT {
}
} )
- .longCount().toBlocking().last();
+ .countLong().toBlocking().last();
// if(returnedEdgeCount != count[0]-duplicate[0]){
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
index 0a27a6b..7b3fafd 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
@@ -63,23 +63,23 @@ public class SimpleTest {
Edge testTargetEdge = createEdge( sourceId1, "test", targetId1, System.currentTimeMillis() );
- gm.writeEdge( testTargetEdge ).toBlockingObservable().singleOrDefault( null );
+ gm.writeEdge( testTargetEdge ).toBlocking().singleOrDefault( null );
Edge testTarget2Edge = createEdge( sourceId2, "edgeType1", targetId1, System.currentTimeMillis() );
- gm.writeEdge( testTarget2Edge ).toBlockingObservable().singleOrDefault( null );
+ gm.writeEdge( testTarget2Edge ).toBlocking().singleOrDefault( null );
Edge test2TargetEdge = createEdge( sourceId1, "edgeType1", targetId1, System.currentTimeMillis() );
- gm.writeEdge( test2TargetEdge ).toBlockingObservable().singleOrDefault( null );
+ gm.writeEdge( test2TargetEdge ).toBlocking().singleOrDefault( null );
Edge test3TargetEdge = createEdge( sourceId1, "edgeType2", targetId1, System.currentTimeMillis() );
- gm.writeEdge( test3TargetEdge ).toBlockingObservable().singleOrDefault( null );
+ gm.writeEdge( test3TargetEdge ).toBlocking().singleOrDefault( null );
int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(targetId1, null, null) )
- .count().toBlockingObservable().last();
+ .count().toBlocking().last();
assertEquals( 3, count );
count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(targetId1, "edgeType", null) )
- .count().toBlockingObservable().last();
+ .count().toBlocking().last();
assertEquals( 2, count );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
index a269c15..049c3d2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
@@ -121,7 +121,7 @@ public class EdgeDataMigrationImplTest implements DataMigrationResetRule.DataMig
//walk from s1 and s2
- final Observable<GraphNode> graphNodes = Observable.from( new GraphNode( applicationScope, sourceId1), new GraphNode(applicationScope, sourceId2 ) );
+ final Observable<GraphNode> graphNodes = Observable.just( new GraphNode( applicationScope, sourceId1), new GraphNode(applicationScope, sourceId2 ) );
final MigrationDataProvider<GraphNode> testMigrationProvider = new MigrationDataProvider<GraphNode>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 9656e2d..3ec7852 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -47,8 +47,8 @@ limitations under the License.
<properties>
- <maven.compiler.source>1.7</maven.compiler.source>
- <maven.compiler.target>1.7</maven.compiler.target>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
<antlr.version>3.4</antlr.version>
<archaius.version>0.5.12</archaius.version>
@@ -64,14 +64,14 @@ limitations under the License.
<guava.version>18.0</guava.version>
<guice.version>4.0-beta5</guice.version>
<guicyfig.version>3.2</guicyfig.version>
- <hystrix.version>1.3.16</hystrix.version>
+ <hystrix.version>1.4.0</hystrix.version>
<jackson-2-version>2.4.1</jackson-2-version>
<jackson-smile.verson>2.4.3</jackson-smile.verson>
<mockito.version>1.10.8</mockito.version>
<junit.version>4.11</junit.version>
<kryo-serializers.version>0.26</kryo-serializers.version>
<log4j.version>1.2.17</log4j.version>
- <rx.version>0.19.6</rx.version>
+ <rx.version>1.0.8</rx.version>
<slf4j.version>1.7.2</slf4j.version>
<surefire.version>2.16</surefire.version>
<aws.version>1.9.0</aws.version>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
index c962d6b..82af950 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.UUID;
import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
@@ -32,7 +33,6 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.core.test.UseModules;
import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexScope;
import org.apache.usergrid.persistence.index.guice.IndexTestFig;
@@ -41,13 +41,11 @@ import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.field.IntegerField;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.inject.Inject;
import rx.Observable;
import rx.functions.Action1;
-import rx.functions.Action2;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -57,6 +55,7 @@ import rx.schedulers.Schedulers;
*/
@RunWith( EsRunner.class )
@UseModules( { TestIndexModule.class } )
+@Ignore( "Should only be run during load tests of elasticsearch" )
public class IndexLoadTestsIT extends BaseIT {
private static final Logger log = LoggerFactory.getLogger( IndexLoadTestsIT.class );
@@ -70,13 +69,14 @@ public class IndexLoadTestsIT extends BaseIT {
@Inject
public EntityIndexFactory entityIndexFactory;
+
@Test
- public void testHeavyLoad(){
+ public void testHeavyLoad() {
final UUID applicationUUID = UUID.fromString( indexTestFig.getApplicationId() );
- final Id applicationId = new SimpleId(applicationUUID, "application");
- final ApplicationScope scope = new ApplicationScopeImpl( applicationId );
+ final Id applicationId = new SimpleId( applicationUUID, "application" );
+ final ApplicationScope scope = new ApplicationScopeImpl( applicationId );
final EntityIndex index = entityIndexFactory.createEntityIndex( scope );
@@ -87,83 +87,52 @@ public class IndexLoadTestsIT extends BaseIT {
//run them all
createEntities.toBlocking().last();
-
-
-
-
}
- public Observable<Entity> createStreamFromWorkers(final EntityIndex entityIndex, final Id ownerId){
-
- //create a sequence of observables. Each index will be it's own worker thread using the Schedulers.newthread()
- return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).parallel( new Func1<Observable<Integer>, Observable<Entity>>() {
-
- @Override
- public Observable<Entity> call( final Observable<Integer> integerObservable ) {
- return integerObservable.flatMap( new Func1<Integer, Observable<Entity>>() {
- @Override
- public Observable<Entity> call( final Integer integer ) {
- return createWriteObservable( entityIndex, ownerId, integer );
- }
- } );
+ public Observable<Entity> createStreamFromWorkers( final EntityIndex entityIndex, final Id ownerId ) {
- }
- }, Schedulers.newThread() );
+ //create a sequence of observables. Each index will be its own worker thread using Schedulers.newThread()
+ return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).flatMap(
+ integer -> createWriteObservable( entityIndex, ownerId, integer ).subscribeOn( Schedulers.newThread() ) );
}
- private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final Id ownerId, final int workerIndex){
+ private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final Id ownerId,
+ final int workerIndex ) {
final IndexScope scope = new IndexScopeImpl( ownerId, "test" );
-
- return Observable.range( 0, indexTestFig.getNumberOfRecords() )
+ return Observable.range( 0, indexTestFig.getNumberOfRecords() )
//create our entity
- .map( new Func1<Integer, Entity>() {
- @Override
- public Entity call( final Integer integer ) {
- final Entity entity = new Entity("test");
-
- entity.setField( new IntegerField("workerIndex", workerIndex));
- entity.setField( new IntegerField( "ordinal", integer ) );
-
- return entity;
- }
- } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>() {
- @Override
- public void call( final List<Entity> entities ) {
- //take our entities and roll them into a batch
- Observable.from( entities ).collect( entityIndex.createBatch(), new Action2<EntityIndexBatch, Entity>() {
-
-
- @Override
- public void call( final EntityIndexBatch entityIndexBatch, final Entity entity ) {
- entityIndexBatch.index(scope, entity );
- }
- } ).doOnNext( new Action1<EntityIndexBatch>() {
- @Override
- public void call( final EntityIndexBatch entityIndexBatch ) {
+ .map( new Func1<Integer, Entity>() {
+ @Override
+ public Entity call( final Integer integer ) {
+ final Entity entity = new Entity( "test" );
+
+ entity.setField( new IntegerField( "workerIndex", workerIndex ) );
+ entity.setField( new IntegerField( "ordinal", integer ) );
+
+ return entity;
+ }
+ } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>() {
+ @Override
+ public void call( final List<Entity> entities ) {
+ //take our entities and roll them into a batch
+ Observable.from( entities )
+ .collect( () -> entityIndex.createBatch(), ( entityIndexBatch, entity ) -> {
+
+ entityIndexBatch.index( scope, entity );
+ } ).doOnNext( entityIndexBatch -> {
entityIndexBatch.execute();
- }
- } ).toBlocking().last();
- }
- } )
-
- //translate back into a stream of entities for the caller to use
- .flatMap( new Func1<List<Entity>, Observable<Entity>>() {
- @Override
- public Observable<Entity> call( final List<Entity> entities ) {
- return Observable.from( entities );
- }
- } );
+ } ).toBlocking().last();
+ }
+ } )
+ //translate back into a stream of entities for the caller to use
+ .flatMap(entities -> Observable.from( entities ) );
}
-
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index f24917a..efbda2d 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -125,7 +125,7 @@
<usergrid.it.threads>8</usergrid.it.threads>
<metrics.version>3.0.0</metrics.version>
- <rx.version>0.19.6</rx.version>
+ <rx.version>1.0.8</rx.version>
<surefire.plugin.artifactName>surefire-junit47</surefire.plugin.artifactName>
<surefire.plugin.version>2.18.1</surefire.plugin.version>
<powermock.version>1.6.1</powermock.version>
@@ -1560,8 +1560,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.8</source>
+ <target>1.8</target>
<optimize>true</optimize>
<showDeprecation>true</showDeprecation>
<debug>true</debug>
@@ -1583,7 +1583,7 @@
<configuration>
<rules>
<requireJavaVersion>
- <version>1.7.0</version>
+ <version>1.8.0</version>
</requireJavaVersion>
<requireMavenVersion>
<version>[3.0,)</version>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 4f849e0..bebd557 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -937,18 +937,12 @@ public class ImportServiceImpl implements ImportService {
// potentially skip the first n if this is a resume operation
final int entityNumSkip = (int)tracker.getTotalEntityCount();
- // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
- final int entityCount = entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
- @Override
- public Boolean call( final WriteEvent writeEvent ) {
- return !tracker.shouldStopProcessingEntities();
- }
- } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
- @Override
- public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- return entityWrapperObservable.doOnNext(doWork);
- }
- }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+
+ entityEventObservable.takeWhile( writeEvent -> !tracker.shouldStopProcessingEntities() ).skip( entityNumSkip )
+ .flatMap( writeEvent -> {
+ return Observable.just( writeEvent ).doOnNext( doWork );
+ }, 10 ).reduce( 0, heartbeatReducer ).toBlocking().last();
+
jp.close();
@@ -979,17 +973,11 @@ public class ImportServiceImpl implements ImportService {
final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
// with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
- final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
- @Override
- public Boolean call( final WriteEvent writeEvent ) {
- return !tracker.shouldStopProcessingConnections();
- }
- } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
- @Override
- public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- return entityWrapperObservable.doOnNext(doWork);
- }
- }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+ final int connectionCount = otherEventObservable.takeWhile(
+ writeEvent -> !tracker.shouldStopProcessingConnections() ).skip(connectionNumSkip).flatMap( entityWrapper ->{
+ return Observable.just(entityWrapper).doOnNext( doWork ).subscribeOn( Schedulers.io() );
+
+ }, 10 ).reduce(0, heartbeatReducer).toBlocking().last();
jp.close();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 5b1a6b3..b183daa 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -110,84 +110,81 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
final UUID appId = em.getApplication().getUuid();
final Map<String,Object> payloads = notification.getPayloads();
- final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
- @Override
- public Entity call(Entity entity) {
+ final Func1<Entity,Entity> entityListFunct = entity -> {
- try {
+ try {
- long now = System.currentTimeMillis();
- List<EntityRef> devicesRef = getDevices(entity); // resolve group
+ long now = System.currentTimeMillis();
+ List<EntityRef> devicesRef = getDevices(entity); // resolve group
- LOG.info("notification {} queue {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
+ LOG.info("notification {} queue {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
- for (EntityRef deviceRef : devicesRef) {
- LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
- long hash = MurmurHash.hash(deviceRef.getUuid());
- if (sketch.estimateCount(hash) > 0) { //look for duplicates
- LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
- continue;
- } else {
- sketch.add(hash, 1);
- }
- String notifierId = null;
- String notifierKey = null;
-
- //find the device notifier info, match it to the payload
- for (Map.Entry<String, Object> entry : payloads.entrySet()) {
- ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
- now = System.currentTimeMillis();
- String providerId = getProviderId(deviceRef, adapter.getNotifier());
- if (providerId != null) {
- notifierId = providerId;
- notifierKey = entry.getKey().toLowerCase();
- break;
- }
- LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
- }
+ for (EntityRef deviceRef : devicesRef) {
+ LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
+ long hash = MurmurHash.hash(deviceRef.getUuid());
+ if (sketch.estimateCount(hash) > 0) { //look for duplicates
+ LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
+ continue;
+ } else {
+ sketch.add(hash, 1);
+ }
+ String notifierId = null;
+ String notifierKey = null;
- if (notifierId == null) {
- LOG.info("Notifier did not match for device {} ", deviceRef);
- continue;
+ //find the device notifier info, match it to the payload
+ for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+ ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
+ now = System.currentTimeMillis();
+ String providerId = getProviderId(deviceRef, adapter.getNotifier());
+ if (providerId != null) {
+ notifierId = providerId;
+ notifierKey = entry.getKey().toLowerCase();
+ break;
}
+ LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
+ }
- ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
- if (notification.getQueued() == null) {
- // update queued time
- now = System.currentTimeMillis();
- notification.setQueued(System.currentTimeMillis());
- LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
- }
+ if (notifierId == null) {
+ LOG.info("Notifier did not match for device {} ", deviceRef);
+ continue;
+ }
+
+ ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
+ if (notification.getQueued() == null) {
+ // update queued time
now = System.currentTimeMillis();
- qm.sendMessage(message);
- LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
- deviceCount.incrementAndGet();
- queueMeter.mark();
+ notification.setQueued(System.currentTimeMillis());
+ LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
}
- } catch (Exception deviceLoopException) {
- LOG.error("Failed to add devices", deviceLoopException);
- errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
+ now = System.currentTimeMillis();
+ qm.sendMessage(message);
+ LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
+ deviceCount.incrementAndGet();
+ queueMeter.mark();
}
- return entity;
+ } catch (Exception deviceLoopException) {
+ LOG.error("Failed to add devices", deviceLoopException);
+ errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
}
+ return entity;
};
long now = System.currentTimeMillis();
- Observable o = rx.Observable.create(new IteratorObservable<Entity>(iterator))
- .parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
- @Override
- public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) {
- return deviceObservable.map(entityListFunct);
- }
- }, Schedulers.io())
- .doOnError(new Action1<Throwable>() {
- @Override
- public void call(Throwable throwable) {
- LOG.error("Failed while writing", throwable);
- }
- });
- o.toBlocking().lastOrDefault(null);
- LOG.info("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+
+
+ //process up to 10 concurrently
+ Observable o = rx.Observable.create( new IteratorObservable<Entity>( iterator ) )
+
+ .flatMap( entity -> Observable.just( entity ).map( entityListFunct )
+ .doOnError( throwable -> {
+ LOG.error( "Failed while writing",
+ throwable );
+ } ).subscribeOn( Schedulers.io() )
+
+ , 10 );
+
+ o.toBlocking().lastOrDefault( null );
+ LOG.info( "notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
}
// update queued time
@@ -338,48 +335,39 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
return message;
}
};
- Observable o = rx.Observable.from(messages)
- .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
- @Override
- public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
- return messageObservable.map(func);
+
+ //from each queue message, process them in parallel up to 10 at a time
+ Observable o = rx.Observable.from( messages ).flatMap( queueMessage -> {
+
+
+ return Observable.just( queueMessage ).map( func ).buffer( messages.size() ).map( queueMessages -> {
+ //for gcm this will actually send notification
+ for ( ProviderAdapter providerAdapter : notifierMap.values() ) {
+ try {
+ providerAdapter.doneSendingNotifications();
}
- }, Schedulers.io())
- .buffer(messages.size())
- .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() {
- @Override
- public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
- //for gcm this will actually send notification
- for (ProviderAdapter providerAdapter : notifierMap.values()) {
- try {
- providerAdapter.doneSendingNotifications();
- } catch (Exception e) {
- LOG.error("providerAdapter.doneSendingNotifications: ", e);
- }
+ catch ( Exception e ) {
+ LOG.error( "providerAdapter.doneSendingNotifications: ", e );
+ }
+ }
+ //TODO: check if a notification is done and mark it
+ HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<>();
+ for ( ApplicationQueueMessage message : queueMessages ) {
+ if ( notifications.get( message.getNotificationId() ) == null ) {
+ try {
+ TaskManager taskManager = taskMap.get( message.getNotificationId() );
+ notifications.put( message.getNotificationId(), message );
+ taskManager.finishedBatch();
}
- //TODO: check if a notification is done and mark it
- HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<UUID, ApplicationQueueMessage>();
- for (ApplicationQueueMessage message : queueMessages) {
- if (notifications.get(message.getNotificationId()) == null) {
- try {
- TaskManager taskManager = taskMap.get(message.getNotificationId());
- notifications.put(message.getNotificationId(), message);
- taskManager.finishedBatch();
- } catch (Exception e) {
- LOG.error("Failed to finish batch", e);
- }
- }
-
+ catch ( Exception e ) {
+ LOG.error( "Failed to finish batch", e );
}
- return notifications;
- }
- })
- .doOnError(new Action1<Throwable>() {
- @Override
- public void call(Throwable throwable) {
- LOG.error("Failed while sending",throwable);
}
- });
+ }
+ return notifications;
+ } ).doOnError( throwable -> LOG.error( "Failed while sending", throwable ) );
+ }, 10 );
+
return o;
}
@@ -400,7 +388,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
* {"winphone":"mymessage","apple":"mymessage"}
* TODO: document this method better
*/
- private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> notifierMap) throws Exception {
+ private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter>
+ notifierMap) throws Exception {
Map<String, Object> translatedPayloads = new HashMap<String, Object>( payloads.size());
for (Map.Entry<String, Object> entry : payloads.entrySet()) {
String payloadKey = entry.getKey().toLowerCase();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
index e8c5ace..6d0419a 100644
--- a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.setup;
+import java.io.IOException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,7 +108,19 @@ public class ConcurrentProcessSingleton {
barrier.await( ONE_MINUTE );
logger.info( "Setup to complete" );
- lock.maybeReleaseLock();
+
+ Runtime.getRuntime().addShutdownHook( new Thread( ){
+ @Override // shutdown-hook body: this Thread is registered via Runtime.addShutdownHook
+ public void run() { // releases the lock from the hook instead of right after setup
+ try {
+ lock.maybeReleaseLock();
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException( "Unable to release lock", e ); // keep the IOException as the cause
+ }
+ }
+ });
+
}
catch ( Exception e ) {
throw new RuntimeException( "Unable to initialize system", e );