You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/20 17:51:47 UTC

[01/12] incubator-usergrid git commit: WIP Overwrite

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-480 f7e78f4a6 -> 336c22285


WIP Overwrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9a787238
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9a787238
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9a787238

Branch: refs/heads/USERGRID-480
Commit: 9a7872380b2953b3916197d2aacc65fa78867642
Parents: eb0c689
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 18 14:18:14 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 18 14:18:14 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/guice/IndexTestFig.java   |  57 ++++
 .../index/guice/TestIndexModule.java            |   4 +
 .../index/impl/CorePerformanceIT.java           | 339 -------------------
 .../index/impl/IndexLoadTestsIT.java            | 169 +++++++++
 4 files changed, 230 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
new file mode 100644
index 0000000..ecf3dfa
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.guice;
+
+
+import java.util.UUID;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Test configuration for creating documents
+ */
+@FigSingleton
+public interface IndexTestFig extends GuicyFig {
+
+    @Key( "stresstest.numWorkers" )
+    @Default( "16" )
+    public int getNumberOfWorkers();
+
+    @Key( "stresstest.numberofRecords" )
+    @Default( "10000" )
+    public int getNumberOfRecords();
+
+    @Key( "stresstest.bufferSize" )
+    @Default( "1000" )
+    public int getBufferSize();
+
+    @Key( "stresstest.validate.wait" )
+    @Default( "2000" )
+    public long getValidateWait();
+
+
+    @Key( "stresstest.applicationId" )
+    @Default( "0df46683-cdab-11e4-83c2-d2be4de3081a" )
+    public String getApplicationId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 50b994d..79a021a 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -19,9 +19,12 @@
 package org.apache.usergrid.persistence.index.guice;
 
 
+import org.safehaus.guicyfig.GuicyFigModule;
+
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.impl.BufferQueue;
 import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl;
 import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
@@ -36,5 +39,6 @@ public class TestIndexModule extends TestModule {
         // configure collections and our core astyanax framework
         install( new CollectionModule() );
         install( new IndexModule()  );
+        install( new GuicyFigModule(IndexTestFig.class) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
deleted file mode 100644
index c1bfe38..0000000
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang3.math.NumberUtils;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.index.guice.TestIndexModule;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-import org.apache.usergrid.persistence.index.query.EntityResults;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.persistence.model.field.DoubleField;
-import org.apache.usergrid.persistence.model.field.LongField;
-import org.apache.usergrid.persistence.model.field.StringField;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-
-
-/**
- * TODO: make CorePerformanceIT configurable, add CHOP markup.
- */
-public class CorePerformanceIT extends BaseIT {
-    private static final Logger log = LoggerFactory.getLogger(CorePerformanceIT.class);
-
-    @ClassRule
-    public static ElasticSearchResource es = new ElasticSearchResource();
-
-    // max entities we will write and read
-    static int maxEntities = 10; // TODO: make this configurable when you add Chop
-
-    // each app will get all data
-    static int appCount = 10;
-
-    // number of threads = orgCount x appCount
-
-    // total number of records = orgCount x appCount x numRecords
-
-    static EntityCollectionManagerFactory ecmf;
-    static EntityIndexFactory ecif ;
-
-
-    @Ignore("Relies on finefoods.txt which must be downloaded separately")
-    @Test
-    public void loadAndReadData() throws IOException, InterruptedException {
-
-        Injector injector = Guice.createInjector( new TestIndexModule() );
-
-        // only on first run
-        //MigrationManager m = injector.getInstance( MigrationManager.class )
-        //m.migrate()
-
-        ecmf = injector.getInstance( EntityCollectionManagerFactory.class );
-        ecif = injector.getInstance( EntityIndexFactory.class );
-
-        final ApplicationScope scope = new ApplicationScopeImpl( new SimpleId( "application" ) );
-
-        log.info("Start Data Load");
-
-        List<IndexScope> scopes = loadData(scope);
-
-        log.info("Finish Data Load");
-
-        log.info("Start Data Read");
-
-
-        readData( scope, scopes );
-        log.info("Finish Data Read");
-
-        runSelectedQueries( scope, scopes );
-
-    }
-
-
-    private List<IndexScope> loadData(final ApplicationScope applicationScope) throws InterruptedException {
-
-        long time = new Date().getTime();
-
-        List<IndexScope> scopes = new ArrayList<IndexScope>();
-        List<Thread> threads = new ArrayList<Thread>();
-
-
-        for ( int j = 0; j < appCount; j++ ) {
-
-            String appName = "app-" + j + "-" + time;
-            Id appId = new SimpleId( appName );
-            IndexScope indexScope = new IndexScopeImpl( appId, "reviews");
-            scopes.add( indexScope );
-
-            Thread t = new Thread( new DataLoader( applicationScope, indexScope ) );
-            t.start();
-            threads.add( t );
-        }
-
-        // wait for indexing to end
-        for ( Thread t : threads ) {
-            t.join();
-        }
-
-        return scopes;
-    }
-
-
-    private void readData(final ApplicationScope applicationScope,  List<IndexScope> scopes ) throws InterruptedException {
-
-        List<Thread> threads = new ArrayList<Thread>();
-        for ( IndexScope scope : scopes ) {
-
-            Thread t = new Thread( new DataReader( applicationScope, scope ));
-            t.start();
-            threads.add(t);
-        }
-
-        // wait for reading to end
-        for ( Thread t : threads ) {
-            t.join();
-        }
-    }
-
-
-    static class DataReader implements Runnable {
-        final ApplicationScope scope;
-       final  IndexScope indexScope;
-
-        public DataReader( final ApplicationScope scope, IndexScope indexScope ) {
-            this.scope = scope;
-            this.indexScope = indexScope;
-        }
-
-        public void run() {
-
-            EntityIndex eci =   ecif.createEntityIndex( scope);
-            EntityCollectionManager ecm = ecmf.createCollectionManager( new CollectionScopeImpl( scope.getApplication(), indexScope.getOwner(), indexScope.getName() ) );
-
-            Query query = Query.fromQL( "review_score > 0"); // get all reviews;
-            query.withLimit( maxEntities < 1000 ? maxEntities : 1000 );
-
-            final SearchTypes searchType = SearchTypes.fromTypes( "review" );
-
-            CandidateResults candidateResults = eci.search(indexScope, searchType, query );
-            int count = candidateResults.size();
-
-            while ( candidateResults.hasCursor() && count < maxEntities ) {
-                query.setCursor( candidateResults.getCursor() )   ;
-                candidateResults = eci.search(indexScope, searchType,  query );
-                count += candidateResults.size();
-
-                //cause retrieval from cassandra
-                EntityResults entityResults = new EntityResults(
-                    candidateResults, ecm, UUIDGenerator.newTimeUUID() );
-
-                while(entityResults.hasNext()){
-                    entityResults.next();
-                }
-
-                log.info("Read {} reviews in {} / {} ", new Object[] {
-                    count, indexScope.getOwner(), indexScope.getName() } );
-            }
-        }
-    }
-
-
-    static class DataLoader implements Runnable {
-        final ApplicationScope applicationScope;
-        final IndexScope indexScope;
-
-        public DataLoader( final ApplicationScope applicationScope, IndexScope indexScope ) {
-            this.applicationScope = applicationScope;
-            this.indexScope = indexScope;
-        }
-
-        public void run() {
-
-            CollectionScope collectionScope = new CollectionScopeImpl(
-                    applicationScope.getApplication(), indexScope.getOwner(), indexScope.getName() );
-            EntityCollectionManager ecm = ecmf.createCollectionManager(collectionScope );
-            EntityIndex eci = ecif.createEntityIndex(applicationScope );
-
-            FileReader fr;
-            try {
-                fr = new FileReader("../../resources/finefoods.txt");
-            } catch (FileNotFoundException ex) {
-                throw new RuntimeException("Error opening file", ex);
-            }
-            BufferedReader br = new BufferedReader(fr);
-            String s = null;
-
-            // create the first entry
-            Entity current = new Entity(
-                new SimpleId(UUIDGenerator.newTimeUUID(), "review"));
-
-//            Id orgId = orgAppScope.scope.getApplication();
-//            Id appId = orgAppScope.scope.getOwner();
-
-            int count = 0;
-
-            EntityIndexBatch entityIndexBatch = eci.createBatch();
-
-            try {
-                while ( (s = br.readLine()) != null && count < maxEntities ) {
-
-                    try {
-
-                        if ( s.trim().equals("")) { // then we are at end of a record
-
-                            // write and index current entity
-                            ecm.write( current ).toBlocking().last();
-
-                            entityIndexBatch.index(indexScope, current  );
-
-                            if ( maxEntities < 20 ) {
-                                log.info("Index written for {}", current.getId());
-                                log.info("---");
-                            }
-
-                            // create the next entity
-                            current = new Entity(
-                                    new SimpleId(UUIDGenerator.newTimeUUID(), "review"));
-
-                            count++;
-                            if(count % 1000 == 0){
-                                entityIndexBatch.execute().get();
-                            }
-
-                            if (count % 100000 == 0) {
-                                log.info("Indexed {} reviews in {} / {} ",
-                                    new Object[] {
-                                        count,
-                                            applicationScope,
-                                        indexScope.getOwner() } );
-                            }
-                            continue;
-                        }
-
-                        // process a field
-                        String name = s.substring( 0, s.indexOf(":")).replace("/", "_").toLowerCase() ;
-                        String value = s.substring( s.indexOf(":") + 1 ).trim();
-
-                        if ( maxEntities < 20 ) {
-                            log.info("Indexing {} = {}", name, value);
-                        }
-
-                        if ( NumberUtils.isNumber(value) && value.contains(".")) {
-                            current.setField( new DoubleField( name, Double.parseDouble(value)));
-
-                        } else if ( NumberUtils.isNumber(value) ) {
-                            current.setField( new LongField( name, Long.parseLong(value)));
-
-                        } else {
-                            current.setField( new StringField( name, value.toString() ));
-                        }
-
-                    } catch ( Exception e ) {
-                        log.info("Error on line " + count);
-                    }
-                }
-
-            } catch (IOException ex) {
-                throw new RuntimeException("Error reading file", ex);
-            }
-
-            eci.refresh();
-        }
-    }
-
-
-    public void runSelectedQueries(final ApplicationScope scope,  List<IndexScope> indexScopes ) {
-
-        for ( IndexScope indexScope : indexScopes ) {
-            EntityIndex eci = ecif.createEntityIndex(scope );
-
-            // TODO: come up with more and more complex queries for CorePerformanceIT
-
-            query(indexScope, eci, "product_productid = 'B006K2ZZ7K'") ;
-            query(indexScope, eci, "review_profilename = 'Twoapennything'") ;
-            query(indexScope, eci, "review_profilename contains 'Natalia'") ;
-            query(indexScope, eci, "review_profilename contains 'Patrick'") ;
-            query(indexScope, eci, "review_time = 1342051200") ;
-            query(indexScope, eci, "review_time > 1342051200") ;
-            query(indexScope, eci, "review_score > 0");
-            query(indexScope, eci, "review_score > 2");
-            query(indexScope, eci, "review_score > 3");
-            query(indexScope, eci, "review_score > 4");
-            query(indexScope, eci, "review_score > 5");
-        }
-    }
-
-    public static void query(final IndexScope indexScope, final EntityIndex eci, final String query ) {;
-        Query q = Query.fromQL(query) ;
-//        CandidateResults candidateResults = eci.search(indexScope,  q );  TODO FIXME
-//        log.info("size = {} returned from query {}", candidateResults.size(), q.getQl() );
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
new file mode 100644
index 0000000..c962d6b
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.guice.IndexTestFig;
+import org.apache.usergrid.persistence.index.guice.TestIndexModule;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.IntegerField;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Action2;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * TODO: make IndexLoadTestsIT configurable, add CHOP markup.
+ */
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
+public class IndexLoadTestsIT extends BaseIT {
+    private static final Logger log = LoggerFactory.getLogger( IndexLoadTestsIT.class );
+
+    @ClassRule
+    public static ElasticSearchResource es = new ElasticSearchResource();
+
+
+    @Inject
+    public IndexTestFig indexTestFig;
+
+    @Inject
+    public EntityIndexFactory entityIndexFactory;
+
+    @Test
+    public void testHeavyLoad(){
+
+        final UUID applicationUUID = UUID.fromString( indexTestFig.getApplicationId() );
+
+        final Id applicationId = new SimpleId(applicationUUID, "application");
+        final ApplicationScope scope = new ApplicationScopeImpl( applicationId  );
+
+        final EntityIndex index = entityIndexFactory.createEntityIndex( scope );
+
+        //create our index if it doesn't exist
+        index.initializeIndex();
+
+        final Observable<Entity> createEntities = createStreamFromWorkers( index, applicationId );
+
+        //run them all
+        createEntities.toBlocking().last();
+
+
+
+
+    }
+
+    public Observable<Entity> createStreamFromWorkers(final EntityIndex entityIndex, final Id ownerId){
+
+        //create a sequence of observables.  Each index will be its own worker thread using Schedulers.newThread()
+     return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).parallel( new Func1<Observable<Integer>, Observable<Entity>>() {
+
+
+          @Override
+          public Observable<Entity> call( final Observable<Integer> integerObservable ) {
+             return integerObservable.flatMap( new Func1<Integer, Observable<Entity>>() {
+                  @Override
+                  public Observable<Entity> call( final Integer integer ) {
+                      return createWriteObservable( entityIndex, ownerId, integer );
+                  }
+              } );
+
+          }
+      }, Schedulers.newThread() );
+    }
+
+
+    private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final Id ownerId, final int workerIndex){
+
+
+        final IndexScope scope = new IndexScopeImpl( ownerId, "test" );
+
+
+
+       return  Observable.range( 0, indexTestFig.getNumberOfRecords() )
+
+            //create our entity
+                  .map( new Func1<Integer, Entity>() {
+            @Override
+            public Entity call( final Integer integer ) {
+                final Entity entity = new Entity("test");
+
+                entity.setField( new IntegerField("workerIndex", workerIndex));
+                entity.setField( new IntegerField( "ordinal", integer ) );
+
+                return entity;
+            }
+        } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>() {
+            @Override
+            public void call( final List<Entity> entities ) {
+                //take our entities and roll them into a batch
+                  Observable.from( entities ).collect( entityIndex.createBatch(), new Action2<EntityIndexBatch, Entity>() {
+
+
+                    @Override
+                    public void call( final EntityIndexBatch entityIndexBatch, final Entity entity ) {
+                        entityIndexBatch.index(scope, entity  );
+                    }
+                } ).doOnNext( new Action1<EntityIndexBatch>() {
+                    @Override
+                    public void call( final EntityIndexBatch entityIndexBatch ) {
+                        entityIndexBatch.execute();
+                    }
+                } ).toBlocking().last();
+            }
+        } )
+
+            //translate back into a stream of entities for the caller to use
+           .flatMap( new Func1<List<Entity>, Observable<Entity>>() {
+            @Override
+            public Observable<Entity> call( final List<Entity> entities ) {
+                return Observable.from( entities );
+            }
+        } );
+
+    }
+
+
+
+
+}


[08/12] incubator-usergrid git commit: Fixes rebuild bug

Posted by sf...@apache.org.
Fixes rebuild bug


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5889a3b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5889a3b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5889a3b4

Branch: refs/heads/USERGRID-480
Commit: 5889a3b4d1ae7afe450a76594f3de8f9ba300f8c
Parents: c7e3459
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 19 19:53:55 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 19 19:53:55 2015 -0600

----------------------------------------------------------------------
 .../java/org/apache/usergrid/corepersistence/CpWalker.java   | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5889a3b4/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index 332d5a8..c14447d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -101,10 +101,8 @@ public class CpWalker {
             edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
         }
 
-        Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
-            new SimpleSearchEdgeType( applicationId, edgeType, null ) );
-
-        edgeTypes.flatMap( emittedEdgeType -> {
+        Observable<Edge> edges = gm.getEdgeTypesFromSource(
+                    new SimpleSearchEdgeType( applicationId, edgeType, null ) ).flatMap( emittedEdgeType -> {
 
             logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId );
 
@@ -136,6 +134,6 @@ public class CpWalker {
         }, 100 );
 
         // wait for it to complete
-        edgeTypes.toBlocking().lastOrDefault( null ); // end foreach on edges
+        edges.toBlocking().lastOrDefault( null ); // end foreach on edges
     }
 }


[04/12] incubator-usergrid git commit: First pass at upgrading to java 8 and latest RX java

Posted by sf...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 26d06ad..ef258f4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -188,7 +188,7 @@ public class GraphManagerImpl implements GraphManager {
         final Timer.Context timer = writeEdgeTimer.time();
         final Meter meter = writeEdgeMeter;
 
-        return Observable.from( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
+        return Observable.just( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
             @Override
             public Edge call( final MarkedEdge edge ) {
 
@@ -234,7 +234,7 @@ public class GraphManagerImpl implements GraphManager {
 
         final Timer.Context timer = deleteEdgeTimer.time();
         final Meter meter = deleteEdgeMeter;
-        return Observable.from(markedEdge).map(new Func1<MarkedEdge, Edge>() {
+        return Observable.just(markedEdge).map(new Func1<MarkedEdge, Edge>() {
             @Override
             public Edge call(final MarkedEdge edge) {
 
@@ -281,7 +281,7 @@ public class GraphManagerImpl implements GraphManager {
     public Observable<Id> deleteNode( final Id node, final long timestamp ) {
         final Timer.Context timer = deleteNodeTimer.time();
         final Meter meter = deleteNodeMeter;
-        return Observable.from( node ).map( new Func1<Id, Id>() {
+        return Observable.just( node ).map( new Func1<Id, Id>() {
             @Override
             public Id call( final Id id ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index ab141f7..bfaeaaa 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -176,6 +176,8 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                          * Sum up the total number of edges we had, then execute the mutation if we have
                          * anything to do
                          */
+
+
                         return MathObservable.sumInteger( Observable.merge( checks ) )
                                              .doOnNext( new Action1<Integer>() {
                                                             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index e8c224e..6236a16 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -103,7 +103,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
     public Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) {
 
 
-        return Observable.from( node )
+        return Observable.just( node )
 
                 //delete source and targets in parallel and merge them into a single observable
                 .flatMap( new Func1<Id, Observable<Integer>>() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
index 2d9b47f..ecb9a9b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -75,61 +76,49 @@ public class EdgeDataMigrationImpl implements DataMigration<GraphNode> {
     }
 
 
-
-
     @Override
-       public int migrate( final int currentVersion, final MigrationDataProvider<GraphNode> migrationDataProvider,
-                           final ProgressObserver observer ) {
+    public int migrate( final int currentVersion, final MigrationDataProvider<GraphNode> migrationDataProvider,
+                        final ProgressObserver observer ) {
 
         final AtomicLong counter = new AtomicLong();
 
-        final MigrationRelationship<EdgeMetadataSerialization>
-                migration = allVersions.getMigrationRelationship( currentVersion );
-
-       final Observable<List<Edge>> observable =  migrationDataProvider.getData().flatMap( new Func1<GraphNode,
-           Observable<List<Edge>>>() {
-            @Override
-            public Observable<List<Edge>> call( final GraphNode graphNode ) {
-                final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
-
-                //get edges from the source
-                return edgesFromSourceObservable.edgesFromSource( gm, graphNode.entryNode ).buffer( 1000 ).parallel( new Func1<Observable<List<Edge>>, Observable<List<Edge>>>() {
-                                                  @Override
-                                                  public Observable<List<Edge>> call( final Observable<List<Edge>> listObservable ) {
-                          return listObservable.doOnNext( new Action1<List<Edge>>() {
-                              @Override
-                              public void call( List<Edge> edges ) {
-                                  final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                  for ( Edge edge : edges ) {
-                                      logger.info( "Migrating meta for edge {}", edge );
-                                      final MutationBatch edgeBatch =
-                                              migration.to.writeEdge(  graphNode.applicationScope, edge );
-                                      batch.mergeShallow( edgeBatch );
-                                  }
-
-                                  try {
-                                      batch.execute();
-                                  }
-                                  catch ( ConnectionException e ) {
-                                      throw new RuntimeException( "Unable to perform migration", e );
-                                  }
-
-                                  //update the observer so the admin can see it
-                                  final long newCount = counter.addAndGet( edges.size() );
-
-                                  observer.update( migration.to.getImplementationVersion(),
-                                          String.format( "Currently running.  Rewritten %d edge types",
-                                                  newCount ) );
-                              }
-                          } );
-                  } } );
-            }} );
-
-        observable.longCount().toBlocking().last();
+        final MigrationRelationship<EdgeMetadataSerialization> migration =
+            allVersions.getMigrationRelationship( currentVersion );
 
-        return migration.to.getImplementationVersion();
+        final Observable<List<Edge>> observable = migrationDataProvider.getData().flatMap( graphNode -> {
+            final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
+
+            //get edges from the source
+            return edgesFromSourceObservable.edgesFromSource( gm, graphNode.entryNode ).buffer( 1000 )
+                                            .doOnNext( edges -> {
+                                                    final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                                    for ( Edge edge : edges ) {
+                                                        logger.info( "Migrating meta for edge {}", edge );
+                                                        final MutationBatch edgeBatch =
+                                                            migration.to.writeEdge( graphNode.applicationScope, edge );
+                                                        batch.mergeShallow( edgeBatch );
+                                                    }
 
+                                                    try {
+                                                        batch.execute();
+                                                    }
+                                                    catch ( ConnectionException e ) {
+                                                        throw new RuntimeException( "Unable to perform migration", e );
+                                                    }
+
+                                                    //update the observer so the admin can see it
+                                                    final long newCount = counter.addAndGet( edges.size() );
+
+                                                    observer.update( migration.to.getImplementationVersion(), String
+                                                        .format( "Currently running.  Rewritten %d edge types",
+                                                            newCount ) );
+                                                } ).subscribeOn( Schedulers.io() );
+        }, 10 );
+
+        observable.countLong().toBlocking().last();
+
+        return migration.to.getImplementationVersion();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 6d30d22..3bbf3e4 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -516,7 +516,7 @@ public class GraphManagerShardConsistencyIT {
                                                             }
                                                         } )
 
-                                                        .longCount().toBlocking().last();
+                                                        .countLong().toBlocking().last();
 
 
 //                if(returnedEdgeCount != count[0]-duplicate[0]){

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
index 0a27a6b..7b3fafd 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
@@ -63,23 +63,23 @@ public class SimpleTest {
 
 
         Edge testTargetEdge = createEdge( sourceId1, "test", targetId1, System.currentTimeMillis() );
-        gm.writeEdge( testTargetEdge ).toBlockingObservable().singleOrDefault( null );
+        gm.writeEdge( testTargetEdge ).toBlocking().singleOrDefault( null );
 
         Edge testTarget2Edge = createEdge( sourceId2, "edgeType1", targetId1, System.currentTimeMillis() );
-        gm.writeEdge( testTarget2Edge ).toBlockingObservable().singleOrDefault( null );
+        gm.writeEdge( testTarget2Edge ).toBlocking().singleOrDefault( null );
 
         Edge test2TargetEdge = createEdge( sourceId1, "edgeType1", targetId1, System.currentTimeMillis() );
-        gm.writeEdge( test2TargetEdge ).toBlockingObservable().singleOrDefault( null );
+        gm.writeEdge( test2TargetEdge ).toBlocking().singleOrDefault( null );
 
         Edge test3TargetEdge = createEdge( sourceId1, "edgeType2", targetId1, System.currentTimeMillis() );
-        gm.writeEdge( test3TargetEdge ).toBlockingObservable().singleOrDefault( null );
+        gm.writeEdge( test3TargetEdge ).toBlocking().singleOrDefault( null );
 
         int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(targetId1, null, null) )
-                .count().toBlockingObservable().last();
+                .count().toBlocking().last();
         assertEquals( 3, count );
 
         count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(targetId1, "edgeType", null) )
-                .count().toBlockingObservable().last();
+                .count().toBlocking().last();
         assertEquals( 2, count );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
index a269c15..049c3d2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
@@ -121,7 +121,7 @@ public class EdgeDataMigrationImplTest implements DataMigrationResetRule.DataMig
 
 
         //walk from s1 and s2
-        final Observable<GraphNode> graphNodes = Observable.from( new GraphNode( applicationScope, sourceId1), new GraphNode(applicationScope, sourceId2 ) );
+        final Observable<GraphNode> graphNodes = Observable.just( new GraphNode( applicationScope, sourceId1), new GraphNode(applicationScope, sourceId2 ) );
 
         final MigrationDataProvider<GraphNode> testMigrationProvider = new MigrationDataProvider<GraphNode>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 9656e2d..3ec7852 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -47,8 +47,8 @@ limitations under the License.
 
     <properties>
 
-        <maven.compiler.source>1.7</maven.compiler.source>
-        <maven.compiler.target>1.7</maven.compiler.target>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
 
         <antlr.version>3.4</antlr.version>
         <archaius.version>0.5.12</archaius.version>
@@ -64,14 +64,14 @@ limitations under the License.
         <guava.version>18.0</guava.version>
         <guice.version>4.0-beta5</guice.version>
         <guicyfig.version>3.2</guicyfig.version>
-        <hystrix.version>1.3.16</hystrix.version>
+        <hystrix.version>1.4.0</hystrix.version>
         <jackson-2-version>2.4.1</jackson-2-version>
         <jackson-smile.verson>2.4.3</jackson-smile.verson>
         <mockito.version>1.10.8</mockito.version>
         <junit.version>4.11</junit.version>
         <kryo-serializers.version>0.26</kryo-serializers.version>
         <log4j.version>1.2.17</log4j.version>
-        <rx.version>0.19.6</rx.version>
+        <rx.version>1.0.8</rx.version>
         <slf4j.version>1.7.2</slf4j.version>
         <surefire.version>2.16</surefire.version>
         <aws.version>1.9.0</aws.version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
index c962d6b..82af950 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -32,7 +33,6 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.guice.IndexTestFig;
@@ -41,13 +41,11 @@ import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.IntegerField;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
 
 import rx.Observable;
 import rx.functions.Action1;
-import rx.functions.Action2;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -57,6 +55,7 @@ import rx.schedulers.Schedulers;
  */
 @RunWith( EsRunner.class )
 @UseModules( { TestIndexModule.class } )
+@Ignore( "Should only be run during load tests of elasticsearch" )
 public class IndexLoadTestsIT extends BaseIT {
     private static final Logger log = LoggerFactory.getLogger( IndexLoadTestsIT.class );
 
@@ -70,13 +69,14 @@ public class IndexLoadTestsIT extends BaseIT {
     @Inject
     public EntityIndexFactory entityIndexFactory;
 
+
     @Test
-    public void testHeavyLoad(){
+    public void testHeavyLoad() {
 
         final UUID applicationUUID = UUID.fromString( indexTestFig.getApplicationId() );
 
-        final Id applicationId = new SimpleId(applicationUUID, "application");
-        final ApplicationScope scope = new ApplicationScopeImpl( applicationId  );
+        final Id applicationId = new SimpleId( applicationUUID, "application" );
+        final ApplicationScope scope = new ApplicationScopeImpl( applicationId );
 
         final EntityIndex index = entityIndexFactory.createEntityIndex( scope );
 
@@ -87,83 +87,52 @@ public class IndexLoadTestsIT extends BaseIT {
 
         //run them all
         createEntities.toBlocking().last();
-
-
-
-
     }
 
-    public Observable<Entity> createStreamFromWorkers(final EntityIndex entityIndex, final Id ownerId){
-
-        //create a sequence of observables.  Each index will be it's own worker thread using the Schedulers.newthread()
-     return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).parallel( new Func1<Observable<Integer>, Observable<Entity>>() {
-
 
-          @Override
-          public Observable<Entity> call( final Observable<Integer> integerObservable ) {
-             return integerObservable.flatMap( new Func1<Integer, Observable<Entity>>() {
-                  @Override
-                  public Observable<Entity> call( final Integer integer ) {
-                      return createWriteObservable( entityIndex, ownerId, integer );
-                  }
-              } );
+    public Observable<Entity> createStreamFromWorkers( final EntityIndex entityIndex, final Id ownerId ) {
 
-          }
-      }, Schedulers.newThread() );
+        //create a sequence of observables.  Each index will be its own worker thread using Schedulers.newThread()
+        return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).flatMap(
+            integer -> createWriteObservable( entityIndex, ownerId, integer ).subscribeOn( Schedulers.newThread() ) );
     }
 
 
-    private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final Id ownerId, final int workerIndex){
+    private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final Id ownerId,
+                                                      final int workerIndex ) {
 
 
         final IndexScope scope = new IndexScopeImpl( ownerId, "test" );
 
 
-
-       return  Observable.range( 0, indexTestFig.getNumberOfRecords() )
+        return Observable.range( 0, indexTestFig.getNumberOfRecords() )
 
             //create our entity
-                  .map( new Func1<Integer, Entity>() {
-            @Override
-            public Entity call( final Integer integer ) {
-                final Entity entity = new Entity("test");
-
-                entity.setField( new IntegerField("workerIndex", workerIndex));
-                entity.setField( new IntegerField( "ordinal", integer ) );
-
-                return entity;
-            }
-        } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>() {
-            @Override
-            public void call( final List<Entity> entities ) {
-                //take our entities and roll them into a batch
-                  Observable.from( entities ).collect( entityIndex.createBatch(), new Action2<EntityIndexBatch, Entity>() {
-
-
-                    @Override
-                    public void call( final EntityIndexBatch entityIndexBatch, final Entity entity ) {
-                        entityIndexBatch.index(scope, entity  );
-                    }
-                } ).doOnNext( new Action1<EntityIndexBatch>() {
-                    @Override
-                    public void call( final EntityIndexBatch entityIndexBatch ) {
+            .map( new Func1<Integer, Entity>() {
+                @Override
+                public Entity call( final Integer integer ) {
+                    final Entity entity = new Entity( "test" );
+
+                    entity.setField( new IntegerField( "workerIndex", workerIndex ) );
+                    entity.setField( new IntegerField( "ordinal", integer ) );
+
+                    return entity;
+                }
+            } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>() {
+                @Override
+                public void call( final List<Entity> entities ) {
+                    //take our entities and roll them into a batch
+                    Observable.from( entities )
+                              .collect( () -> entityIndex.createBatch(), ( entityIndexBatch, entity ) -> {
+
+                                  entityIndexBatch.index( scope, entity );
+                              } ).doOnNext( entityIndexBatch -> {
                         entityIndexBatch.execute();
-                    }
-                } ).toBlocking().last();
-            }
-        } )
-
-            //translate back into a stream of entities for the caller to use
-           .flatMap( new Func1<List<Entity>, Observable<Entity>>() {
-            @Override
-            public Observable<Entity> call( final List<Entity> entities ) {
-                return Observable.from( entities );
-            }
-        } );
+                    } ).toBlocking().last();
+                }
+            } )
 
+                //translate back into a stream of entities for the caller to use
+            .flatMap(entities -> Observable.from( entities ) );
     }
-
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index f24917a..efbda2d 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -125,7 +125,7 @@
       <usergrid.it.threads>8</usergrid.it.threads>
 
       <metrics.version>3.0.0</metrics.version>
-      <rx.version>0.19.6</rx.version>
+      <rx.version>1.0.8</rx.version>
         <surefire.plugin.artifactName>surefire-junit47</surefire.plugin.artifactName>
       <surefire.plugin.version>2.18.1</surefire.plugin.version>
       <powermock.version>1.6.1</powermock.version>
@@ -1560,8 +1560,8 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
           <optimize>true</optimize>
           <showDeprecation>true</showDeprecation>
           <debug>true</debug>
@@ -1583,7 +1583,7 @@
             <configuration>
               <rules>
                 <requireJavaVersion>
-                  <version>1.7.0</version>
+                  <version>1.8.0</version>
                 </requireJavaVersion>
                 <requireMavenVersion>
                   <version>[3.0,)</version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 4f849e0..bebd557 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -937,18 +937,12 @@ public class ImportServiceImpl implements ImportService {
         // potentially skip the first n if this is a resume operation
         final int entityNumSkip = (int)tracker.getTotalEntityCount();
 
-        // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
-       final int entityCount =  entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
-            @Override
-            public Boolean call( final WriteEvent writeEvent ) {
-                return !tracker.shouldStopProcessingEntities();
-            }
-        } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
-            @Override
-            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
-                return entityWrapperObservable.doOnNext(doWork);
-            }
-        }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+
+        entityEventObservable.takeWhile( writeEvent -> !tracker.shouldStopProcessingEntities() ).skip( entityNumSkip )
+            .flatMap( writeEvent -> {
+                return Observable.just( writeEvent ).doOnNext( doWork );
+            }, 10 ).reduce( 0, heartbeatReducer ).toBlocking().last();
+
 
         jp.close();
 
@@ -979,17 +973,11 @@ public class ImportServiceImpl implements ImportService {
         final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
 
         // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
-        final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
-            @Override
-            public Boolean call( final WriteEvent writeEvent ) {
-                return !tracker.shouldStopProcessingConnections();
-            }
-        } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
-            @Override
-            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
-                return entityWrapperObservable.doOnNext(doWork);
-            }
-        }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+        final int connectionCount = otherEventObservable.takeWhile(
+            writeEvent -> !tracker.shouldStopProcessingConnections() ).skip(connectionNumSkip).flatMap( entityWrapper ->{
+                return Observable.just(entityWrapper).doOnNext( doWork ).subscribeOn( Schedulers.io() );
+
+        }, 10 ).reduce(0, heartbeatReducer).toBlocking().last();
 
         jp.close();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 5b1a6b3..b183daa 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -110,84 +110,81 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             final UUID appId = em.getApplication().getUuid();
             final Map<String,Object> payloads = notification.getPayloads();
 
-            final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
-                @Override
-                public Entity call(Entity entity) {
+            final Func1<Entity,Entity> entityListFunct = entity -> {
 
-                    try {
+                try {
 
-                        long now = System.currentTimeMillis();
-                        List<EntityRef> devicesRef = getDevices(entity); // resolve group
+                    long now = System.currentTimeMillis();
+                    List<EntityRef> devicesRef = getDevices(entity); // resolve group
 
-                        LOG.info("notification {} queue  {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
+                    LOG.info("notification {} queue  {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
 
-                        for (EntityRef deviceRef : devicesRef) {
-                            LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
-                            long hash = MurmurHash.hash(deviceRef.getUuid());
-                            if (sketch.estimateCount(hash) > 0) { //look for duplicates
-                                LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
-                                continue;
-                            } else {
-                                sketch.add(hash, 1);
-                            }
-                            String notifierId = null;
-                            String notifierKey = null;
-
-                            //find the device notifier info, match it to the payload
-                            for (Map.Entry<String, Object> entry : payloads.entrySet()) {
-                                ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
-                                now = System.currentTimeMillis();
-                                String providerId = getProviderId(deviceRef, adapter.getNotifier());
-                                if (providerId != null) {
-                                    notifierId = providerId;
-                                    notifierKey = entry.getKey().toLowerCase();
-                                    break;
-                                }
-                                LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
-                            }
+                    for (EntityRef deviceRef : devicesRef) {
+                        LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
+                        long hash = MurmurHash.hash(deviceRef.getUuid());
+                        if (sketch.estimateCount(hash) > 0) { //look for duplicates
+                            LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
+                            continue;
+                        } else {
+                            sketch.add(hash, 1);
+                        }
+                        String notifierId = null;
+                        String notifierKey = null;
 
-                            if (notifierId == null) {
-                                LOG.info("Notifier did not match for device {} ", deviceRef);
-                                continue;
+                        //find the device notifier info, match it to the payload
+                        for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+                            ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
+                            now = System.currentTimeMillis();
+                            String providerId = getProviderId(deviceRef, adapter.getNotifier());
+                            if (providerId != null) {
+                                notifierId = providerId;
+                                notifierKey = entry.getKey().toLowerCase();
+                                break;
                             }
+                            LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
+                        }
 
-                            ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
-                            if (notification.getQueued() == null) {
-                                // update queued time
-                                now = System.currentTimeMillis();
-                                notification.setQueued(System.currentTimeMillis());
-                                LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
-                            }
+                        if (notifierId == null) {
+                            LOG.info("Notifier did not match for device {} ", deviceRef);
+                            continue;
+                        }
+
+                        ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
+                        if (notification.getQueued() == null) {
+                            // update queued time
                             now = System.currentTimeMillis();
-                            qm.sendMessage(message);
-                            LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
-                            deviceCount.incrementAndGet();
-                            queueMeter.mark();
+                            notification.setQueued(System.currentTimeMillis());
+                            LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
                         }
-                    } catch (Exception deviceLoopException) {
-                        LOG.error("Failed to add devices", deviceLoopException);
-                        errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
+                        now = System.currentTimeMillis();
+                        qm.sendMessage(message);
+                        LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
+                        deviceCount.incrementAndGet();
+                        queueMeter.mark();
                     }
-                    return entity;
+                } catch (Exception deviceLoopException) {
+                    LOG.error("Failed to add devices", deviceLoopException);
+                    errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
                 }
+                return entity;
             };
 
             long now = System.currentTimeMillis();
-            Observable o = rx.Observable.create(new IteratorObservable<Entity>(iterator))
-                    .parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
-                        @Override
-                        public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) {
-                            return deviceObservable.map(entityListFunct);
-                        }
-                    }, Schedulers.io())
-                    .doOnError(new Action1<Throwable>() {
-                        @Override
-                        public void call(Throwable throwable) {
-                            LOG.error("Failed while writing", throwable);
-                        }
-                    });
-            o.toBlocking().lastOrDefault(null);
-            LOG.info("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+
+
+            //process up to 10 concurrently
+            Observable o = rx.Observable.create( new IteratorObservable<Entity>( iterator ) )
+
+                                        .flatMap( entity -> Observable.just( entity ).map( entityListFunct )
+                                                                      .doOnError( throwable -> {
+                                                                          LOG.error( "Failed while writing",
+                                                                              throwable );
+                                                                      } ).subscribeOn( Schedulers.io() )
+
+                                            , 10 );
+
+            o.toBlocking().lastOrDefault( null );
+            LOG.info( "notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
         }
 
         // update queued time
@@ -338,48 +335,39 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                 return message;
             }
         };
-        Observable o = rx.Observable.from(messages)
-                .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
-                    @Override
-                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
-                        return messageObservable.map(func);
+
+        //from each queue message, process them in parallel up to 10 at a time
+        Observable o = rx.Observable.from( messages ).flatMap( queueMessage -> {
+
+
+            return Observable.just( queueMessage ).map( func ).buffer( messages.size() ).map( queueMessages -> {
+                //for gcm this will actually send notification
+                for ( ProviderAdapter providerAdapter : notifierMap.values() ) {
+                    try {
+                        providerAdapter.doneSendingNotifications();
                     }
-                }, Schedulers.io())
-                .buffer(messages.size())
-                .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() {
-                    @Override
-                    public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
-                        //for gcm this will actually send notification
-                        for (ProviderAdapter providerAdapter : notifierMap.values()) {
-                            try {
-                                providerAdapter.doneSendingNotifications();
-                            } catch (Exception e) {
-                                LOG.error("providerAdapter.doneSendingNotifications: ", e);
-                            }
+                    catch ( Exception e ) {
+                        LOG.error( "providerAdapter.doneSendingNotifications: ", e );
+                    }
+                }
+                //TODO: check if a notification is done and mark it
+                HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<>();
+                for ( ApplicationQueueMessage message : queueMessages ) {
+                    if ( notifications.get( message.getNotificationId() ) == null ) {
+                        try {
+                            TaskManager taskManager = taskMap.get( message.getNotificationId() );
+                            notifications.put( message.getNotificationId(), message );
+                            taskManager.finishedBatch();
                         }
-                        //TODO: check if a notification is done and mark it
-                        HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<UUID, ApplicationQueueMessage>();
-                        for (ApplicationQueueMessage message : queueMessages) {
-                            if (notifications.get(message.getNotificationId()) == null) {
-                                try {
-                                    TaskManager taskManager = taskMap.get(message.getNotificationId());
-                                    notifications.put(message.getNotificationId(), message);
-                                    taskManager.finishedBatch();
-                                } catch (Exception e) {
-                                    LOG.error("Failed to finish batch", e);
-                                }
-                            }
-
+                        catch ( Exception e ) {
+                            LOG.error( "Failed to finish batch", e );
                         }
-                        return notifications;
-                    }
-                })
-                .doOnError(new Action1<Throwable>() {
-                    @Override
-                    public void call(Throwable throwable) {
-                        LOG.error("Failed while sending",throwable);
                     }
-                });
+                }
+                return notifications;
+            } ).doOnError( throwable -> LOG.error( "Failed while sending", throwable ) );
+        }, 10 );
+
         return o;
     }
 
@@ -400,7 +388,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
      * {"winphone":"mymessage","apple":"mymessage"}
      * TODO: document this method better
      */
-    private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> notifierMap) throws Exception {
+    private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter>
+        notifierMap) throws Exception {
         Map<String, Object> translatedPayloads = new HashMap<String, Object>(  payloads.size());
         for (Map.Entry<String, Object> entry : payloads.entrySet()) {
             String payloadKey = entry.getKey().toLowerCase();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
index e8c5ace..6d0419a 100644
--- a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.setup;
 
 
+import java.io.IOException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,7 +108,19 @@ public class ConcurrentProcessSingleton {
             barrier.await( ONE_MINUTE );
             logger.info( "Setup to complete" );
 
-            lock.maybeReleaseLock();
+
+            Runtime.getRuntime().addShutdownHook( new Thread(  ){ // release the lock when the JVM shuts down
+                @Override
+                public void run() {
+                    try {
+                        lock.maybeReleaseLock();
+                    }
+                    catch ( IOException e ) {
+                        throw new RuntimeException( "Unable to release lock", e ); // keep the IOException as the cause
+                    }
+                }
+            });
+
         }
         catch ( Exception e ) {
             throw new RuntimeException( "Unable to initialize system", e );


[03/12] incubator-usergrid git commit: Removed collections from query, it shouldn't care about this module.

Posted by sf...@apache.org.
Removed collections from query, it shouldn't care about this module.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/72ec19d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/72ec19d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/72ec19d5

Branch: refs/heads/USERGRID-480
Commit: 72ec19d563c4e2a9f9e3842038162aeb8f038222
Parents: 12c2a1a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 19 10:54:05 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 19 10:54:05 2015 -0600

----------------------------------------------------------------------
 stack/corepersistence/collection/pom.xml        |  14 +-
 .../impl/EntityVersionCleanupTask.java          |  13 +-
 .../mvcc/stage/write/WriteCommit.java           |   2 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     |   8 +-
 .../MvccEntitySerializationStrategyImpl.java    |   3 +-
 .../MvccEntitySerializationStrategyV3Impl.java  |   3 +-
 .../UniqueValueSerializationStrategyImpl.java   |   8 -
 .../migration/MvccEntityDataMigrationImpl.java  |   2 +-
 .../collection/util/EntityUtils.java            |  72 -----
 .../mvcc/stage/AbstractEntityStageTest.java     |   2 +-
 .../mvcc/stage/AbstractMvccEntityStageTest.java |   2 +-
 .../mvcc/stage/TestEntityGenerator.java         |   2 +-
 ...MvccEntitySerializationStrategyImplTest.java |   4 +-
 ...ccEntitySerializationStrategyV1ImplTest.java |   4 +-
 ...ccEntitySerializationStrategyV2ImplTest.java |   2 +-
 .../impl/SerializationComparison.java           |   4 +-
 .../collection/util/InvalidEntityGenerator.java |   1 +
 stack/corepersistence/model/pom.xml             |   1 -
 .../persistence/model/util/EntityUtils.java     |  72 +++++
 stack/corepersistence/queryindex/pom.xml        |   6 -
 .../persistence/index/query/EntityResults.java  | 108 -------
 .../persistence/index/query/Results.java        | 148 ---------
 .../persistence/index/utils/ListUtils.java      |   6 +-
 .../index/guice/TestIndexModule.java            |   3 +-
 .../impl/EntityConnectionIndexImplTest.java     | 306 -------------------
 .../persistence/index/impl/EntityIndexTest.java |  32 +-
 26 files changed, 119 insertions(+), 709 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 3a89a8e..a4e7219 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -24,10 +24,10 @@
         <artifactId>chop-maven-plugin</artifactId>
         <version>${chop.version}</version>
 
-        
+
         NOTE: you should be putting most of these variables into your settings.xml
         as an automatically activated profile.
-        
+
 
         <configuration>
           <accessKey>${aws.s3.key}</accessKey>
@@ -48,11 +48,11 @@
           <runnerKeyPairName>${runner.keypair.name}</runnerKeyPairName>
           <runnerCount>6</runnerCount>
           <securityGroupExceptions>
-            
+
             Add your own IP address as an exception to allow access
             but please do this in the settings.xml file .. essentially
             all parameters should be in the settings.xml file.
-            
+
             <param>${myip.address}/32:24981</param>
             <param>${myip.address}/32:22</param>
           </securityGroupExceptions>
@@ -81,13 +81,7 @@
 
 
 
-    <!-- lang utils for setting uuids etc -->
 
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <version>${commons.lang.version}</version>
-    </dependency>
 
     <!-- tests -->
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 55d135b..b245528 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -25,15 +25,10 @@ import java.util.UUID;
 
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
-import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.field.Field;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,15 +37,11 @@ import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
 
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 8889b2f..65ba0b4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -36,7 +36,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 5bdf3b9..548127c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -19,22 +19,18 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.hystrix.Hystrix;
 import com.netflix.hystrix.HystrixCommand;
 import com.netflix.hystrix.HystrixCommandGroupKey;
 import com.netflix.hystrix.HystrixThreadPoolProperties;
-import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
 
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index 5bc9f56..e1445e3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
@@ -41,7 +40,7 @@ import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeExc
 import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
 import org.apache.usergrid.persistence.core.astyanax.ColumnParser;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index ace076b..a5046f6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -24,7 +24,7 @@ import org.apache.usergrid.persistence.collection.exception.EntityTooLargeExcept
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 import org.apache.usergrid.persistence.core.astyanax.FieldBuffer;
@@ -55,7 +55,6 @@ import com.netflix.astyanax.model.Row;
 import com.netflix.astyanax.model.Rows;
 import com.netflix.astyanax.serializers.AbstractSerializer;
 import com.netflix.astyanax.serializers.BooleanSerializer;
-import com.netflix.astyanax.util.TimeUUIDUtils;
 
 import rx.Observable;
 import rx.Scheduler;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 108f2e8..c95650c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -33,14 +33,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.marshal.BytesType;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
 import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
@@ -48,13 +44,10 @@ import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.netflix.astyanax.ColumnListMutation;
@@ -64,7 +57,6 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.Row;
 import com.netflix.astyanax.query.RowQuery;
-import com.netflix.astyanax.serializers.AbstractSerializer;
 import com.netflix.astyanax.util.RangeBuilder;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index bd4eafc..f87b5fd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -38,7 +38,7 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
deleted file mode 100644
index 20edb66..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.usergrid.persistence.collection.util;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Collection;
-import java.util.UUID;
-import org.apache.usergrid.persistence.model.field.Field;
-
-import org.apache.commons.lang3.reflect.FieldUtils;
-
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * @author tnine
- */
-public class EntityUtils {
-
-
-    private static final java.lang.reflect.Field VERSION = FieldUtils.getField( Entity.class, "version", true );
-
-    private static final java.lang.reflect.Field ID = FieldUtils.getField( Entity.class, "id", true );
-
-
-    /**
-     * Set the version into the entity
-     */
-    public static void setVersion( Entity entity, UUID version ) {
-
-        try {
-            FieldUtils.writeField( VERSION, entity, version, true );
-        }
-        catch ( IllegalAccessException e ) {
-            throw new RuntimeException( "Unable to set the field " + VERSION + " into the entity", e );
-        }
-    }
-
-
-    /**
-     * Set the id into the entity
-     */
-    public static void setId( Entity entity, Id id ) {
-        try {
-            FieldUtils.writeField( ID, entity, id, true );
-        }
-        catch ( IllegalAccessException e ) {
-            throw new RuntimeException( "Unable to set the field " + ID + " into the entity", e );
-        }
-    }
-
-
-    /**
-     * Get all unique fields on an entity
-     * @param entity
-     * @return
-     */
-    public static List<Field> getUniqueFields( Entity entity ) {
-        final Collection<Field> entityFields = entity.getFields();
-
-        //preallocate to max possible for more efficient runtime
-        final List<Field> possibleFields = new ArrayList<>( entityFields.size() );
-
-        for ( Field field : entity.getFields() ) {
-            if ( field.isUnique() ) {
-                possibleFields.add( field );
-            }
-        }
-        return possibleFields;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/AbstractEntityStageTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/AbstractEntityStageTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/AbstractEntityStageTest.java
index c79fcdb..2546790 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/AbstractEntityStageTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/AbstractEntityStageTest.java
@@ -25,7 +25,7 @@ import org.junit.experimental.theories.Theory;
 import org.junit.runner.RunWith;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.collection.util.InvalidEntityGenerator;
 import org.apache.usergrid.persistence.collection.util.InvalidIdGenerator;
 import org.apache.usergrid.persistence.model.entity.Entity;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/AbstractMvccEntityStageTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/AbstractMvccEntityStageTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/AbstractMvccEntityStageTest.java
index 6c6ea0f..dff0a83 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/AbstractMvccEntityStageTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/AbstractMvccEntityStageTest.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.collection.util.InvalidEntityGenerator;
 import org.apache.usergrid.persistence.collection.util.InvalidIdGenerator;
 import org.apache.usergrid.persistence.collection.util.InvalidMvccEntityGenerator;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/TestEntityGenerator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/TestEntityGenerator.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/TestEntityGenerator.java
index 4713f5a..7a415f0 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/TestEntityGenerator.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/TestEntityGenerator.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.persistence.collection.mvcc.stage;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
index 3c89b31..07a4b1b 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
@@ -35,7 +35,7 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -55,8 +55,6 @@ import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
-import junit.framework.Assert;
-
 import static junit.framework.TestCase.assertEquals;
 import static junit.framework.TestCase.assertFalse;
 import static junit.framework.TestCase.assertNotNull;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
index b0dba5d..ff8d743 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
@@ -31,7 +31,7 @@ import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.core.test.ITRunner;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -49,8 +49,6 @@ import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
-import net.jcip.annotations.NotThreadSafe;
-
 
 @RunWith( ITRunner.class )
 @UseModules( TestCollectionModule.class )

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2ImplTest.java
index b91c453..c64940a 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2ImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2ImplTest.java
@@ -31,7 +31,7 @@ import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.core.test.ITRunner;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.model.entity.Entity;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationComparison.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationComparison.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationComparison.java
index 4f19f28..6383e75 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationComparison.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationComparison.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.BooleanField;
@@ -43,7 +43,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 
 /**
- * TODO We need to get both of these serialization methods working, and benchmark them for 
+ * TODO We need to get both of these serialization methods working, and benchmark them for
  * comparison Neither works out of the box for us without custom work.
  *
  * @author tnine

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/InvalidEntityGenerator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/InvalidEntityGenerator.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/InvalidEntityGenerator.java
index 5b82ac8..9bba866 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/InvalidEntityGenerator.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/InvalidEntityGenerator.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/model/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/pom.xml b/stack/corepersistence/model/pom.xml
index fb7419b..fe88256 100644
--- a/stack/corepersistence/model/pom.xml
+++ b/stack/corepersistence/model/pom.xml
@@ -39,7 +39,6 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
             <version>${commons.lang.version}</version>
-            <scope>test</scope>
         </dependency>
 
       <!-- the core, which includes Streaming API, shared low-level abstractions (but NOT data-binding) -->

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/EntityUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/EntityUtils.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/EntityUtils.java
new file mode 100644
index 0000000..929a563
--- /dev/null
+++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/EntityUtils.java
@@ -0,0 +1,72 @@
+package org.apache.usergrid.persistence.model.util;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.usergrid.persistence.model.field.Field;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Static helpers that write the id/version of an {@link Entity} via reflection and list its unique fields.
+ * @author tnine
+ */
+public class EntityUtils {
+
+    // Reflective handles to Entity's "version" and "id" fields; true is passed as FieldUtils' forceAccess flag.
+    private static final java.lang.reflect.Field VERSION = FieldUtils.getField( Entity.class, "version", true );
+
+    private static final java.lang.reflect.Field ID = FieldUtils.getField( Entity.class, "id", true );
+
+    /**
+     * Set the version into the entity by reflectively writing its "version" field.
+     * @param version the value written into the field
+     * @throws RuntimeException wrapping the IllegalAccessException raised when the write fails
+     */
+    public static void setVersion( Entity entity, UUID version ) {
+        try {
+            FieldUtils.writeField( VERSION, entity, version, true );
+        }
+        catch ( IllegalAccessException e ) {
+            throw new RuntimeException( "Unable to set the field " + VERSION + " into the entity", e );
+        }
+    }
+
+    /**
+     * Set the id into the entity by reflectively writing its "id" field.
+     * @throws RuntimeException wrapping the IllegalAccessException raised when the write fails
+     */
+    public static void setId( Entity entity, Id id ) {
+        try {
+            FieldUtils.writeField( ID, entity, id, true );
+        }
+        catch ( IllegalAccessException e ) {
+            throw new RuntimeException( "Unable to set the field " + ID + " into the entity", e );
+        }
+    }
+
+
+    /**
+     * Get all unique fields on an entity
+     * @param entity the entity whose fields are checked with {@link Field#isUnique()}
+     * @return a new list holding only the unique fields; empty when none are unique
+     */
+    public static List<Field> getUniqueFields( Entity entity ) {
+        final Collection<Field> entityFields = entity.getFields();
+
+        // presize to the total field count, the upper bound on how many fields can be unique
+        final List<Field> possibleFields = new ArrayList<>( entityFields.size() );
+        // NOTE(review): getFields() is invoked a second time below instead of reusing entityFields
+        for ( Field field : entity.getFields() ) {
+            if ( field.isUnique() ) {
+                possibleFields.add( field );
+            }
+        }
+        return possibleFields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index 5f01ee7..2dc40ce 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -92,12 +92,6 @@
 
         <!-- major dependencies -->
 
-        <dependency>
-            <groupId>${project.parent.groupId}</groupId>
-            <artifactId>collection</artifactId>
-            <version>${project.version}</version>
-            <type>jar</type>
-        </dependency>
 
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/EntityResults.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/EntityResults.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/EntityResults.java
deleted file mode 100644
index 59f20dd..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/EntityResults.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.index.query;
-
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.index.utils.UUIDUtils;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-
-/**
- * Loads results from candidate results.  This needs to be refactored to the calling module, 
- * and should not exist in the query index
- */
-public class EntityResults implements Iterable<Entity>, Iterator<Entity> {
-
-
-    private final CandidateResults results;
-    private final EntityCollectionManager ecm;
-    private final UUID maxVersion;
-    private final Iterator<CandidateResult> itr;
-    private Entity next = null;
-
-
-    public EntityResults( final CandidateResults results, final EntityCollectionManager ecm, final UUID maxVersion ) {
-        this.results = results;
-        this.ecm = ecm;
-        this.maxVersion = maxVersion;
-        this.itr = results.iterator();
-    }
-
-
-    @Override
-    public Iterator<Entity> iterator() {
-        return this;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-       if(next == null){
-           doAdvance();
-       }
-
-       return next != null;
-    }
-
-
-    /**
-     * Advance to our next candidate so that it is available
-     */
-    private void doAdvance(){
-        while(itr.hasNext() && next == null){
-            CandidateResult candidate = itr.next();
-
-            // our candidate is > our max, we can't use it
-            if( UUIDUtils.compare( candidate.getVersion(), maxVersion ) > 0){
-                continue;
-            }
-
-            // our candidate was too new, ignore it
-            next = ecm.load( candidate.getId() ).toBlocking().single();
-        }
-    }
-
-
-    @Override
-    public Entity next() {
-        if(!hasNext()){
-            throw new NoSuchElementException("No more elements in the iterator");
-        }
-
-
-        Entity result =  next;
-
-        next = null;
-
-        return result;
-    }
-
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException( "Remove is not supported" );
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Results.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Results.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Results.java
deleted file mode 100644
index 89745d0..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Results.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.query;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-
-
-@XmlRootElement
-public class Results implements Iterable<Entity> {
-
-    private static final Logger log = LoggerFactory.getLogger(Results.class);
-
-    private String cursor = null;
-
-    private final Query query;
-    private final List<Id> ids = new ArrayList<Id>();
-
-    private Entity entity = null;
-    private List<Entity> entities = null;
-
-    private final List<CandidateResult> candidates;
-
-    final EntityCollectionManagerFactory ecmf;
-
-
-    public Results( Query query, List<CandidateResult> candidates, 
-            EntityCollectionManagerFactory ecmf ) {
-
-        this.query = query;
-        this.candidates = candidates;
-        this.ecmf = ecmf;
-        for ( CandidateResult candidate : candidates ) {
-            ids.add( candidate.getId() );
-        }
-    }
-
-
-    public boolean hasCursor() {
-        return cursor != null;
-    }
-
-
-    public String getCursor() {
-        return cursor;
-    }
-
-
-    public void setCursor(String cursor) {
-        this.cursor = cursor;
-    }
-
-
-    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-    public Query getQuery() {
-        return query;
-    }
-
-
-    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-    public List<Id> getIds() {
-        return Collections.unmodifiableList(ids);
-    }
-
-
-    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-    public List<Entity> getEntities() {
-        return getEntities(false);
-    }
-
-    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-    public List<Entity> getEntities(Boolean takeAllVersions) {
-
-        if ( entities == null ) {
-
-            entities = new ArrayList<Entity>();
-
-            EntityCollectionManager ecm = null;
-
-            for ( CandidateResult candidate : candidates ) {
-
-                Entity entity = ecm.load( candidate.getId() ).toBlocking().last();
-                if ( !takeAllVersions && candidate.getVersion().compareTo(entity.getVersion()) == -1) {
-                    log.debug("   Stale hit {} version {}", entity.getId(), entity.getVersion() );
-                    continue;
-                }
-
-                entities.add(entity);
-            }
-        }
-
-        return Collections.unmodifiableList( entities );
-    }
-
-
-    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-    public Entity getEntity() {
-        if ( size() > 0 ) {
-            return getEntities().get(0);
-        }
-        return null;
-    }
-
-
-    public int size() {
-        return ids.size();
-    }
-
-
-    public boolean isEmpty() {
-        return ids.isEmpty();
-    }
-
-
-    @Override
-    public Iterator<Entity> iterator() {
-        return getEntities().iterator();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
index 6c7b480..d588476 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
@@ -22,13 +22,11 @@ import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.commons.lang.math.NumberUtils;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
-import org.apache.usergrid.persistence.model.entity.Id;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.math.NumberUtils;
+
 
 public class ListUtils extends org.apache.commons.collections.ListUtils {
     private static final Logger LOG = LoggerFactory.getLogger( ListUtils.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 23dfe06..4cf46d6 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -21,9 +21,8 @@ package org.apache.usergrid.persistence.index.guice;
 
 import org.safehaus.guicyfig.GuicyFigModule;
 
-import org.apache.usergrid.persistence.collection.guice.CollectionModule;
-import org.apache.usergrid.persistence.core.guice.TestModule;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.core.guice.TestModule;
 
 
 public class TestIndexModule extends TestModule {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
deleted file mode 100644
index a399809..0000000
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.index.exceptions.QueryParseException;
-import org.apache.usergrid.persistence.index.guice.TestIndexModule;
-import org.apache.usergrid.persistence.index.query.CandidateResult;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Inject;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-
-@RunWith( EsRunner.class )
-@UseModules( { TestIndexModule.class } )
-public class EntityConnectionIndexImplTest extends BaseIT {
-
-    private static final Logger log = LoggerFactory.getLogger( EntityConnectionIndexImplTest.class );
-
-    //    @ClassRule
-    //    public static ElasticSearchResource es = new ElasticSearchResource();
-
-
-    @Inject
-    public EntityIndexFactory ecif;
-
-
-    @Test
-    public void testBasicOperation() throws IOException, InterruptedException {
-
-        Id appId = new SimpleId( "application" );
-        ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
-
-        // create a muffin
-        Entity muffin = new Entity( new SimpleId( UUIDGenerator.newTimeUUID(), "muffin" ) );
-
-        muffin = EntityIndexMapUtils.fromMap( muffin, new HashMap<String, Object>() {{
-            put( "size", "Large" );
-            put( "flavor", "Blueberry" );
-            put( "stars", 5 );
-        }} );
-        EntityUtils.setVersion( muffin, UUIDGenerator.newTimeUUID() );
-
-        Entity egg = new Entity( new SimpleId( UUIDGenerator.newTimeUUID(), "egg" ) );
-
-        egg = EntityIndexMapUtils.fromMap( egg, new HashMap<String, Object>() {{
-            put( "size", "Large" );
-            put( "type", "scramble" );
-            put( "stars", 5 );
-        }} );
-        EntityUtils.setVersion( egg, UUIDGenerator.newTimeUUID() );
-
-        Entity oj = new Entity( new SimpleId( UUIDGenerator.newTimeUUID(), "juice" ) );
-
-        oj = EntityIndexMapUtils.fromMap( oj, new HashMap<String, Object>() {{
-            put( "size", "Large" );
-            put( "type", "pulpy" );
-            put( "stars", 3 );
-        }} );
-        EntityUtils.setVersion( oj, UUIDGenerator.newTimeUUID() );
-
-
-        // create a person who likes muffins
-        Id personId = new SimpleId( UUIDGenerator.newTimeUUID(), "person" );
-
-
-        assertNotNull( personId );
-        assertNotNull( personId.getType() );
-        assertNotNull( personId.getUuid() );
-
-        // index connection of "person Dave likes Large Blueberry muffin"
-
-        IndexScope searchScope = new IndexScopeImpl( personId, "likes" );
-
-        //create another scope we index in, want to be sure these scopes are filtered
-        IndexScope otherIndexScope =
-                new IndexScopeImpl( new SimpleId( UUIDGenerator.newTimeUUID(), "animal" ), "likes" );
-
-        EntityIndex personLikesIndex = ecif.createEntityIndex( applicationScope );
-        personLikesIndex.initializeIndex();
-
-        EntityIndexBatch batch = personLikesIndex.createBatch();
-
-        //add to both scopes
-
-        //add a muffin
-        batch.index( searchScope, muffin );
-        batch.index( otherIndexScope, muffin );
-
-        //add the eggs
-        batch.index( searchScope, egg );
-        batch.index( otherIndexScope, egg );
-
-        //add the oj
-        batch.index( searchScope, oj );
-        batch.index( otherIndexScope, oj );
-
-        batch.execute().get();
-        personLikesIndex.refresh();
-
-
-        Thread.sleep( 2000 );
-
-        // now, let's search for muffins
-        CandidateResults likes = personLikesIndex
-                .search( searchScope, SearchTypes.fromTypes( muffin.getId().getType() ), Query.fromQL( "select *" ) );
-        assertEquals( 1, likes.size() );
-        assertEquals( muffin.getId(), likes.get( 0 ).getId() );
-
-        // now, let's search for egg
-        likes = personLikesIndex
-                .search( searchScope, SearchTypes.fromTypes( egg.getId().getType() ), Query.fromQL( "select *" ) );
-        assertEquals( 1, likes.size() );
-        assertEquals( egg.getId(), likes.get( 0 ).getId() );
-
-        // search for OJ
-        likes = personLikesIndex
-                .search( searchScope, SearchTypes.fromTypes( oj.getId().getType() ), Query.fromQL( "select *" ) );
-        assertEquals( 1, likes.size() );
-        assertEquals( oj.getId(), likes.get( 0 ).getId() );
-
-
-        //now lets search for all explicitly
-        likes = personLikesIndex.search( searchScope,
-                SearchTypes.fromTypes( muffin.getId().getType(), egg.getId().getType(), oj.getId().getType() ),
-                Query.fromQL( "select *" ) );
-        assertEquals( 3, likes.size() );
-        assertContains( egg.getId(), likes );
-        assertContains( muffin.getId(), likes );
-        assertContains( oj.getId(), likes );
-
-        //now lets search for all explicitly
-        likes = personLikesIndex.search( searchScope, SearchTypes.allTypes(), Query.fromQL( "select *" ) );
-        assertEquals( 3, likes.size() );
-        assertContains( egg.getId(), likes );
-        assertContains( muffin.getId(), likes );
-        assertContains( oj.getId(), likes );
-
-
-        //now search all entity types with a query that returns a subset
-        likes = personLikesIndex.search( searchScope,
-                SearchTypes.fromTypes( muffin.getId().getType(), egg.getId().getType(), oj.getId().getType() ),
-                Query.fromQL( "select * where stars = 5" ) );
-        assertEquals( 2, likes.size() );
-        assertContains( egg.getId(), likes );
-        assertContains( muffin.getId(), likes );
-
-
-        //now search with no types, we should get only the results that match
-        likes = personLikesIndex
-                .search( searchScope, SearchTypes.allTypes(), Query.fromQL( "select * where stars = 5" ) );
-        assertEquals( 2, likes.size() );
-        assertContains( egg.getId(), likes );
-        assertContains( muffin.getId(), likes );
-    }
-
-
-    @Test
-    public void testDelete() throws IOException, InterruptedException {
-
-        Id appId = new SimpleId( "application" );
-        ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
-
-        // create a muffin
-        Entity muffin = new Entity( new SimpleId( UUIDGenerator.newTimeUUID(), "muffin" ) );
-
-        muffin = EntityIndexMapUtils.fromMap( muffin, new HashMap<String, Object>() {{
-            put( "size", "Large" );
-            put( "flavor", "Blueberry" );
-            put( "stars", 5 );
-        }} );
-        EntityUtils.setVersion( muffin, UUIDGenerator.newTimeUUID() );
-
-        Entity egg = new Entity( new SimpleId( UUIDGenerator.newTimeUUID(), "egg" ) );
-
-        egg = EntityIndexMapUtils.fromMap( egg, new HashMap<String, Object>() {{
-            put( "size", "Large" );
-            put( "type", "scramble" );
-            put( "stars", 5 );
-        }} );
-        EntityUtils.setVersion( egg, UUIDGenerator.newTimeUUID() );
-
-        Entity oj = new Entity( new SimpleId( UUIDGenerator.newTimeUUID(), "juice" ) );
-
-        oj = EntityIndexMapUtils.fromMap( oj, new HashMap<String, Object>() {{
-            put( "size", "Large" );
-            put( "type", "pulpy" );
-            put( "stars", 3 );
-        }} );
-        EntityUtils.setVersion( oj, UUIDGenerator.newTimeUUID() );
-
-
-        // create a person who likes muffins
-        Id personId = new SimpleId( UUIDGenerator.newTimeUUID(), "person" );
-
-
-        assertNotNull( personId );
-        assertNotNull( personId.getType() );
-        assertNotNull( personId.getUuid() );
-
-        // index connection of "person Dave likes Large Blueberry muffin"
-
-        IndexScope searchScope = new IndexScopeImpl( personId, "likes" );
-
-        //create another scope we index in, want to be sure these scopes are filtered
-        IndexScope otherIndexScope =
-                new IndexScopeImpl( new SimpleId( UUIDGenerator.newTimeUUID(), "animal" ), "likes" );
-
-        EntityIndex personLikesIndex = ecif.createEntityIndex( applicationScope );
-        personLikesIndex.initializeIndex();
-
-        EntityIndexBatch batch = personLikesIndex.createBatch();
-
-        //add to both scopes
-
-        //add a muffin
-        batch.index( searchScope, muffin );
-        batch.index( otherIndexScope, muffin );
-
-        //add the eggs
-        batch.index( searchScope, egg );
-        batch.index( otherIndexScope, egg );
-
-        //add the oj
-        batch.index( searchScope, oj );
-        batch.index( otherIndexScope, oj );
-
-        batch.execute().get();
-        personLikesIndex.refresh();
-
-
-        // now, let's search for muffins
-        CandidateResults likes = personLikesIndex.search( searchScope,
-                SearchTypes.fromTypes( muffin.getId().getType(), egg.getId().getType(), oj.getId().getType() ),
-                Query.fromQL( "select *" ) );
-        assertEquals( 3, likes.size() );
-        assertContains( egg.getId(), likes );
-        assertContains( muffin.getId(), likes );
-        assertContains( oj.getId(), likes );
-
-
-        //now delete them
-        batch.deindex( searchScope, egg );
-        batch.deindex( searchScope, muffin );
-        batch.deindex( searchScope, oj );
-        batch.execute().get();
-        personLikesIndex.refresh();
-
-        likes = personLikesIndex.search( searchScope,
-                SearchTypes.fromTypes( muffin.getId().getType(), egg.getId().getType(), oj.getId().getType() ),
-                Query.fromQL( "select *" ) );
-        assertEquals( 0, likes.size() );
-    }
-
-
-    private void assertContains( final Id id, final CandidateResults results ) {
-        for ( CandidateResult result : results ) {
-            if ( result.getId().equals( id ) ) {
-                return;
-            }
-        }
-
-        fail( String.format( "Could not find id %s in candidate results", id ) );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72ec19d5/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index ca9bf79..39a16b1 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -20,16 +20,15 @@ package org.apache.usergrid.persistence.index.impl;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.index.*;
-import org.apache.usergrid.persistence.model.field.ArrayField;
-import org.apache.usergrid.persistence.model.field.EntityObjectField;
-import org.apache.usergrid.persistence.model.field.UUIDField;
-import org.apache.usergrid.persistence.model.field.value.EntityObject;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -39,11 +38,17 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.time.StopWatch;
 
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.core.util.Health;
+import org.apache.usergrid.persistence.index.AliasedEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.guice.TestIndexModule;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.index.query.Query;
@@ -51,7 +56,12 @@ import org.apache.usergrid.persistence.index.utils.UUIDUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.ArrayField;
+import org.apache.usergrid.persistence.model.field.EntityObjectField;
 import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.field.UUIDField;
+import org.apache.usergrid.persistence.model.field.value.EntityObject;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -59,7 +69,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 
 @RunWith(EsRunner.class)


[09/12] incubator-usergrid git commit: Fixes test that won't work when the system runs too quickly.

Posted by sf...@apache.org.
Fixes test that won't work when the system runs too quickly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6dc742e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6dc742e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6dc742e4

Branch: refs/heads/USERGRID-480
Commit: 6dc742e402bf75a8cf6bb0f2fa3ffc4c07367263
Parents: 5889a3b
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 20 07:30:55 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 20 07:30:55 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/graph/GraphManagerIT.java | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6dc742e4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index ef5d69d..05b36fe 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -964,20 +964,25 @@ public abstract class GraphManagerIT {
         Id targetId2 = new SimpleId( "target2" );
 
 
-        Edge edge1 = createEdge( sourceId, "test", targetId1, System.currentTimeMillis() );
+        long startTime = System.currentTimeMillis();
+
+        long edge1Time = startTime;
+        long edge2Time = edge1Time+1;
+
+        final long maxVersion= edge2Time;
+
+        Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time);
 
         gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null );
 
-        Edge edge2 = createEdge( sourceId, "test", targetId2, System.currentTimeMillis() );
+        Edge edge2 = createEdge( sourceId, "test", targetId2, edge2Time );
 
         gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null );
 
 
-        final long maxVersion = System.currentTimeMillis();
-
 
-        assertTrue( Long.compare( maxVersion, edge2.getTimestamp() ) > 0 );
-        assertTrue( Long.compare( maxVersion, edge1.getTimestamp() ) > 0 );
+        assertTrue( Long.compare( maxVersion, edge2.getTimestamp() ) >= 0 );
+        assertTrue( Long.compare( maxVersion, edge1.getTimestamp() ) >= 0 );
 
 
         //get our 2 edges


[05/12] incubator-usergrid git commit: First pass at upgrading to java 8 and latest RX java

Posted by sf...@apache.org.
First pass at upgrading to java 8 and latest RX java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/282e2271
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/282e2271
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/282e2271

Branch: refs/heads/USERGRID-480
Commit: 282e22712890cdda0439a5694810cff632526d7b
Parents: 72ec19d
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 19 18:00:57 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 19 18:00:57 2015 -0600

----------------------------------------------------------------------
 stack/core/pom.xml                              |  26 +--
 .../corepersistence/CpEntityManager.java        |   2 +-
 .../corepersistence/CpEntityManagerFactory.java |   7 +-
 .../corepersistence/CpRelationManager.java      |  24 +--
 .../usergrid/corepersistence/CpWalker.java      |  77 +++-----
 .../events/EntityVersionDeletedHandler.java     |  72 +++----
 .../migration/EntityTypeMappingMigration.java   |  41 ++--
 .../migration/EntityTypeMappingMigrationIT.java |   2 +-
 .../impl/EntityCollectionManagerImpl.java       |  10 +-
 .../collection/impl/EntityDeletedTask.java      |  20 +-
 .../impl/EntityVersionCleanupTask.java          |  27 +--
 .../impl/EntityVersionCreatedTask.java          |  26 +--
 .../MvccEntitySerializationStrategyImpl.java    |  89 +++------
 .../MvccEntitySerializationStrategyV3Impl.java  |  91 +++------
 .../migration/MvccEntityDataMigrationImpl.java  | 169 +++++++---------
 .../persistence/collection/rx/ParallelTest.java |  10 +-
 ...ctMvccEntityDataMigrationV1ToV3ImplTest.java |   2 +-
 stack/corepersistence/common/pom.xml            |  15 +-
 .../astyanax/MultiKeyColumnNameIterator.java    |   4 +-
 .../MultiKeyColumnNameIteratorTest.java         | 187 ++++++++----------
 .../astyanax/MultiRowColumnIteratorTest.java    |  50 ++---
 .../graph/impl/GraphManagerImpl.java            |   6 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |   2 +
 .../impl/stage/NodeDeleteListenerImpl.java      |   2 +-
 .../impl/migration/EdgeDataMigrationImpl.java   |  87 ++++-----
 .../graph/GraphManagerShardConsistencyIT.java   |   2 +-
 .../usergrid/persistence/graph/SimpleTest.java  |  12 +-
 .../migration/EdgeDataMigrationImplTest.java    |   2 +-
 stack/corepersistence/pom.xml                   |   8 +-
 .../index/impl/IndexLoadTestsIT.java            | 105 ++++------
 stack/pom.xml                                   |   8 +-
 .../management/importer/ImportServiceImpl.java  |  34 ++--
 .../impl/ApplicationQueueManagerImpl.java       | 195 +++++++++----------
 .../setup/ConcurrentProcessSingleton.java       |  16 +-
 34 files changed, 604 insertions(+), 826 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 971ee62..119a52b 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -130,15 +130,7 @@
           </execution>
         </executions>
       </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>2.3.2</version>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
+
 
 <!--            <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
@@ -481,17 +473,11 @@
       <version>${metrics.version}</version>
     </dependency>
 
-    <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-core</artifactId>
-      <version>${rx.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-math</artifactId>
-      <version>${rx.version}</version>
-    </dependency>
+      <dependency>
+          <groupId>io.reactivex</groupId>
+          <artifactId>rxjava</artifactId>
+          <version>${rx.version}</version>
+      </dependency>
 
     <dependency>
       <groupId>com.clearspring.analytics</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 789e640..9cffdaf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -1097,7 +1097,7 @@ public class CpEntityManager implements EntityManager {
         } );
 
         //TODO: does this call and others like it need a graphite reporter?
-        cpEntity = ecm.write( cpEntity ).toBlockingObservable().last();
+        cpEntity = ecm.write( cpEntity ).toBlocking().last();
 
         logger.debug( "Wrote {}:{} version {}", new Object[] {
                 cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion()

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index f76b9fc..83c3d85 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -451,7 +451,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                 fromEntityId, edgeType, Long.MAX_VALUE,
                 SearchByEdgeType.Order.DESCENDING, null ));
 
-        Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
+        //TODO This is wrong, and will result in OOM if there are too many applications.  This needs to stream properly with a buffer
+        Iterator<Edge> iter = edges.toBlocking().getIterator();
         while ( iter.hasNext() ) {
 
             Edge edge = iter.next();
@@ -469,7 +470,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
             org.apache.usergrid.persistence.model.entity.Entity e =
                     managerCache.getEntityCollectionManager( collScope ).load( targetId )
-                        .toBlockingObservable().lastOrDefault(null);
+                        .toBlocking().lastOrDefault(null);
 
             if ( e == null ) {
                 logger.warn("Applicaion {} in index but not found in collections", targetId );
@@ -624,7 +625,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     public long performEntityCount() {
         //TODO, this really needs to be a task that writes this data somewhere since this will get
         //progressively slower as the system expands
-        return (Long) getAllEntitiesObservable().longCount().toBlocking().last();
+        return (Long) getAllEntitiesObservable().countLong().toBlocking().last();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 2eeee28..c4e970d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -280,7 +280,7 @@ public class CpRelationManager implements RelationManager {
         Observable<String> types= gm.getEdgeTypesFromSource(
             new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix,  null ));
 
-        Iterator<String> iter = types.toBlockingObservable().getIterator();
+        Iterator<String> iter = types.toBlocking().getIterator();
         while ( iter.hasNext() ) {
             indexes.add( iter.next() );
         }
@@ -346,7 +346,7 @@ public class CpRelationManager implements RelationManager {
             Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
                 cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ));
 
-            Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
+            Iterator<Edge> iter = edges.toBlocking().getIterator();
             while ( iter.hasNext() ) {
                 Edge edge = iter.next();
 
@@ -383,7 +383,7 @@ public class CpRelationManager implements RelationManager {
         final GraphManager gm = managerCache.getGraphManager( applicationScope );
 
         Iterator<String> edgeTypesToTarget = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
-            cpHeadEntity.getId(), null, null) ).toBlockingObservable().getIterator();
+            cpHeadEntity.getId(), null, null) ).toBlocking().getIterator();
 
         logger.debug("updateContainingCollectionsAndCollections(): "
                 + "Searched for edges to target {}:{}\n   in scope {}\n   found: {}",
@@ -484,7 +484,7 @@ public class CpRelationManager implements RelationManager {
             SearchByEdgeType.Order.DESCENDING,
             null ) );
 
-        return edges.toBlockingObservable().firstOrDefault( null ) != null;
+        return edges.toBlocking().firstOrDefault( null ) != null;
     }
 
 
@@ -511,7 +511,7 @@ public class CpRelationManager implements RelationManager {
             SearchByEdgeType.Order.DESCENDING,
             null ) );
 
-        return edges.toBlockingObservable().firstOrDefault( null ) != null;
+        return edges.toBlocking().firstOrDefault( null ) != null;
     }
 
 
@@ -528,7 +528,7 @@ public class CpRelationManager implements RelationManager {
             SearchByEdgeType.Order.DESCENDING,
             null ) ); // last
 
-        Iterator<Edge> iterator = edgesToTarget.toBlockingObservable().getIterator();
+        Iterator<Edge> iterator = edgesToTarget.toBlocking().getIterator();
         int count = 0;
         while ( iterator.hasNext() ) {
             iterator.next();
@@ -569,7 +569,7 @@ public class CpRelationManager implements RelationManager {
         Observable<String> str = gm.getEdgeTypesFromSource(
                 new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );
 
-        Iterator<String> iter = str.toBlockingObservable().getIterator();
+        Iterator<String> iter = str.toBlocking().getIterator();
         while ( iter.hasNext() ) {
             String edgeType = iter.next();
             indexes.add( CpNamingUtils.getCollectionName( edgeType ) );
@@ -692,7 +692,7 @@ public class CpRelationManager implements RelationManager {
         // create graph edge connection from head entity to member entity
         Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, memberEntity.getId(), uuidHash );
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        gm.writeEdge( edge ).toBlockingObservable().last();
+        gm.writeEdge( edge ).toBlocking().last();
 
         logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}",
             new Object[] {
@@ -855,7 +855,7 @@ public class CpRelationManager implements RelationManager {
                 cpHeadEntity.getId(),
                 CpNamingUtils.getEdgeTypeFromCollectionName( collName ),
                 memberEntity.getId(), UUIDUtils.getUUIDLong( memberEntity.getId().getUuid() ) );
-        gm.deleteEdge( collectionToItemEdge ).toBlockingObservable().last();
+        gm.deleteEdge( collectionToItemEdge ).toBlocking().last();
 
         // remove edge from item to collection
         Edge itemToCollectionEdge = new SimpleEdge(
@@ -865,7 +865,7 @@ public class CpRelationManager implements RelationManager {
                 cpHeadEntity.getId(),
                 UUIDUtils.getUUIDLong( cpHeadEntity.getId().getUuid() ) );
 
-        gm.deleteEdge( itemToCollectionEdge ).toBlockingObservable().last();
+        gm.deleteEdge( itemToCollectionEdge ).toBlocking().last();
 
         // special handling for roles collection of a group
         if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
@@ -1058,7 +1058,7 @@ public class CpRelationManager implements RelationManager {
                 cpHeadEntity.getId(), edgeType, targetEntity.getId(), System.currentTimeMillis() );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        gm.writeEdge( edge ).toBlockingObservable().last();
+        gm.writeEdge( edge ).toBlocking().last();
 
         EntityIndex ei = managerCache.getEntityIndex( applicationScope );
         EntityIndexBatch batch = ei.createBatch();
@@ -1290,7 +1290,7 @@ public class CpRelationManager implements RelationManager {
                 System.currentTimeMillis() );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        gm.deleteEdge( edge ).toBlockingObservable().last();
+        gm.deleteEdge( edge ).toBlocking().last();
 
         final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
         final EntityIndexBatch batch = ei.createBatch();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index 4b902d8..332d5a8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -104,53 +104,38 @@ public class CpWalker {
         Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
             new SimpleSearchEdgeType( applicationId, edgeType, null ) );
 
-        edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
-            @Override
-            public Observable<Edge> call( final String edgeType ) {
-
-                logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId );
-
-                return gm.loadEdgesFromSource(  new SimpleSearchByEdgeType(
-                    applicationId, edgeType, Long.MAX_VALUE, order , null ) );
-
-            }
-
-        } ).parallel( new Func1<Observable<Edge>, Observable<Edge>>() {
-
-            @Override
-            public Observable<Edge> call( final Observable<Edge> edgeObservable ) { // process edges in parallel
-                return edgeObservable.doOnNext( new Action1<Edge>() { // visit and update then entity
-
-                    @Override
-                    public void call( Edge edge ) {
-
-                        logger.info( "Re-indexing edge {}", edge );
-
-                        EntityRef targetNodeEntityRef = new SimpleEntityRef(
-                            edge.getTargetNode().getType(),
-                            edge.getTargetNode().getUuid() );
-
-                        Entity entity;
-                        try {
-                            entity = em.get( targetNodeEntityRef );
-                        }
-                        catch ( Exception ex ) {
-                            logger.error( "Error getting sourceEntity {}:{}, continuing",
-                                targetNodeEntityRef.getType(),
-                                targetNodeEntityRef.getUuid() );
-                            return;
-                        }
-                        if(entity == null){
-                            return;
-                        }
-                        String collName = CpNamingUtils.getCollectionName( edge.getType() );
-                        visitor.visitCollectionEntry( em, collName, entity );
-                    }
-                } );
-            }
-        }, Schedulers.io() )
+        edgeTypes.flatMap( emittedEdgeType -> {
+
+            logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId );
+
+            return gm.loadEdgesFromSource(
+                new SimpleSearchByEdgeType( applicationId, emittedEdgeType, Long.MAX_VALUE, order, null ) );
+        } ).flatMap( edge -> {
+            //run each edge through its own scheduler, up to 100 at a time
+            return Observable.just( edge ).doOnNext( edgeValue -> {
+                logger.info( "Re-indexing edge {}", edgeValue );
+
+                EntityRef targetNodeEntityRef =
+                    new SimpleEntityRef( edgeValue.getTargetNode().getType(), edgeValue.getTargetNode().getUuid() );
+
+                Entity entity;
+                try {
+                    entity = em.get( targetNodeEntityRef );
+                }
+                catch ( Exception ex ) {
+                    logger.error( "Error getting sourceEntity {}:{}, continuing", targetNodeEntityRef.getType(),
+                        targetNodeEntityRef.getUuid() );
+                    return;
+                }
+                if ( entity == null ) {
+                    return;
+                }
+                String collName = CpNamingUtils.getCollectionName( edgeValue.getType() );
+                visitor.visitCollectionEntry( em, collName, entity );
+            } ).subscribeOn( Schedulers.io() );
+        }, 100 );
 
         // wait for it to complete
-        .toBlocking().lastOrDefault( null ); // end foreach on edges
+        edgeTypes.toBlocking().lastOrDefault( null ); // end foreach on edges
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index c45949b..23f5a32 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -17,53 +17,48 @@
  */
 package org.apache.usergrid.corepersistence.events;
 
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
 
 import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
-import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
 
 import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Action2;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
+
+import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 
 
 /**
- * Remove Entity index when specific version of Entity is deleted.
- * TODO: do we need this? Don't our version-created and entity-deleted handlers take care of this?
- * If we do need it then it should be wired in via GuiceModule in the corepersistence package.
+ * Remove Entity index when specific version of Entity is deleted. TODO: do we need this? Don't our version-created and
+ * entity-deleted handlers take care of this? If we do need it then it should be wired in via GuiceModule in the
+ * corepersistence package.
  */
 @Singleton
 public class EntityVersionDeletedHandler implements EntityVersionDeleted {
-    private static final Logger logger = LoggerFactory.getLogger(EntityVersionDeletedHandler.class );
-
-
+    private static final Logger logger = LoggerFactory.getLogger( EntityVersionDeletedHandler.class );
 
 
     private final EntityManagerFactory emf;
 
+
     @Inject
     public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
 
 
-
     @Override
     public void versionDeleted( final CollectionScope scope, final Id entityId,
                                 final List<MvccLogEntry> entityVersions ) {
@@ -71,40 +66,33 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
 
         // This check is for testing purposes and for a test that to be able to dynamically turn
         // off and on delete previous versions so that it can test clean-up on read.
-        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
+        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" ) ) {
             return;
         }
 
-        if(logger.isDebugEnabled()) {
-            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + "scope\n   name: {}\n   owner: {}\n   app: {}",
-                new Object[] {
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} "
+                    + "scope\n   name: {}\n   owner: {}\n   app: {}", new Object[] {
                     entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getName(), scope.getOwner(),
                     scope.getApplication()
                 } );
         }
 
-        CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
+        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
 
         final EntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
 
-        final IndexScope indexScope = new IndexScopeImpl(
-                new SimpleId(scope.getOwner().getUuid(), scope.getOwner().getType()),
-                scope.getName()
-        );
-
-        Observable.from( entityVersions )
-            .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccLogEntry>() {
-                @Override
-                public void call( final EntityIndexBatch entityIndexBatch, final MvccLogEntry mvccLogEntry ) {
-                    entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
-                }
-            } ).doOnNext( new Action1<EntityIndexBatch>() {
-            @Override
-            public void call( final EntityIndexBatch entityIndexBatch ) {
+        final IndexScope indexScope =
+            new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ),
+                scope.getName() );
+
+        //create one batch, then collect a deindex operation for each version into it
+        Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( entityIndexBatch, mvccLogEntry ) -> {
+            entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
+        } )
+            //after our batch is collected, execute it
+            .doOnNext( entityIndexBatch -> {
                 entityIndexBatch.execute();
-            }
-        } ).toBlocking().last();
+            } ).toBlocking().last();
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 40ad236..6531d16 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -37,6 +37,7 @@ import com.google.inject.Inject;
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -63,36 +64,26 @@ public class EntityTypeMappingMigration implements DataMigration<EntityIdScope>
         final AtomicLong atomicLong = new AtomicLong();
 
 
-        allEntitiesInSystemObservable.getData()
-                                            //process the entities in parallel
-         .parallel( new Func1<Observable<EntityIdScope>, Observable<EntityIdScope>>() {
+        //migrate the type mappings of up to 100 entities concurrently
+        allEntitiesInSystemObservable.getData().flatMap( entityIdScope -> {
+            return Observable.just( entityIdScope ).doOnNext( entityIdScopeObservable -> {
+                final MapScope ms = CpNamingUtils
+                                                 .getEntityTypeMapScope( entityIdScope.getCollectionScope().getApplication() );
 
+                                             final MapManager mapManager = managerCache.getMapManager( ms );
 
-                 @Override
-                 public Observable<EntityIdScope> call( final Observable<EntityIdScope> entityIdScopeObservable ) {
+                                             final UUID entityUuid = entityIdScope.getId().getUuid();
+                                             final String entityType = entityIdScope.getId().getType();
 
-                     //for each entity observable, get the map scope and write it to the map
-                     return entityIdScopeObservable.doOnNext( new Action1<EntityIdScope>() {
-                         @Override
-                         public void call( final EntityIdScope entityIdScope ) {
-                             final MapScope ms = CpNamingUtils
-                                 .getEntityTypeMapScope( entityIdScope.getCollectionScope().getApplication() );
+                                             mapManager.putString( entityUuid.toString(), entityType );
 
-                             final MapManager mapManager = managerCache.getMapManager( ms );
+                                             if ( atomicLong.incrementAndGet() % 100 == 0 ) {
+                                                 observer.update( getMaxVersion(),
+                                                     String.format( "Updated %d entities", atomicLong.get() ) );
+                                             }
 
-                             final UUID entityUuid = entityIdScope.getId().getUuid();
-                             final String entityType = entityIdScope.getId().getType();
-
-                             mapManager.putString( entityUuid.toString(), entityType );
-
-                             if ( atomicLong.incrementAndGet() % 100 == 0 ) {
-                                 observer.update( getMaxVersion(),
-                                     String.format( "Updated %d entities", atomicLong.get() ) );
-                             }
-                         }
-                     } );
-                 }
-             } ).count().toBlocking().last();
+            } ).subscribeOn( Schedulers.io() );
+        }, 100 ).count().toBlocking().last();
 
 
         return getMaxVersion();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index be7cee4..88f56c8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -77,7 +77,7 @@ public class EntityTypeMappingMigrationIT  {
         final MapScope mapScope2 = new MapScopeImpl(applicationId, CpNamingUtils.TYPES_BY_UUID_MAP );
 
 
-        final Observable<EntityIdScope> scopes = Observable.from(idScope1, idScope2);
+        final Observable<EntityIdScope> scopes = Observable.just(idScope1, idScope2);
 
         final TestMigrationDataProvider<EntityIdScope> migrationDataProvider = new TestMigrationDataProvider<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index f565fab..70b5a3a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -235,7 +235,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
 
         final Timer.Context timer = deleteTimer.time();
-        Observable<Id> o = Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId))
+        Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( collectionScope, entityId ) )
             .map(markStart)
             .doOnNext( markCommit )
             .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() {
@@ -284,7 +284,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     return Observable.empty();
                 }
 
-                return Observable.from(entity.getEntity().get());
+                return Observable.just( entity.getEntity().get() );
             }
         })
             .doOnNext( new Action1<Entity>() {
@@ -449,19 +449,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                                   WriteStart writeState ) {
 
-        return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
+        return Observable.just( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
 
                     @Override
                     public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
 
                         Observable<CollectionIoEvent<MvccEntity>> unique =
-                                Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
+                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
                                           .doOnNext( writeVerifyUnique );
 
 
                         // optimistic verification
                         Observable<CollectionIoEvent<MvccEntity>> optimistic =
-                                Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
+                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
                                           .doOnNext( writeOptimisticVerify );
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 5472645..7620907 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -127,22 +127,10 @@ public class EntityDeletedTask implements Task<Void> {
 
         LOG.debug( "Started firing {} listeners", listenerSize );
 
-        //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
-        Observable.from(listeners)
-                .parallel( new Func1<Observable<EntityDeleted>, Observable<EntityDeleted>>() {
-
-                    @Override
-                    public Observable<EntityDeleted> call(
-                            final Observable<EntityDeleted> entityVersionDeletedObservable ) {
-
-                        return entityVersionDeletedObservable.doOnNext( new Action1<EntityDeleted>() {
-                            @Override
-                            public void call( final EntityDeleted listener ) {
-                                listener.deleted(collectionScope, entityId, version);
-                            }
-                        } );
-                    }
-                }, Schedulers.io() ).toBlocking().last();
+        //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
+        Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
+            listener.deleted( collectionScope, entityId, version );
+        } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
 
         LOG.debug( "Finished firing {} listeners", listenerSize );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index b245528..1a7b86b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -159,7 +159,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                             throw new RuntimeException( "Unable to execute batch mutation", e );
                         }
                     }
-                } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
+                } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
 
 
         //start calling the listeners for remove log entries
@@ -201,7 +201,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                             throw new RuntimeException( "Unable to execute batch mutation", e );
                         }
                     }
-                } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
+                } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
 
         //wait or this to complete
         final Long removedCount = uniqueValueCleanup.last();
@@ -232,21 +232,14 @@ public class EntityVersionCleanupTask implements Task<Void> {
         logger.debug( "Started firing {} listeners", listenerSize );
 
         //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
-        Observable.from( listeners )
-                  .parallel( new Func1<Observable<EntityVersionDeleted>, Observable<EntityVersionDeleted>>() {
-
-                      @Override
-                      public Observable<EntityVersionDeleted> call(
-                              final Observable<EntityVersionDeleted> entityVersionDeletedObservable ) {
-
-                          return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
-                              @Override
-                              public void call( final EntityVersionDeleted listener ) {
-                                  listener.versionDeleted( scope, entityId, versions );
-                              }
-                          } );
-                      }
-                  }, Schedulers.io() ).toBlocking().last();
+
+
+        //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
+        Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
+            listener.versionDeleted( scope, entityId, versions );
+        } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
+
+
 
         logger.debug( "Finished firing {} listeners", listenerSize );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
index 7d3beb1..16a6e77 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
@@ -64,7 +64,7 @@ public class EntityVersionCreatedTask implements Task<Void> {
     @Override
     public Void rejected() {
 
-        // Our task was rejected meaning our queue was full.  
+        // Our task was rejected meaning our queue was full.
         // We need this operation to run, so we'll run it in our current thread
         try {
             call();
@@ -76,7 +76,7 @@ public class EntityVersionCreatedTask implements Task<Void> {
         return null;
     }
 
-    
+
     @Override
     public Void call() throws Exception {
 
@@ -100,22 +100,12 @@ public class EntityVersionCreatedTask implements Task<Void> {
 
         logger.debug( "Started firing {} listeners", listenerSize );
 
-        //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
-        Observable.from(listeners).parallel( 
-            new Func1<Observable<EntityVersionCreated>, Observable<EntityVersionCreated>>() {
-
-                @Override
-                public Observable<EntityVersionCreated> call(
-                    final Observable<EntityVersionCreated> entityVersionCreatedObservable ) {
-
-                    return entityVersionCreatedObservable.doOnNext( new Action1<EntityVersionCreated>() {
-                        @Override
-                        public void call( final EntityVersionCreated listener ) {
-                            listener.versionCreated(collectionScope,entity);
-                        }
-                    } );
-                }
-            }, Schedulers.io() ).toBlocking().last();
+
+        Observable.from( listeners )
+                  .flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
+                      listener.versionCreated( collectionScope, entity );
+                  } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
+
 
         logger.debug( "Finished firing {} listeners", listenerSize );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index e1445e3..ad1d91a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -176,77 +176,52 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS
         }
 
 
-       final EntitySetImpl entitySetResults =  Observable.from( rowKeys )
-               //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary)
-                                                         .buffer(entitiesPerRequest )
-                                                         .parallel( new Func1<Observable<List<ScopedRowKey
-                <CollectionPrefixedKey<Id>>>>, Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>>() {
+        final EntitySetImpl entitySetResults = Observable.from( rowKeys )
+            //buffer our entities per request, then for that buffer, execute the query in parallel (if necessary)
+            .buffer( entitiesPerRequest ).flatMap( listObservable -> {
 
 
-            @Override
-            public Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> call(
-                    final Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>> listObservable ) {
-
-
-                 //here, we execute our query then emit the items either in parallel, or on the current thread if we have more than 1 request
-                return listObservable.map( new Func1<List<ScopedRowKey<CollectionPrefixedKey<Id>>>,
-                        Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>() {
-
+                //here, we execute our query then emit the items either in parallel, or on the current thread
+                // if we have more than 1 request
+                return Observable.just( listObservable ).map( scopedRowKeys -> {
 
-                    @Override
-                    public Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> call(
-                            final List<ScopedRowKey<CollectionPrefixedKey<Id>>> scopedRowKeys ) {
-
-                            try {
-                                return keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys )
-                                                              .withColumnRange( maxVersion, null, false,
-                                                                      1 ).execute().getResult();
-                            }
-                            catch ( ConnectionException e ) {
-                                throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra",
-                                        e );
-                            }
+                    try {
+                        return keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys )
+                                       .withColumnRange( maxVersion, null, false, 1 ).execute().getResult();
                     }
-                } );
-
-
-
-            }
-        }, scheduler )
-
-               //reduce all the output into a single Entity set
-               .reduce( new EntitySetImpl( entityIds.size() ),
-                new Func2<EntitySetImpl, Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>, EntitySetImpl>() {
-                    @Override
-                    public EntitySetImpl call( final EntitySetImpl entitySet,
-                                               final Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> rows ) {
-
-                        final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns = rows.iterator();
+                    catch ( ConnectionException e ) {
+                        throw new CollectionRuntimeException( null, collectionScope,
+                            "An error occurred connecting to cassandra", e );
+                    }
+                } ).subscribeOn( scheduler );
+            }, 10 )
 
-                        while ( latestEntityColumns.hasNext() ) {
-                                   final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next();
+            .reduce( new EntitySetImpl( entityIds.size() ), ( entitySet, rows ) -> {
+                final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns =
+                    rows.iterator();
 
-                                   final ColumnList<UUID> columns = row.getColumns();
+                while ( latestEntityColumns.hasNext() ) {
+                    final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next();
 
-                                   if ( columns.size() == 0 ) {
-                                       continue;
-                                   }
+                    final ColumnList<UUID> columns = row.getColumns();
 
-                                   final Id entityId = row.getKey().getKey().getSubKey();
+                    if ( columns.size() == 0 ) {
+                        continue;
+                    }
 
-                                   final Column<UUID> column = columns.getColumnByIndex( 0 );
+                    final Id entityId = row.getKey().getKey().getSubKey();
 
-                                   final MvccEntity parsedEntity =
-                                           new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column );
+                    final Column<UUID> column = columns.getColumnByIndex( 0 );
 
-                                    entitySet.addEntity( parsedEntity );
-                               }
+                    final MvccEntity parsedEntity =
+                        new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column );
 
+                    entitySet.addEntity( parsedEntity );
+                }
 
 
-                        return entitySet;
-                    }
-                } ).toBlocking().last();
+                return entitySet;
+            } ).toBlocking().last();
 
         return entitySetResults;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index a5046f6..de959b5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -185,82 +185,55 @@ public class MvccEntitySerializationStrategyV3Impl implements MvccEntitySerializ
 
 
         final EntitySetImpl entitySetResults = Observable.from( rowKeys )
-                //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary)
-                .buffer( entitiesPerRequest ).parallel(
-                        new Func1<Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>>,
-                                Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>>>() {
+            //buffer our entities per request, then for that buffer, execute the query in parallel (if necessary)
+            .buffer( entitiesPerRequest ).flatMap( listObservable -> {
 
 
-                            @Override
-                            public Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> call(
-                                    final Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>> listObservable ) {
+                //here, we execute our query then emit the items either in parallel, or on the current thread
+                // if we have more than 1 request
+                return Observable.just( listObservable ).map( scopedRowKeys -> {
 
 
-                                //here, we execute our query then emit
-                                // the items either in parallel, or on
-                                // the current thread if we have more
-                                // than 1 request
-                                return listObservable
-                                        .map( new Func1<List<ScopedRowKey<CollectionPrefixedKey<Id>>>,
-                                                Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>>() {
+                    try {
+                        return keyspace.prepareQuery( CF_ENTITY_DATA ).getKeySlice( rowKeys )
+                                       .withColumnSlice( COL_VALUE ).execute().getResult();
+                    }
+                    catch ( ConnectionException e ) {
+                        throw new CollectionRuntimeException( null, collectionScope,
+                            "An error occurred connecting to cassandra", e );
+                    }
+                } ).subscribeOn( scheduler );
+            }, 10 )
 
+            .reduce( new EntitySetImpl( entityIds.size() ), ( entitySet, rows ) -> {
+                final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> latestEntityColumns =
+                    rows.iterator();
 
-                                            @Override
-                                            public Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> call(
-                                                    final List<ScopedRowKey<CollectionPrefixedKey<Id>>> scopedRowKeys
-                                                                                                           ) {
+                while ( latestEntityColumns.hasNext() ) {
+                    final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> row = latestEntityColumns.next();
 
-                                                try {
-                                                    return keyspace.prepareQuery( CF_ENTITY_DATA )
-                                                                   .getKeySlice( rowKeys )
-                                                                    .withColumnSlice( COL_VALUE )
-                                                                   .execute().getResult();
-                                                }
-                                                catch ( ConnectionException e ) {
-                                                    throw new CollectionRuntimeException( null, collectionScope,
-                                                            "An error occurred connecting to cassandra", e );
-                                                }
-                                            }
-                                        } );
-                            }
-                        }, scheduler )
+                    final ColumnList<Boolean> columns = row.getColumns();
 
-                        //reduce all the output into a single Entity set
-                .reduce( new EntitySetImpl( entityIds.size() ),
-                        new Func2<EntitySetImpl, Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>, EntitySetImpl>() {
-                            @Override
-                            public EntitySetImpl call( final EntitySetImpl entitySet,
-                                                       final Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> rows
-                                                     ) {
+                    if ( columns.size() == 0 ) {
+                        continue;
+                    }
 
-                                final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> latestEntityColumns =
-                                        rows.iterator();
+                    final Id entityId = row.getKey().getKey().getSubKey();
 
-                                while ( latestEntityColumns.hasNext() ) {
-                                    final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> row =
-                                            latestEntityColumns.next();
+                    final Column<Boolean> column = columns.getColumnByIndex( 0 );
 
-                                    final ColumnList<Boolean> columns = row.getColumns();
+                    final MvccEntity parsedEntity =
+                        new MvccColumnParser( entityId, entitySerializer ).parseColumn( column );
 
-                                    if ( columns.size() == 0 ) {
-                                        continue;
-                                    }
 
-                                    final Id entityId = row.getKey().getKey().getSubKey();
+                    entitySet.addEntity( parsedEntity );
+                }
 
-                                    final Column<Boolean> column = columns.getColumnByIndex( 0 );
 
-                                    final MvccEntity parsedEntity =
-                                            new MvccColumnParser( entityId, entitySerializer ).parseColumn( column );
+                return entitySet;
+            } ).toBlocking().last();
 
 
-                                    entitySet.addEntity( parsedEntity );
-                                }
-
-
-                                return entitySet;
-                            }
-                        } ).toBlocking().last();
 
         return entitySetResults;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index f87b5fd..6982857 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -124,130 +124,107 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
 
 
         final Observable<List<EntityToSaveMessage>> migrated =
-            migrationDataProvider.getData().subscribeOn( Schedulers.io() ).parallel(
-                new Func1<Observable<EntityIdScope>, Observable<List<EntityToSaveMessage>>>() {
+            migrationDataProvider.getData().subscribeOn( Schedulers.io() ).flatMap( entityToSaveList -> Observable.just( entityToSaveList ).flatMap( entityIdScope -> {
 
+                //load the entity
+                final CollectionScope currentScope = entityIdScope.getCollectionScope();
 
-                    //process the ids in parallel
-                    @Override
-                    public Observable<List<EntityToSaveMessage>> call(
-                        final Observable<EntityIdScope> entityIdScopeObservable ) {
-
-
-                        return entityIdScopeObservable.flatMap(
-                            new Func1<EntityIdScope, Observable<EntityToSaveMessage>>() {
-
-
-                                @Override
-                                public Observable<EntityToSaveMessage> call( final EntityIdScope entityIdScope ) {
 
-                                    //load the entity
-                                    final CollectionScope currentScope = entityIdScope.getCollectionScope();
+                //for each element in our
+                // history, we need to copy it
+                // to v2.
+                // Note that
+                // this migration
+                //won't support anything beyond V2
 
+                final Iterator<MvccEntity> allVersions =
+                    migration.from.loadAscendingHistory( currentScope, entityIdScope.getId(), startTime, 100 );
 
-                                    //for each element in our
-                                    // history, we need to copy it
-                                    // to v2.
-                                    // Note that
-                                    // this migration
-                                    //won't support anything beyond V2
-
-                                    final Iterator<MvccEntity> allVersions = migration.from
-                                        .loadAscendingHistory( currentScope, entityIdScope.getId(), startTime, 100 );
+                //emit all the entity versions
+                return Observable.create( new Observable.OnSubscribe<EntityToSaveMessage>() {
+                    @Override
+                    public void call( final Subscriber<? super
+                        EntityToSaveMessage> subscriber ) {
 
-                                    //emit all the entity versions
-                                    return Observable.create( new Observable.OnSubscribe<EntityToSaveMessage>() {
-                                            @Override
-                                            public void call( final Subscriber<? super
-                                                EntityToSaveMessage> subscriber ) {
+                        while ( allVersions.hasNext() ) {
+                            final EntityToSaveMessage message =
+                                new EntityToSaveMessage( currentScope, allVersions.next() );
+                            subscriber.onNext( message );
+                        }
 
-                                                while ( allVersions.hasNext() ) {
-                                                    final EntityToSaveMessage message =  new EntityToSaveMessage( currentScope, allVersions.next() );
-                                                    subscriber.onNext( message );
-                                                }
+                        subscriber.onCompleted();
+                    }
+                } ).buffer( 100 ).doOnNext( entities -> {
 
-                                                subscriber.onCompleted();
-                                            }
-                                        } );
-                                }
-                            } )
-                            //buffer 10 versions
-                            .buffer( 100 ).doOnNext( new Action1<List<EntityToSaveMessage>>() {
-                                @Override
-                                public void call( final List<EntityToSaveMessage> entities ) {
+                        final MutationBatch totalBatch = keyspace.prepareMutationBatch();
 
-                                    final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+                        atomicLong.addAndGet( entities.size() );
 
-                                    atomicLong.addAndGet( entities.size() );
+                        List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList( entities.size() );
 
-                                    List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList(entities.size());
+                        for ( EntityToSaveMessage message : entities ) {
+                            final MutationBatch entityRewrite = migration.to.write( message.scope, message.entity );
 
-                                    for ( EntityToSaveMessage message : entities ) {
-                                        final MutationBatch entityRewrite =
-                                            migration.to.write( message.scope, message.entity );
+                            //add to
+                            // the
+                            // total
+                            // batch
+                            totalBatch.mergeShallow( entityRewrite );
 
-                                        //add to
-                                        // the
-                                        // total
-                                        // batch
-                                        totalBatch.mergeShallow( entityRewrite );
+                            //write
+                            // the
+                            // unique values
 
-                                        //write
-                                        // the
-                                        // unique values
+                            if ( !message.entity.getEntity().isPresent() ) {
+                                return;
+                            }
 
-                                        if ( !message.entity.getEntity().isPresent() ) {
-                                            return;
-                                        }
+                            final Entity entity = message.entity.getEntity().get();
 
-                                        final Entity entity = message.entity.getEntity().get();
+                            final Id entityId = entity.getId();
 
-                                        final Id entityId = entity.getId();
+                            final UUID version = message.entity.getVersion();
 
-                                        final UUID version = message.entity.getVersion();
+                            // re-write the unique
+                            // values
+                            // but this
+                            // time with
+                            // no TTL so that cleanup can clean up
+                            // older values
+                            for ( Field field : EntityUtils.getUniqueFields( message.entity.getEntity().get() ) ) {
 
-                                        // re-write the unique
-                                        // values
-                                        // but this
-                                        // time with
-                                        // no TTL so that cleanup can clean up
-                                        // older values
-                                        for ( Field field : EntityUtils
-                                            .getUniqueFields( message.entity.getEntity().get() ) ) {
+                                UniqueValue written = new UniqueValueImpl( field, entityId, version );
 
-                                            UniqueValue written = new UniqueValueImpl( field, entityId, version );
+                                MutationBatch mb = uniqueValueSerializationStrategy.write( message.scope, written );
 
-                                            MutationBatch mb =
-                                                uniqueValueSerializationStrategy.write( message.scope, written );
 
+                                // merge into our
+                                // existing mutation
+                                // batch
+                                totalBatch.mergeShallow( mb );
+                            }
 
-                                            // merge into our
-                                            // existing mutation
-                                            // batch
-                                            totalBatch.mergeShallow( mb );
-                                        }
+                            final EntityVersionCleanupTask task = entityVersionCleanupFactory
+                                .getCleanupTask( message.scope, message.entity.getId(), version, false );
 
-                                        final EntityVersionCleanupTask task = entityVersionCleanupFactory.getCleanupTask( message.scope, message.entity.getId(), version, false );
+                            entityVersionCleanupTasks.add( task );
+                        }
 
-                                        entityVersionCleanupTasks.add( task );
-                                    }
+                        executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
 
-                                    executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
+                        //now run our cleanup task
 
-                                    //now run our cleanup task
+                        for ( EntityVersionCleanupTask entityVersionCleanupTask : entityVersionCleanupTasks ) {
+                            try {
+                                entityVersionCleanupTask.call();
+                            }
+                            catch ( Exception e ) {
+                                LOGGER.error( "Unable to run cleanup task", e );
+                            }
+                        }
+                    } ).subscribeOn( Schedulers.io() );
 
-                                    for(EntityVersionCleanupTask entityVersionCleanupTask: entityVersionCleanupTasks){
-                                        try {
-                                            entityVersionCleanupTask.call();
-                                        }
-                                        catch ( Exception e ) {
-                                            LOGGER.error( "Unable to run cleanup task", e );
-                                        }
-                                    }
-                                }
-                            } );
-                    }
-                } );
+            }, 10) );
 
         migrated.toBlocking().lastOrDefault(null);
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
index 2d416a4..a49e533 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -64,7 +64,7 @@ public class ParallelTest {
         final int expected = size - 1;
 
 
-        // QUESTION Using this thread blocks indefinitely.  The execution of the Hystrix command 
+        // QUESTION Using this thread blocks indefinitely.  The execution of the Hystrix command
          // happens on the computation Thread if this is used
 
         //        final Scheduler scheduler = Schedulers.threadPoolForComputation();
@@ -90,7 +90,7 @@ public class ParallelTest {
          *  non blocking?
          */
 
-        final Observable<String> observable = Observable.from( input ).observeOn( Schedulers.io() );
+        final Observable<String> observable = Observable.just( input ).observeOn( Schedulers.io() );
 
 
         Observable<Integer> thing = observable.flatMap( new Func1<String, Observable<Integer>>() {
@@ -99,7 +99,7 @@ public class ParallelTest {
             public Observable<Integer> call( final String s ) {
                 List<Observable<Integer>> functions = new ArrayList<Observable<Integer>>();
 
-                logger.info( "Creating new set of observables in thread {}", 
+                logger.info( "Creating new set of observables in thread {}",
                         Thread.currentThread().getName() );
 
                 for ( int i = 0; i < size; i++ ) {
@@ -107,13 +107,13 @@ public class ParallelTest {
 
                     final int index = i;
 
-                    // create a new observable and execute the function on it.  
+                    // create a new observable and execute the function on it.
                     // These should happen in parallel when a subscription occurs
 
                     /**
                      * QUESTION: Should this again be the process thread, not the I/O
                      */
-                    Observable<String> newObservable = Observable.from( input ).subscribeOn( Schedulers.io() );
+                    Observable<String> newObservable = Observable.just( input ).subscribeOn( Schedulers.io() );
 
                     Observable<Integer> transformed = newObservable.map( new Func1<String, Integer>() {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
index 747ea7b..9938caf 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
@@ -119,7 +119,7 @@ public abstract class AbstractMvccEntityDataMigrationV1ToV3ImplTest implements D
         assertEquals( "Same entity", entity2, returned2 );
 
         final Observable<EntityIdScope> entityIdScope =
-            Observable.from( new EntityIdScope( scope, entity1.getId() ), new EntityIdScope( scope, entity2.getId() ) );
+            Observable.just( new EntityIdScope( scope, entity1.getId() ), new EntityIdScope( scope, entity2.getId() ) );
 
 
         final MigrationDataProvider<EntityIdScope> migrationProvider = new MigrationDataProvider<EntityIdScope>() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/pom.xml b/stack/corepersistence/common/pom.xml
index 82df1d8..2be1c1a 100644
--- a/stack/corepersistence/common/pom.xml
+++ b/stack/corepersistence/common/pom.xml
@@ -101,15 +101,16 @@
 
     <!-- RX java -->
 
+      <dependency>
+          <groupId>io.reactivex</groupId>
+          <artifactId>rxjava</artifactId>
+          <version>${rx.version}</version>
+      </dependency>
+
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-core</artifactId>
-      <version>${rx.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.netflix.rxjava</groupId>
+      <groupId>io.reactivex</groupId>
       <artifactId>rxjava-math</artifactId>
-      <version>${rx.version}</version>
+      <version>1.0.0</version>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
index 15f9aab..23661ee 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
@@ -77,7 +77,9 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
 
         for ( ColumnNameIterator<C, T> columnNameIterator : columnNameIterators ) {
 
-            observables[i] = Observable.from( columnNameIterator, Schedulers.io() );
+
+
+            observables[i] = Observable.from( columnNameIterator ).subscribeOn( Schedulers.io() );
 
             i++;
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
index f4f6f9c..3c56763 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -52,6 +52,7 @@ import com.netflix.astyanax.util.RangeBuilder;
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import static org.junit.Assert.assertEquals;
 
@@ -125,100 +126,95 @@ public class MultiKeyColumnNameIteratorTest {
 
         final long maxValue = 10000;
 
+
         /**
          * Write to both rows in parallel
          */
         Observable.from( new String[] { rowKey1, rowKey2, rowKey3 } )
-                  .parallel( new Func1<Observable<String>, Observable<String>>() {
-                      @Override
-                      public Observable<String> call( final Observable<String> stringObservable ) {
-                          return stringObservable.doOnNext( new Action1<String>() {
-                              @Override
-                              public void call( final String key ) {
-
-                                  final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                  for ( long i = 0; i < maxValue; i++ ) {
-                                      batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
-
-                                      if ( i % 1000 == 0 ) {
-                                          try {
-                                              batch.execute();
-                                          }
-                                          catch ( ConnectionException e ) {
-                                              throw new RuntimeException( e );
-                                          }
-                                      }
-                                  }
-
-                                  try {
-                                      batch.execute();
-                                  }
-                                  catch ( ConnectionException e ) {
-                                      throw new RuntimeException( e );
-                                  }
+            //flatMap each row key into its own observable subscribed on the io scheduler, so rows can be written concurrently
+                  .flatMap( stringObservable -> Observable.just( stringObservable ).doOnNext( key -> {
+                      final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                      for ( long i = 0; i < maxValue; i++ ) {
+                          batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+
+                          if ( i % 1000 == 0 ) {
+                              try {
+                                  batch.execute();
+                              }
+                              catch ( ConnectionException e ) {
+                                  throw new RuntimeException( e );
                               }
-                          } );
+                          }
                       }
-                  } ).toBlocking().last();
 
+                      try {
+                          batch.execute();
+                      }
+                      catch ( ConnectionException e ) {
+                          throw new RuntimeException( e );
+                      }
+                  } ).subscribeOn( Schedulers.io() ) ).toBlocking().last();
 
-        //create 3 iterators
 
-        ColumnNameIterator<Long, Long> row1Iterator = createIterator( rowKey1, false );
-        ColumnNameIterator<Long, Long> row2Iterator = createIterator( rowKey2, false );
-        ColumnNameIterator<Long, Long> row3Iterator = createIterator( rowKey3, false );
 
-        final Comparator<Long> ascendingComparator = new Comparator<Long>() {
 
-            @Override
-            public int compare( final Long o1, final Long o2 ) {
-                return Long.compare( o1, o2 );
-            }
-        };
+            //create 3 iterators
 
-        /**
-         * Again, arbitrary buffer size to attempt we buffer at some point
-         */
-        final MultiKeyColumnNameIterator<Long, Long> ascendingItr =
+            ColumnNameIterator<Long, Long> row1Iterator = createIterator( rowKey1, false );
+            ColumnNameIterator<Long, Long> row2Iterator = createIterator( rowKey2, false );
+            ColumnNameIterator<Long, Long> row3Iterator = createIterator( rowKey3, false );
+
+            final Comparator<Long> ascendingComparator = new Comparator<Long>() {
+
+                @Override
+                public int compare( final Long o1, final Long o2 ) {
+                    return Long.compare( o1, o2 );
+                }
+            };
+
+            /**
+             * Again, an arbitrary buffer size, chosen in an attempt to ensure we buffer at some point
+             */
+            final MultiKeyColumnNameIterator<Long, Long> ascendingItr =
                 new MultiKeyColumnNameIterator<>( Arrays.asList( row1Iterator, row2Iterator, row3Iterator ),
-                        ascendingComparator, 900 );
+                    ascendingComparator, 900 );
 
 
-        //ensure we have to make several trips, purposefully set to a nonsensical value to ensure we make all the
-        // trips required
+            //ensure we have to make several trips, purposefully set to a nonsensical value to ensure we make all the
+            // trips required
 
 
-        for ( long i = 0; i < maxValue; i++ ) {
-            assertEquals( i, ascendingItr.next().longValue() );
-        }
+            for ( long i = 0; i < maxValue; i++ ) {
+                assertEquals( i, ascendingItr.next().longValue() );
+            }
 
-        //now test it in reverse
+            //now test it in reverse
 
-        ColumnNameIterator<Long, Long> row1IteratorDesc = createIterator( rowKey1, true );
-        ColumnNameIterator<Long, Long> row2IteratorDesc = createIterator( rowKey2, true );
-        ColumnNameIterator<Long, Long> row3IteratorDesc = createIterator( rowKey3, true );
+            ColumnNameIterator<Long, Long> row1IteratorDesc = createIterator( rowKey1, true );
+            ColumnNameIterator<Long, Long> row2IteratorDesc = createIterator( rowKey2, true );
+            ColumnNameIterator<Long, Long> row3IteratorDesc = createIterator( rowKey3, true );
 
-        final Comparator<Long> descendingComparator = new Comparator<Long>() {
+            final Comparator<Long> descendingComparator = new Comparator<Long>() {
 
-            @Override
-            public int compare( final Long o1, final Long o2 ) {
-                return ascendingComparator.compare( o1, o2 ) * -1;
-            }
-        };
+                @Override
+                public int compare( final Long o1, final Long o2 ) {
+                    return ascendingComparator.compare( o1, o2 ) * -1;
+                }
+            };
 
-        /**
-         * Again, arbitrary buffer size to attempt we buffer at some point
-         */
-        final MultiKeyColumnNameIterator<Long, Long> descendingItr =
+            /**
+             * Again, an arbitrary buffer size, chosen in an attempt to ensure we buffer at some point
+             */
+            final MultiKeyColumnNameIterator<Long, Long> descendingItr =
                 new MultiKeyColumnNameIterator<>( Arrays.asList( row1IteratorDesc, row2IteratorDesc, row3IteratorDesc ),
-                        descendingComparator, 900 );
+                    descendingComparator, 900 );
 
 
-        for ( long i = maxValue - 1; i > -1; i-- ) {
-            assertEquals( i, descendingItr.next().longValue() );
+            for ( long i = maxValue - 1; i > -1; i-- ) {
+                assertEquals( i, descendingItr.next().longValue() );
+            }
         }
-    }
 
 
     @Test
@@ -233,39 +229,28 @@ public class MultiKeyColumnNameIteratorTest {
            /**
             * Write to both rows in parallel
             */
-           Observable.just( rowKey1  )
-                     .parallel( new Func1<Observable<String>, Observable<String>>() {
-                         @Override
-                         public Observable<String> call( final Observable<String> stringObservable ) {
-                             return stringObservable.doOnNext( new Action1<String>() {
-                                 @Override
-                                 public void call( final String key ) {
-
-                                     final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                     for ( long i = 0; i < maxValue; i++ ) {
-                                         batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
-
-                                         if ( i % 1000 == 0 ) {
-                                             try {
-                                                 batch.execute();
-                                             }
-                                             catch ( ConnectionException e ) {
-                                                 throw new RuntimeException( e );
-                                             }
-                                         }
-                                     }
-
-                                     try {
-                                         batch.execute();
-                                     }
-                                     catch ( ConnectionException e ) {
-                                         throw new RuntimeException( e );
-                                     }
-                                 }
-                             } );
-                         }
-                     } ).toBlocking().last();
+           Observable.just( rowKey1  ).flatMap( rowKey -> Observable.just( rowKey ).doOnNext( key -> {
+               final MutationBatch batch = keyspace.prepareMutationBatch();
+
+               for ( long i = 0; i < maxValue; i++ ) {
+                   batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+
+                   if ( i % 1000 == 0 ) {
+                       try {
+                           batch.execute();
+                       }
+                       catch ( ConnectionException e ) {
+                           throw new RuntimeException( e );
+                       }
+                   }
+               }
+
+               try {
+                   batch.execute();
+               }
+               catch ( ConnectionException e ) {
+                   throw new RuntimeException( e );
+               }} ).subscribeOn( Schedulers.io() ) ).toBlocking().last();
 
 
            //create 3 iterators

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index c32b820..d88ebe5 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -54,6 +54,7 @@ import rx.Observable;
 import rx.Observer;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -373,38 +374,29 @@ public class MultiRowColumnIteratorTest {
         /**
          * Write to both rows in parallel
          */
-        Observable.just( rowKey1 ).parallel( new Func1<Observable<String>, Observable<String>>() {
-            @Override
-            public Observable<String> call( final Observable<String> stringObservable ) {
-                return stringObservable.doOnNext( new Action1<String>() {
-                    @Override
-                    public void call( final String key ) {
-
-                        final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                        for ( long i = 0; i < maxValue; i++ ) {
-                            batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
-
-                            if ( i % 1000 == 0 ) {
-                                try {
-                                    batch.execute();
-                                }
-                                catch ( ConnectionException e ) {
-                                    throw new RuntimeException( e );
-                                }
-                            }
-                        }
+        Observable.just( rowKey1 ).flatMap( rowKey -> Observable.just( rowKey ).doOnNext( key -> {
+            final MutationBatch batch = keyspace.prepareMutationBatch();
 
-                        try {
-                            batch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new RuntimeException( e );
-                        }
+            for ( long i = 0; i < maxValue; i++ ) {
+                batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+
+                if ( i % 1000 == 0 ) {
+                    try {
+                        batch.execute();
+                    }
+                    catch ( ConnectionException e ) {
+                        throw new RuntimeException( e );
                     }
-                } );
+                }
+            }
+
+            try {
+                batch.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( e );
             }
-        } ).toBlocking().last();
+        } ).subscribeOn( Schedulers.io() ) ).toBlocking().last();
 
 
         //create 3 iterators


[02/12] incubator-usergrid git commit: Merge branch 'USERGRID-405' into USERGRID-486

Posted by sf...@apache.org.
Merge branch 'USERGRID-405' into USERGRID-486

Conflicts:
	stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/12c2a1a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/12c2a1a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/12c2a1a0

Branch: refs/heads/USERGRID-480
Commit: 12c2a1a063da04f674eb654f5116a898a5ddd43d
Parents: 9a78723 8ea46ba
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 19 10:26:42 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 19 10:26:42 2015 -0600

----------------------------------------------------------------------
 .../main/dist/init_instance/init_rest_server.sh |   6 +-
 stack/core/pom.xml                              |  10 +
 .../AllApplicationsObservable.java              |  39 ++
 .../usergrid/corepersistence/CoreModule.java    |  88 ++-
 .../corepersistence/CpEntityManagerFactory.java | 109 ++-
 .../corepersistence/CpManagerCache.java         |  55 +-
 .../corepersistence/CpRelationManager.java      |   7 +-
 .../usergrid/corepersistence/ManagerCache.java  |  12 +-
 .../events/EntityVersionDeletedHandler.java     |  16 +-
 .../migration/CoreDataVersions.java             |  43 ++
 .../migration/CoreMigration.java                |  53 ++
 .../migration/CoreMigrationPlugin.java          |  64 ++
 .../migration/EntityDataMigration.java          | 147 ----
 .../migration/EntityTypeMappingMigration.java   |  78 ++-
 .../migration/GraphShardVersionMigration.java   | 152 -----
 .../migration/MigrationModuleVersionPlugin.java | 133 ++++
 .../migration/MigrationSystemVersions.java      |  43 ++
 .../corepersistence/migration/Versions.java     |  14 -
 .../results/FilteringLoader.java                |  50 +-
 .../rx/AllEntitiesInSystemObservable.java       | 101 ---
 .../rx/ApplicationObservable.java               | 128 ----
 .../rx/EdgesFromSourceObservable.java           |  63 --
 .../rx/EdgesToTargetObservable.java             |  63 --
 .../corepersistence/rx/TargetIdObservable.java  |  66 --
 .../rx/impl/AbstractGraphVisitorImpl.java       | 107 +++
 .../rx/impl/AllApplicationsObservableImpl.java  | 142 ++++
 .../rx/impl/AllEntitiesInSystemImpl.java        |  62 ++
 .../rx/impl/AllNodesInGraphImpl.java            |  55 ++
 .../corepersistence/util/CpEntityMapUtils.java  |  19 +-
 .../corepersistence/util/CpNamingUtils.java     |   2 +
 .../persistence/EntityManagerFactory.java       |  20 -
 .../cassandra/EntityManagerFactoryImpl.java     |  23 -
 .../corepersistence/StaleIndexCleanupTest.java  |  48 ++
 .../migration/EntityDataMigrationIT.java        | 262 -------
 .../migration/EntityTypeMappingMigrationIT.java | 183 ++---
 .../migration/GraphShardVersionMigrationIT.java | 226 ------
 .../MigrationModuleVersionPluginTest.java       | 259 +++++++
 .../migration/MigrationTestRule.java            |  99 ---
 .../migration/TestProgressObserver.java         |  71 --
 .../rx/AllEntitiesInSystemObservableIT.java     |  41 +-
 .../rx/ApplicationObservableTestIT.java         |  14 +-
 .../rx/EdgesFromSourceObservableIT.java         |  10 +-
 .../rx/EdgesToTargetObservableIT.java           |   8 +-
 .../rx/TargetIdObservableTestIT.java            |   7 +-
 .../org/apache/usergrid/persistence/GeoIT.java  |   5 +-
 .../usergrid/persistence/LargeEntityIT.java     |   3 +-
 .../collection/EntityCollectionManager.java     |   6 -
 .../EntityCollectionManagerFactory.java         |  27 +-
 .../collection/EntityCollectionManagerSync.java |  49 --
 .../persistence/collection/MvccEntity.java      |   5 +-
 .../persistence/collection/MvccLogEntry.java    |   1 +
 .../cache/CachedEntityCollectionManager.java    |   7 -
 .../collection/event/EntityVersionDeleted.java  |   3 +-
 .../collection/guice/CollectionModule.java      |  45 +-
 .../persistence/collection/guice/Write.java     |  17 -
 .../collection/guice/WriteUpdate.java           |  17 -
 .../EntityCollectionManagerFactoryImpl.java     |  71 +-
 .../impl/EntityCollectionManagerImpl.java       | 119 +---
 .../impl/EntityCollectionManagerSyncImpl.java   |  71 --
 .../collection/impl/EntityDeletedTask.java      |   8 +-
 .../impl/EntityVersionCleanupTask.java          | 191 +++---
 .../mvcc/MvccEntitySerializationStrategy.java   | 102 ---
 .../mvcc/stage/delete/MarkCommit.java           |   2 +-
 .../mvcc/stage/write/WriteCommit.java           |   9 +-
 .../collection/mvcc/stage/write/WriteStart.java |   6 +-
 .../collection/serialization/EntityRepair.java  |  38 --
 .../MvccEntitySerializationStrategy.java        | 119 ++++
 .../serialization/OptimisticUpdate.java         |  23 -
 .../UniqueValueSerializationStrategy.java       |  46 +-
 .../serialization/UniqueValueSet.java           |   4 +
 .../impl/CollectionDataVersions.java            |  44 ++
 .../serialization/impl/EntityRepairImpl.java    | 149 ----
 .../impl/EntityVersionSerializer.java           |   2 -
 .../serialization/impl/FieldSerializer.java     | 118 ----
 .../MvccEntitySerializationStrategyImpl.java    |  24 +-
 ...vccEntitySerializationStrategyProxyImpl.java | 151 ++--
 .../MvccEntitySerializationStrategyV1Impl.java  |   9 +-
 .../MvccEntitySerializationStrategyV2Impl.java  |   8 +-
 .../MvccEntitySerializationStrategyV3Impl.java  | 615 +++++++++++++++++
 .../serialization/impl/SerializationModule.java |  85 ++-
 .../serialization/impl/UniqueFieldEntry.java    |  61 ++
 .../impl/UniqueFieldEntrySerializer.java        | 141 ++++
 .../impl/UniqueFieldRowKeySerializer.java       | 122 ++++
 .../UniqueValueSerializationStrategyImpl.java   | 254 +++++--
 .../impl/migration/CollectionMigration.java     |  53 ++
 .../migration/CollectionMigrationPlugin.java    |  68 ++
 .../impl/migration/EntityIdScope.java           |  49 ++
 .../migration/MvccEntityDataMigrationImpl.java  | 283 ++++++++
 .../collection/util/EntityUtils.java            |  19 +-
 .../collection/EntityCollectionManagerIT.java   | 104 +--
 .../EntityCollectionManagerSyncIT.java          | 193 ------
 .../collection/guice/TestCollectionModule.java  |  22 +-
 .../impl/EntityVersionCleanupTaskTest.java      | 680 ++++++++-----------
 .../impl/EntityVersionCreatedTaskTest.java      |   2 -
 .../mvcc/stage/delete/MarkCommitTest.java       |   2 +-
 .../mvcc/stage/write/FieldSerializerTest.java   |  52 --
 .../write/UniqueFieldRowKeySerializerTest.java  |  52 ++
 ...niqueValueSerializationStrategyImplTest.java | 149 +++-
 .../mvcc/stage/write/WriteCommitTest.java       |   2 +-
 .../mvcc/stage/write/WriteStartTest.java        |   6 +-
 .../serialization/EntityRepairImplTest.java     | 147 ----
 .../impl/LogEntryIteratorTest.java              |   3 +-
 ...MvccEntitySerializationStrategyImplTest.java | 117 +---
 ...cEntitySerializationStrategyProxyV1Test.java |  85 ---
 ...ntitySerializationStrategyProxyV1_3Test.java |  83 +++
 ...cEntitySerializationStrategyProxyV2Test.java |  83 ---
 ...ntitySerializationStrategyProxyV2_3Test.java |  84 +++
 ...ccEntitySerializationStrategyV1ImplTest.java |  80 ++-
 ...ccEntitySerializationStrategyV2ImplTest.java |  75 +-
 .../MvccEntitySerializationStrategyV2Test.java  |   7 +-
 ...ccEntitySerializationStrategyV3ImplTest.java |  74 ++
 .../impl/UniqueFieldEntrySerializerTest.java    | 108 +++
 ...ctMvccEntityDataMigrationV1ToV3ImplTest.java | 215 ++++++
 .../MvccEntityDataMigrationV1ToV3ImplTest.java  | 105 +++
 .../MvccEntityDataMigrationV2ToV3ImplTest.java  |  99 +++
 .../collection/util/LogEntryMock.java           |  39 +-
 .../collection/util/UniqueValueEntryMock.java   | 161 +++++
 .../collection/util/VersionGenerator.java       |  55 ++
 .../usergrid/persistence/core/CPManager.java    |  28 +
 .../astyanax/DynamicCompositeParserImpl.java    |  81 +++
 .../core/astyanax/FieldBufferBuilder.java       |  15 +
 .../core/astyanax/FieldBufferParser.java        |  13 +
 .../core/astyanax/MultiRowColumnIterator.java   |  12 +-
 .../persistence/core/guice/CommonModule.java    |  17 +-
 .../persistence/core/guice/CurrentImpl.java     |  42 --
 .../persistence/core/guice/PreviousImpl.java    |  42 --
 .../core/hystrix/HystrixCassandra.java          |  94 ---
 .../migration/data/AbstractMigrationPlugin.java | 128 ++++
 .../core/migration/data/DataMigration.java      |  97 +--
 .../migration/data/DataMigrationManager.java    |  18 +-
 .../data/DataMigrationManagerImpl.java          | 234 ++++---
 .../migration/data/MigrationDataProvider.java   |  43 ++
 .../core/migration/data/MigrationInfoCache.java |  52 ++
 .../migration/data/MigrationInfoCacheImpl.java  |  84 +++
 .../data/MigrationInfoSerialization.java        |  28 +-
 .../data/MigrationInfoSerializationImpl.java    |  67 +-
 .../core/migration/data/MigrationPlugin.java    |  56 ++
 .../migration/data/MigrationRelationship.java   | 100 +++
 .../core/migration/data/PluginPhase.java        |  42 ++
 .../core/migration/data/ProgressObserver.java   |  63 ++
 .../core/migration/data/VersionedData.java      |  38 ++
 .../migration/data/VersionedMigrationSet.java   | 153 +++++
 .../core/scope/ApplicationScope.java            |   1 +
 .../persistence/core/task/TaskExecutor.java     |   4 +-
 .../core/guice/DataMigrationResetRule.java      |  88 +++
 .../core/guice/MaxMigrationModule.java          |  39 --
 .../core/guice/MaxMigrationVersion.java         |  40 --
 .../core/guice/MigrationManagerRule.java        |   7 +
 .../core/guice/TestCommonModule.java            |   1 +
 .../persistence/core/guice/TestModule.java      |   5 +-
 .../data/DataMigrationManagerImplTest.java      | 350 ++++++----
 .../data/MigrationInfoSerializationTest.java    |  32 +-
 .../data/TestMigrationDataProvider.java         |  61 ++
 .../migration/data/TestProgressObserver.java    |  89 +++
 .../data/VersionedMigrationSetTest.java         | 198 ++++++
 .../persistence/core/util/IdGenerator.java      |  51 ++
 .../persistence/graph/GraphManager.java         |   3 +-
 .../persistence/graph/GraphManagerFactory.java  |   2 +
 .../persistence/graph/guice/GraphModule.java    | 103 ++-
 .../graph/impl/GraphManagerImpl.java            |  32 +-
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  |   9 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |  16 +-
 .../impl/stage/NodeDeleteListenerImpl.java      |  16 +-
 .../EdgeMetadataSerialization.java              |   3 +-
 .../graph/serialization/EdgesObservable.java    |  33 +
 .../graph/serialization/TargetIdObservable.java |  38 ++
 .../EdgeMetadataSerializationProxyImpl.java     | 161 +++--
 .../impl/EdgeMetadataSerializationV1Impl.java   |   6 +
 .../impl/EdgeMetadataSerializationV2Impl.java   |   6 +
 .../serialization/impl/EdgesObservableImpl.java |  80 +++
 .../serialization/impl/GraphDataVersions.java   |  43 ++
 .../impl/GraphManagerFactoryImpl.java           |  95 +++
 .../impl/NodeSerializationImpl.java             |  36 +-
 .../impl/TargetIdObservableImpl.java            |  72 ++
 .../impl/migration/EdgeDataMigrationImpl.java   | 149 ++++
 .../impl/migration/GraphMigration.java          |  53 ++
 .../impl/migration/GraphMigrationPlugin.java    |  69 ++
 .../serialization/impl/migration/GraphNode.java |  39 ++
 .../shard/count/NodeShardApproximationImpl.java |   4 +-
 .../NodeShardCounterSerializationImpl.java      |  25 +-
 .../shard/impl/NodeShardAllocationImpl.java     |  16 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  64 +-
 .../persistence/graph/GraphManagerIT.java       |  21 +-
 .../persistence/graph/GraphManagerLoadTest.java |  13 +-
 .../graph/GraphManagerShardConsistencyIT.java   |  10 +-
 .../graph/GraphManagerShardingIT.java           |  13 +-
 .../graph/GraphManagerStressTest.java           |  11 +-
 .../graph/guice/TestGraphModule.java            |  27 +-
 .../graph/impl/EdgeDeleteListenerTest.java      |  11 +-
 .../graph/impl/NodeDeleteListenerTest.java      |   9 +-
 .../graph/impl/stage/EdgeDeleteRepairTest.java  |   7 +-
 .../graph/impl/stage/EdgeMetaRepairTest.java    |  29 +-
 .../EdgeMetaDataSerializationProxyV1Test.java   |  14 +-
 .../EdgeMetaDataSerializationProxyV2Test.java   |  21 +-
 .../EdgeMetaDataSerializationV1Test.java        |   7 +-
 .../EdgeMetaDataSerializationV2Test.java        |   7 +-
 .../EdgeMetadataSerializationTest.java          |  39 +-
 .../EdgeSerializationChopTest.java              |   7 +-
 .../serialization/EdgeSerializationTest.java    |  21 +-
 .../serialization/NodeSerializationTest.java    |  15 +-
 .../migration/EdgeDataMigrationImplTest.java    | 177 +++++
 .../impl/shard/EdgeShardSerializationTest.java  |   7 +-
 .../impl/shard/NodeShardAllocationTest.java     |  27 +-
 .../impl/shard/NodeShardCacheTest.java          |   7 +-
 .../impl/shard/ShardGroupCompactionTest.java    |   7 +-
 .../shard/count/NodeShardApproximationTest.java |   9 +-
 .../NodeShardCounterSerializationTest.java      |   6 +-
 .../shard/impl/ShardEntryGroupIteratorTest.java |  15 +-
 ...rceDirectedEdgeDescendingComparatorTest.java |  23 +-
 ...getDirectedEdgeDescendingComparatorTest.java |  23 +-
 .../graph/test/util/EdgeTestUtils.java          |  29 +-
 .../usergrid/persistence/map/MapManager.java    |  25 +-
 .../persistence/map/guice/MapModule.java        |   8 +-
 .../map/impl/MapManagerFactoryImpl.java         |  29 +-
 .../persistence/map/guice/TestMapModule.java    |   3 +-
 .../persistence/model/entity/Entity.java        |  23 +-
 .../persistence/model/entity/EntityMap.java     |  66 ++
 .../model/entity/EntityToMapConverter.java      | 133 ++++
 .../model/entity/MapToEntityConverter.java      | 203 ++++++
 .../persistence/model/entity/SimpleId.java      |   1 -
 .../persistence/model/field/AbstractField.java  |   8 +
 .../persistence/model/field/ListField.java      |   2 +-
 .../persistence/model/field/StringField.java    |  20 +
 .../usergrid/persistence/index/EntityIndex.java |   4 +-
 .../index/impl/EsEntityIndexImpl.java           |  96 +--
 .../index/guice/TestIndexModule.java            |   6 +-
 .../index/impl/EntityIndexMapUtils.java         |  52 +-
 .../queue/guice/TestQueueModule.java            |   3 +-
 stack/pom.xml                                   |   2 +-
 .../org/apache/usergrid/rest/IndexResource.java |   3 +
 .../apache/usergrid/rest/MigrateResource.java   |  94 ++-
 .../applications/ApplicationsResource.java      |   6 -
 232 files changed, 9242 insertions(+), 5174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/12c2a1a0/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 79a021a,27fe705..23dfe06
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@@ -37,8 -32,6 +34,7 @@@ public class TestIndexModule extends Te
          install( new CommonModule());
  
          // configure collections and our core astyanax framework
-         install( new CollectionModule() );
 -        install( new IndexModule() );
 +        install( new IndexModule()  );
 +        install( new GuicyFigModule(IndexTestFig.class) );
      }
  }


[12/12] incubator-usergrid git commit: merge issues

Posted by sf...@apache.org.
merge issues


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/336c2228
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/336c2228
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/336c2228

Branch: refs/heads/USERGRID-480
Commit: 336c222851cd8f18aa7b0719ca307a95d103c598
Parents: 75d61b2
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 20 10:51:31 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 20 10:51:31 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/impl/EsEntityIndexImpl.java       |  1 -
 .../persistence/index/impl/EntityIndexTest.java         |  7 +------
 .../persistence/index/impl/IndexLoadTestsIT.java        | 12 ++++++++----
 3 files changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/336c2228/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 18c3d67..2be3e51 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -63,7 +63,6 @@ import org.elasticsearch.rest.action.admin.indices.alias.delete.AliasesMissingEx
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import java.io.IOException;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/336c2228/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index a21ccda..b562ccf 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.usergrid.persistence.index.*;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -43,12 +44,6 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.core.util.Health;
-import org.apache.usergrid.persistence.index.AliasedEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.guice.TestIndexModule;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.index.query.Query;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/336c2228/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
index 82af950..623f9dd 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.index.impl;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.junit.ClassRule;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -69,6 +70,9 @@ public class IndexLoadTestsIT extends BaseIT {
     @Inject
     public EntityIndexFactory entityIndexFactory;
 
+    @Inject
+    public EntityIndex index;
+
 
     @Test
     public void testHeavyLoad() {
@@ -78,19 +82,19 @@ public class IndexLoadTestsIT extends BaseIT {
         final Id applicationId = new SimpleId( applicationUUID, "application" );
         final ApplicationScope scope = new ApplicationScopeImpl( applicationId );
 
-        final EntityIndex index = entityIndexFactory.createEntityIndex( scope );
 
         //create our index if it doesn't exist
         index.initializeIndex();
 
-        final Observable<Entity> createEntities = createStreamFromWorkers( index, applicationId );
+        ApplicationEntityIndex applicationEntityIndex = entityIndexFactory.createApplicationEntityIndex(scope);
+        final Observable<Entity> createEntities = createStreamFromWorkers( applicationEntityIndex, applicationId );
 
         //run them all
         createEntities.toBlocking().last();
     }
 
 
-    public Observable<Entity> createStreamFromWorkers( final EntityIndex entityIndex, final Id ownerId ) {
+    public Observable<Entity> createStreamFromWorkers( final ApplicationEntityIndex entityIndex, final Id ownerId ) {
 
         //create a sequence of observables.  Each index will be it's own worker thread using the Schedulers.newthread()
         return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).flatMap(
@@ -98,7 +102,7 @@ public class IndexLoadTestsIT extends BaseIT {
     }
 
 
-    private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final Id ownerId,
+    private Observable<Entity> createWriteObservable( final ApplicationEntityIndex entityIndex, final Id ownerId,
                                                       final int workerIndex ) {
 
 


[10/12] incubator-usergrid git commit: Merge branch 'USERGRID-451' into USERGRID-405

Posted by sf...@apache.org.
Merge branch 'USERGRID-451' into USERGRID-405

Upgrade to Java 8
Upgrades RX to latest release


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dcf46937
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dcf46937
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dcf46937

Branch: refs/heads/USERGRID-480
Commit: dcf469378f5d68551cc452368bc6a5378926c7aa
Parents: 0a16033 6dc742e
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 20 07:41:23 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 20 07:41:23 2015 -0600

----------------------------------------------------------------------
 stack/core/pom.xml                              |  26 +-
 .../corepersistence/CpEntityManager.java        |   2 +-
 .../corepersistence/CpEntityManagerFactory.java |   7 +-
 .../corepersistence/CpRelationManager.java      |  24 +-
 .../usergrid/corepersistence/CpWalker.java      |  81 ++---
 .../events/EntityVersionDeletedHandler.java     |  72 ++--
 .../migration/EntityTypeMappingMigration.java   |  41 +--
 .../persistence/ObservableIterator.java         |   1 +
 .../migration/EntityTypeMappingMigrationIT.java |   2 +-
 .../PerformanceEntityRebuildIndexTest.java      |   4 +-
 stack/corepersistence/collection/pom.xml        |  14 +-
 .../impl/EntityCollectionManagerImpl.java       |  10 +-
 .../collection/impl/EntityDeletedTask.java      |  20 +-
 .../impl/EntityVersionCleanupTask.java          |  40 +--
 .../impl/EntityVersionCreatedTask.java          |  26 +-
 .../mvcc/stage/write/WriteCommit.java           |   2 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     |   8 +-
 .../MvccEntitySerializationStrategyImpl.java    |  92 ++---
 .../MvccEntitySerializationStrategyV3Impl.java  |  94 ++---
 .../UniqueValueSerializationStrategyImpl.java   |   8 -
 .../migration/MvccEntityDataMigrationImpl.java  | 171 ++++------
 .../collection/util/EntityUtils.java            |  72 ----
 .../mvcc/stage/AbstractEntityStageTest.java     |   2 +-
 .../mvcc/stage/AbstractMvccEntityStageTest.java |   2 +-
 .../mvcc/stage/TestEntityGenerator.java         |   2 +-
 .../persistence/collection/rx/ParallelTest.java |  10 +-
 ...MvccEntitySerializationStrategyImplTest.java |   4 +-
 ...ccEntitySerializationStrategyV1ImplTest.java |   4 +-
 ...ccEntitySerializationStrategyV2ImplTest.java |   2 +-
 .../impl/SerializationComparison.java           |   4 +-
 ...ctMvccEntityDataMigrationV1ToV3ImplTest.java |   2 +-
 .../collection/util/InvalidEntityGenerator.java |   1 +
 stack/corepersistence/common/pom.xml            |  15 +-
 .../astyanax/MultiKeyColumnNameIterator.java    |   4 +-
 .../MultiKeyColumnNameIteratorTest.java         | 187 +++++-----
 .../astyanax/MultiRowColumnIteratorTest.java    |  50 ++-
 .../graph/impl/GraphManagerImpl.java            |   6 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |   2 +
 .../impl/stage/NodeDeleteListenerImpl.java      |   2 +-
 .../impl/migration/EdgeDataMigrationImpl.java   |  87 +++--
 .../persistence/graph/GraphManagerIT.java       |  17 +-
 .../graph/GraphManagerShardConsistencyIT.java   |   2 +-
 .../usergrid/persistence/graph/SimpleTest.java  |  12 +-
 .../migration/EdgeDataMigrationImplTest.java    |   2 +-
 stack/corepersistence/model/pom.xml             |   1 -
 .../persistence/model/util/EntityUtils.java     |  72 ++++
 stack/corepersistence/pom.xml                   |   8 +-
 stack/corepersistence/queryindex/pom.xml        |   6 -
 .../index/impl/EsEntityIndexImpl.java           |   2 +-
 .../persistence/index/query/EntityResults.java  | 108 ------
 .../persistence/index/query/Results.java        | 148 --------
 .../persistence/index/utils/ListUtils.java      |   6 +-
 .../persistence/index/guice/IndexTestFig.java   |  57 ++++
 .../index/guice/TestIndexModule.java            |   8 +-
 .../index/impl/CorePerformanceIT.java           | 339 -------------------
 .../impl/EntityConnectionIndexImplTest.java     | 306 -----------------
 .../persistence/index/impl/EntityIndexTest.java |  32 +-
 .../index/impl/IndexLoadTestsIT.java            | 138 ++++++++
 stack/pom.xml                                   |   8 +-
 .../management/importer/ImportServiceImpl.java  |  34 +-
 .../impl/ApplicationQueueManagerImpl.java       | 195 +++++------
 .../setup/ConcurrentProcessSingleton.java       |  16 +-
 62 files changed, 900 insertions(+), 1820 deletions(-)
----------------------------------------------------------------------



[06/12] incubator-usergrid git commit: Resolves class loader problems during test

Posted by sf...@apache.org.
Resolves class loader problems during test


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/61067f13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/61067f13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/61067f13

Branch: refs/heads/USERGRID-480
Commit: 61067f1300353b5a58f4b40e28faeae3977bdc4a
Parents: 282e227
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 19 18:31:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 19 18:31:34 2015 -0600

----------------------------------------------------------------------
 stack/core/pom.xml     | 7 +++----
 stack/pom.xml          | 6 ++++--
 stack/rest/pom.xml     | 1 +
 stack/services/pom.xml | 1 +
 4 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61067f13/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 119a52b..ba73150 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -88,10 +88,9 @@
                 <forkCount>${usergrid.it.forkCount}</forkCount>
                 <threadCount>${usergrid.it.threads}</threadCount>
                 <reuseForks>true</reuseForks>
-                <argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true -Xmx${ug.heapmax}
-                    -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                    -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}
-                </argLine>
+                <argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline} </argLine>
+                <!-- see this page for documentation on classloading issues http://maven.apache.org/surefire/maven-surefire-plugin/examples/class-loading.html -->
+                <useSystemClassLoader>false</useSystemClassLoader>
                 <includes>
                     <include>**/*IT.java</include>
                     <include>**/*Test.java</include>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61067f13/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index efbda2d..a107114 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -109,7 +109,7 @@
       <jersey-version>1.18.1</jersey-version>
       <junit-version>4.12</junit-version>
       <log4j-version>1.2.16</log4j-version>
-      <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
+      <org.springframework.version>3.2.13.RELEASE</org.springframework.version>
       <shiro-version>1.2.3</shiro-version>
       <slf4j-version>1.6.1</slf4j-version>
       <snakeyaml-version>1.8</snakeyaml-version>
@@ -1461,6 +1461,8 @@
                           <forkCount>${usergrid.it.forkCount}</forkCount>
                           <reuseForks>${usergrid.it.reuseForks}</reuseForks>
                           <threadCount>${usergrid.it.forkCount}</threadCount>
+                          <!-- see this page for documentation on classloading issues http://maven.apache.org/surefire/maven-surefire-plugin/examples/class-loading.html -->
+                          <useSystemClassLoader>false</useSystemClassLoader>
                           <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin}  -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
                           <testFailureIgnore>false</testFailureIgnore>
                       </configuration>
@@ -1471,7 +1473,7 @@
                       <dependencies>
                           <dependency>
                               <groupId>org.apache.maven.surefire</groupId>
-                              <artifactId>surefire-junit4</artifactId>
+                              <artifactId>${surefire.plugin.artifactName}</artifactId>
                               <version>${surefire.plugin.version}</version>
 
                               <!--<exclusions>-->

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61067f13/stack/rest/pom.xml
----------------------------------------------------------------------
diff --git a/stack/rest/pom.xml b/stack/rest/pom.xml
index deb11f8..974945f 100644
--- a/stack/rest/pom.xml
+++ b/stack/rest/pom.xml
@@ -102,6 +102,7 @@
                     <parallel>methods</parallel>
                     <forkCount>1</forkCount>
                     <threadCount>${usergrid.rest.threads}</threadCount>
+                    <useSystemClassLoader>false</useSystemClassLoader>
                     <reuseForks>true</reuseForks>
                     <argLine>-Dwebapp.directory=${basedir}/src/main/webapp -Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}
                     </argLine>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61067f13/stack/services/pom.xml
----------------------------------------------------------------------
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index 68f1a4a..6074e65 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -92,6 +92,7 @@
                 <forkCount>${usergrid.it.forkCount}</forkCount>
                 <threadCount>${usergrid.it.threads}</threadCount>
                 <reuseForks>${usergrid.it.reuseForks}</reuseForks>
+                <useSystemClassLoader>false</useSystemClassLoader>
                 <argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}
                 </argLine>
                 <includes>


[11/12] incubator-usergrid git commit: merge from 405

Posted by sf...@apache.org.
merge from 405


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/75d61b27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/75d61b27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/75d61b27

Branch: refs/heads/USERGRID-480
Commit: 75d61b270efca8cd7182543a977deafcd136582c
Parents: f7e78f4 dcf4693
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 20 08:44:34 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 20 08:44:34 2015 -0600

----------------------------------------------------------------------
 stack/core/pom.xml                              |  26 +-
 .../corepersistence/CpEntityManager.java        |   2 +-
 .../corepersistence/CpEntityManagerFactory.java |   7 +-
 .../corepersistence/CpRelationManager.java      |  24 +-
 .../usergrid/corepersistence/CpWalker.java      |  81 ++---
 .../events/EntityVersionDeletedHandler.java     |  71 ++--
 .../migration/EntityTypeMappingMigration.java   |  41 +--
 .../persistence/ObservableIterator.java         |   1 +
 .../migration/EntityTypeMappingMigrationIT.java |   2 +-
 .../PerformanceEntityRebuildIndexTest.java      |   4 +-
 stack/corepersistence/collection/pom.xml        |  14 +-
 .../impl/EntityCollectionManagerImpl.java       |  10 +-
 .../collection/impl/EntityDeletedTask.java      |  20 +-
 .../impl/EntityVersionCleanupTask.java          |  40 +--
 .../impl/EntityVersionCreatedTask.java          |  26 +-
 .../mvcc/stage/write/WriteCommit.java           |   2 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     |   8 +-
 .../MvccEntitySerializationStrategyImpl.java    |  92 ++---
 .../MvccEntitySerializationStrategyV3Impl.java  |  94 ++---
 .../UniqueValueSerializationStrategyImpl.java   |   8 -
 .../migration/MvccEntityDataMigrationImpl.java  | 171 ++++------
 .../collection/util/EntityUtils.java            |  72 ----
 .../mvcc/stage/AbstractEntityStageTest.java     |   2 +-
 .../mvcc/stage/AbstractMvccEntityStageTest.java |   2 +-
 .../mvcc/stage/TestEntityGenerator.java         |   2 +-
 .../persistence/collection/rx/ParallelTest.java |  10 +-
 ...MvccEntitySerializationStrategyImplTest.java |   4 +-
 ...ccEntitySerializationStrategyV1ImplTest.java |   4 +-
 ...ccEntitySerializationStrategyV2ImplTest.java |   2 +-
 .../impl/SerializationComparison.java           |   4 +-
 ...ctMvccEntityDataMigrationV1ToV3ImplTest.java |   2 +-
 .../collection/util/InvalidEntityGenerator.java |   1 +
 stack/corepersistence/common/pom.xml            |  15 +-
 .../astyanax/MultiKeyColumnNameIterator.java    |   4 +-
 .../MultiKeyColumnNameIteratorTest.java         | 187 +++++-----
 .../astyanax/MultiRowColumnIteratorTest.java    |  50 ++-
 .../graph/impl/GraphManagerImpl.java            |   6 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |   2 +
 .../impl/stage/NodeDeleteListenerImpl.java      |   2 +-
 .../impl/migration/EdgeDataMigrationImpl.java   |  87 +++--
 .../persistence/graph/GraphManagerIT.java       |  17 +-
 .../graph/GraphManagerShardConsistencyIT.java   |   2 +-
 .../usergrid/persistence/graph/SimpleTest.java  |  12 +-
 .../migration/EdgeDataMigrationImplTest.java    |   2 +-
 stack/corepersistence/model/pom.xml             |   1 -
 .../persistence/model/util/EntityUtils.java     |  72 ++++
 stack/corepersistence/pom.xml                   |   8 +-
 stack/corepersistence/queryindex/pom.xml        |   6 -
 .../persistence/index/query/EntityResults.java  | 108 ------
 .../persistence/index/query/Results.java        | 148 --------
 .../persistence/index/utils/ListUtils.java      |   6 +-
 .../persistence/index/guice/IndexTestFig.java   |  57 ++++
 .../index/guice/TestIndexModule.java            |   8 +-
 .../index/impl/CorePerformanceIT.java           | 339 -------------------
 .../impl/EntityConnectionIndexImplTest.java     | 302 -----------------
 .../persistence/index/impl/EntityIndexTest.java |  32 +-
 .../index/impl/IndexLoadTestsIT.java            | 138 ++++++++
 stack/pom.xml                                   |   8 +-
 .../management/importer/ImportServiceImpl.java  |  34 +-
 .../impl/ApplicationQueueManagerImpl.java       | 195 +++++------
 .../setup/ConcurrentProcessSingleton.java       |  16 +-
 61 files changed, 899 insertions(+), 1814 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/75d61b27/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/75d61b27/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/75d61b27/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index df01c47,c4e970d..f505fa3
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@@ -1055,9 -1058,9 +1055,9 @@@ public class CpRelationManager implemen
                  cpHeadEntity.getId(), edgeType, targetEntity.getId(), System.currentTimeMillis() );
  
          GraphManager gm = managerCache.getGraphManager( applicationScope );
-         gm.writeEdge( edge ).toBlockingObservable().last();
+         gm.writeEdge( edge ).toBlocking().last();
  
 -        EntityIndex ei = managerCache.getEntityIndex( applicationScope );
 +        ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
          EntityIndexBatch batch = ei.createBatch();
  
          // Index the new connection in app|source|type context
@@@ -1287,9 -1290,9 +1287,9 @@@
                  System.currentTimeMillis() );
  
          GraphManager gm = managerCache.getGraphManager( applicationScope );
-         gm.deleteEdge( edge ).toBlockingObservable().last();
+         gm.deleteEdge( edge ).toBlocking().last();
  
 -        final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
 +        final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
          final EntityIndexBatch batch = ei.createBatch();
  
          // Deindex the connection in app|source|type context

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/75d61b27/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index ece7562,23f5a32..f0dd136
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@@ -17,21 -17,18 +17,20 @@@
   */
  package org.apache.usergrid.corepersistence.events;
  
- import com.google.inject.Inject;
- import com.google.inject.Singleton;
  
  import java.util.List;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
  import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
- import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
  import org.apache.usergrid.persistence.EntityManagerFactory;
  import org.apache.usergrid.persistence.collection.CollectionScope;
- import org.apache.usergrid.persistence.collection.MvccEntity;
  import org.apache.usergrid.persistence.collection.MvccLogEntry;
  import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 +import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 +import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
  import org.apache.usergrid.persistence.index.EntityIndex;
- import org.apache.usergrid.persistence.index.EntityIndexBatch;
  import org.apache.usergrid.persistence.index.IndexScope;
  import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
  import org.apache.usergrid.persistence.model.entity.Id;
@@@ -84,28 -78,21 +80,21 @@@ public class EntityVersionDeletedHandle
                  } );
          }
  
-         CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
+         CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
  
 -        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
 +        final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
  
-         final IndexScope indexScope = new IndexScopeImpl(
-                 new SimpleId(scope.getOwner().getUuid(), scope.getOwner().getType()),
-                 scope.getName()
-         );
- 
-         Observable.from( entityVersions )
-             .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccLogEntry>() {
-                 @Override
-                 public void call( final EntityIndexBatch entityIndexBatch, final MvccLogEntry mvccLogEntry ) {
-                     entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
-                 }
-             } ).doOnNext( new Action1<EntityIndexBatch>() {
-             @Override
-             public void call( final EntityIndexBatch entityIndexBatch ) {
+         final IndexScope indexScope =
+             new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ),
+                 scope.getName() );
+ 
+         //create our batch, and then collect all of them into a single batch
+         Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( entityIndexBatch, mvccLogEntry ) -> {
+             entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
+         } )
+             //after our batch is collected, execute it
+             .doOnNext( entityIndexBatch -> {
                  entityIndexBatch.execute();
-             }
-         } ).toBlocking().last();
+             } ).toBlocking().last();
      }
- 
- 
  }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/75d61b27/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/75d61b27/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
index 303d481,0000000..e69de29
mode 100644,000000..100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/75d61b27/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index 8e8d6c0,0000000..e69de29
mode 100644,000000..100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/75d61b27/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------


[07/12] incubator-usergrid git commit: Added error logging to the iterator

Posted by sf...@apache.org.
Added error logging to the iterator


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c7e3459e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c7e3459e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c7e3459e

Branch: refs/heads/USERGRID-480
Commit: c7e3459ec79ed64e74adba0c5fc9c6dc8cd12252
Parents: 61067f1
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 19 19:46:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 19 19:46:58 2015 -0600

----------------------------------------------------------------------
 .../java/org/apache/usergrid/persistence/ObservableIterator.java | 1 +
 .../usergrid/persistence/PerformanceEntityRebuildIndexTest.java  | 4 +---
 .../usergrid/persistence/index/impl/EsEntityIndexImpl.java       | 2 +-
 3 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7e3459e/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java
index b91bd22..9befb79 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java
@@ -70,6 +70,7 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
 
         //if any error occurs, we need to notify the observer so it can perform it's own error handling
         catch ( Throwable t ) {
+            log.error( "Unable to emit items from iterator {}", name, t );
             subscriber.onError( t );
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7e3459e/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index f1f165d..52d4a48 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -81,7 +81,6 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
     @After
     public void printReport() {
-
         logger.debug("Printing metrics report");
         reporter.report();
         reporter.stop();
@@ -428,8 +427,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
         }
 
         if ( expectedEntities != -1 && expectedEntities != count ) {
-            throw new RuntimeException("Did not get expected "
-                    + expectedEntities + " entities, instead got " + count );
+            throw new RuntimeException("Did not get expected " + expectedEntities + " entities, instead got " + count );
         }
         return count;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7e3459e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 8814583..d92ae7d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -662,7 +662,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
         if ( response.isAcknowledged() ) {
             logger.info( "Deleted index: read {} write {}", alias.getReadAlias(), alias.getWriteAlias());
-            //invlaidate the alias
+            //invalidate the alias
             aliasCache.invalidate(alias);
         }
         else {