You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/11/21 22:44:32 UTC

[23/28] incubator-usergrid git commit: Added tests for when we have to make multiple trips to Cassandra.

Added tests for when we have to make multiple trips to Cassandra.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/56ce7ce6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/56ce7ce6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/56ce7ce6

Branch: refs/heads/index-alias
Commit: 56ce7ce6df21b08282aa484ead6c3a19397f0d14
Parents: 3e625b1
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Nov 21 13:32:52 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Nov 21 13:32:52 2014 -0700

----------------------------------------------------------------------
 .../MvccEntitySerializationStrategyImpl.java    | 132 ++++++++++++++-----
 .../MvccEntitySerializationStrategyV1Impl.java  |   6 +-
 .../MvccEntitySerializationStrategyV2Impl.java  |   5 +-
 .../MvccEntitySerializationStrategyV2Test.java  | 101 +++++++++++---
 4 files changed, 188 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index f3f4c13..c9ec9a8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -42,9 +42,9 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImp
 import org.apache.usergrid.persistence.collection.serialization.EntityRepair;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
 import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
@@ -61,9 +61,15 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.ColumnList;
 import com.netflix.astyanax.model.Row;
+import com.netflix.astyanax.model.Rows;
 import com.netflix.astyanax.query.RowQuery;
 import com.netflix.astyanax.serializers.AbstractSerializer;
-import com.netflix.astyanax.serializers.UUIDSerializer;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+import rx.functions.Func2;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -76,14 +82,17 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS
 
     protected final Keyspace keyspace;
     protected final SerializationFig serializationFig;
+    protected final CassandraFig cassandraFig;
     protected final EntityRepair repair;
     private final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>  columnFamily;
 
 
     @Inject
-    public MvccEntitySerializationStrategyImpl( final Keyspace keyspace, final SerializationFig serializationFig ) {
+    public MvccEntitySerializationStrategyImpl( final Keyspace keyspace, final SerializationFig serializationFig,
+                                                final CassandraFig cassandraFig ) {
         this.keyspace = keyspace;
         this.serializationFig = serializationFig;
+        this.cassandraFig = cassandraFig;
         this.repair = new EntityRepairImpl( this, serializationFig );
         this.columnFamily = getColumnFamily();
     }
@@ -100,17 +109,8 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS
         return doWrite( collectionScope, entityId, new RowOp() {
             @Override
             public void doOp( final ColumnListMutation<UUID> colMutation ) {
-//                try {
                     colMutation.putColumn( colName, getEntitySerializer()
                             .toByteBuffer( new EntityWrapper( entity.getStatus(), entity.getEntity() ) ) );
-//                }
-//                catch ( Exception e ) {
-//                    // throw better exception if we can
-//                    if ( entity != null || entity.getEntity().get() != null ) {
-//                        throw new CollectionRuntimeException( entity, collectionScope, e );
-//                    }
-//                    throw e;
-//                }
             }
         } );
     }
@@ -152,46 +152,110 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS
             rowKeys.add( rowKey );
         }
 
+        /**
+         * Our settings may mean we exceed our maximum thrift buffer size. If we do, we have to make multiple requests, not just one.
+         * Perform the calculations and the appropriate request patterns
+         *
+         */
+
+        final long maxEntityResultSizeInBytes = ( long ) serializationFig.getMaxEntitySize() * entityIds.size();
 
-        final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns;
+        //round up (long math avoids int overflow) so batches never exceed the buffer; always issue at least 1 request
+        final int numberRequests = ( int ) Math.max( 1, ( maxEntityResultSizeInBytes + cassandraFig.getThriftBufferSize() - 1 ) / cassandraFig.getThriftBufferSize() );
 
+        final int entitiesPerRequest = Math.max( 1, entityIds.size() / numberRequests ); //buffer(0) would throw
 
-        try {
-            latestEntityColumns = keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys )
-                                          .withColumnRange( maxVersion, null, false, 1 ).execute().getResult()
-                                          .iterator();
+
+        final Scheduler scheduler;
+
+        //if it's a single request, run it on the same thread
+        if(numberRequests == 1){
+            scheduler = Schedulers.immediate();
         }
-        catch ( ConnectionException e ) {
-            throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra",
-                    e );
+        //if it's more than 1 request, run them on the I/O scheduler
+        else{
+            scheduler = Schedulers.io();
         }
 
 
-        final EntitySetImpl entitySetResults = new EntitySetImpl( entityIds.size() );
+       final EntitySetImpl entitySetResults =  Observable.from( rowKeys )
+               //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary)
+                                                         .buffer(entitiesPerRequest )
+                                                         .parallel( new Func1<Observable<List<ScopedRowKey
+                <CollectionPrefixedKey<Id>>>>, Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>>() {
+
+
+            @Override
+            public Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> call(
+                    final Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>> listObservable ) {
+
+
+                 //here, we execute our query then emit the items either in parallel, or on the current thread if we have only 1 request
+                return listObservable.map( new Func1<List<ScopedRowKey<CollectionPrefixedKey<Id>>>,
+                        Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>() {
+
+
+                    @Override
+                    public Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> call(
+                            final List<ScopedRowKey<CollectionPrefixedKey<Id>>> scopedRowKeys ) {
+                        //query only this batch's keys; using the full rowKeys list would re-fetch every entity per batch
+                            try {
+                                return keyspace.prepareQuery( columnFamily ).getKeySlice( scopedRowKeys )
+                                                              .withColumnRange( maxVersion, null, false,
+                                                                      1 ).execute().getResult();
+                            }
+                            catch ( ConnectionException e ) {
+                                throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra",
+                                        e );
+                            }
+                    }
+                } );
 
-        while ( latestEntityColumns.hasNext() ) {
-            final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next();
 
-            final ColumnList<UUID> columns = row.getColumns();
 
-            if ( columns.size() == 0 ) {
-                continue;
             }
+        }, scheduler )
 
-            final Id entityId = row.getKey().getKey().getSubKey();
+               //reduce all the output into a single Entity set
+               .reduce( new EntitySetImpl( entityIds.size() ),
+                new Func2<EntitySetImpl, Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>, EntitySetImpl>() {
+                    @Override
+                    public EntitySetImpl call( final EntitySetImpl entitySet,
+                                               final Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> rows ) {
 
-            final Column<UUID> column = columns.getColumnByIndex( 0 );
+                        final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns = rows.iterator();
 
-            final MvccEntity parsedEntity =
-                    new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column );
+                        while ( latestEntityColumns.hasNext() ) {
+                                   final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next();
 
-            //we *might* need to repair, it's not clear so check before loading into result sets
-            final MvccEntity maybeRepaired = repair.maybeRepair( collectionScope, parsedEntity );
+                                   final ColumnList<UUID> columns = row.getColumns();
 
-            entitySetResults.addEntity( maybeRepaired );
-        }
+                                   if ( columns.size() == 0 ) {
+                                       continue;
+                                   }
+
+                                   final Id entityId = row.getKey().getKey().getSubKey();
+
+                                   final Column<UUID> column = columns.getColumnByIndex( 0 );
+
+                                   final MvccEntity parsedEntity =
+                                           new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column );
+
+                                   //we *might* need to repair, it's not clear so check before loading into result sets
+                                   final MvccEntity maybeRepaired = repair.maybeRepair( collectionScope, parsedEntity );
+
+                                entitySet.addEntity( maybeRepaired );
+                               }
+
+
+
+                        return entitySet;
+                    }
+                } ).toBlocking().last();
 
         return entitySetResults;
+
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
index b40243d..119fb6d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
@@ -26,6 +26,8 @@ import java.util.UUID;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
@@ -69,8 +71,8 @@ public class MvccEntitySerializationStrategyV1Impl extends MvccEntitySerializati
 
 
     @Inject
-    public MvccEntitySerializationStrategyV1Impl( final Keyspace keyspace, final SerializationFig serializationFig ) {
-        super( keyspace, serializationFig );
+    public MvccEntitySerializationStrategyV1Impl( final Keyspace keyspace, final SerializationFig serializationFig, final CassandraFig cassandraFig ) {
+        super( keyspace, serializationFig, cassandraFig );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
index 923c399..cd46c1e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
 import org.apache.usergrid.persistence.collection.exception.EntityTooLargeException;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.FieldBuffer;
 import org.apache.usergrid.persistence.core.astyanax.FieldBufferBuilder;
 import org.apache.usergrid.persistence.core.astyanax.FieldBufferParser;
@@ -72,8 +73,8 @@ public class MvccEntitySerializationStrategyV2Impl extends MvccEntitySerializati
 
 
     @Inject
-    public MvccEntitySerializationStrategyV2Impl( final Keyspace keyspace, final SerializationFig serializationFig ) {
-        super( keyspace, serializationFig );
+    public MvccEntitySerializationStrategyV2Impl( final Keyspace keyspace, final SerializationFig serializationFig, final CassandraFig cassandraFig ) {
+        super( keyspace, serializationFig, cassandraFig );
         entitySerializer = new EntitySerializer( serializationFig );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java
index a496519..5f633a1 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java
@@ -20,25 +20,25 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
-import java.lang.annotation.Annotation;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.UUID;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.safehaus.guicyfig.Bypass;
-import org.safehaus.guicyfig.Env;
-import org.safehaus.guicyfig.Option;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.exception.EntityTooLargeException;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.util.EntityHelper;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.guicyfig.SetConfigTestBypass;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -49,6 +49,7 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import com.google.inject.Inject;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 
@@ -58,21 +59,27 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe
     @Inject
     protected SerializationFig serializationFig;
 
+    @Inject
+    protected CassandraFig cassandraFig;
+
+
     private int setMaxEntitySize;
 
 
     @Before
-    public void setUp(){
+    public void setUp() {
 
 
-      setMaxEntitySize =  serializationFig.getMaxEntitySize();
+        setMaxEntitySize = serializationFig.getMaxEntitySize();
     }
 
+
     @After
-    public void tearDown(){
+    public void tearDown() {
         SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", setMaxEntitySize + "" );
     }
 
+
     /**
      * Tests an entity with more than  65535 bytes worth of data is successfully stored and retrieved
      */
@@ -83,7 +90,7 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe
 
         //this is the size it works out to be when serialized, we want to allow this size
 
-        SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", 65535*10+"");
+        SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", 65535 * 10 + "" );
         final Entity entity = EntityHelper.generateEntity( setSize );
 
         //now we have one massive, entity, save it and retrieve it.
@@ -106,7 +113,12 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe
                 getMvccEntitySerializationStrategy().loadDescendingHistory( context, id, version, 100 );
 
 
-        assertLargeEntity( mvccEntity, loaded );
+        assertTrue( loaded.hasNext() );
+
+        final MvccEntity loadedEntity = loaded.next();
+
+        assertLargeEntity( mvccEntity, loadedEntity );
+
 
         MvccEntity returned =
                 serializationStrategy.load( context, Collections.singleton( id ), version ).getEntity( id );
@@ -115,7 +127,6 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe
     }
 
 
-
     /**
      * Tests an entity with more than  65535 bytes worth of data is successfully stored and retrieved
      */
@@ -142,23 +153,77 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe
     }
 
 
-    protected void assertLargeEntity( final MvccEntity expected, final Iterator<MvccEntity> returned ) {
-        assertTrue( returned.hasNext() );
+    /**
+     * Tests that loading many entities whose combined max size exceeds the thrift buffer still returns all of them
+     */
+    @Test
+    public void largeEntityReadWrite() throws ConnectionException {
+
+        //the configured max entity size drives how many entities the loader requests per Cassandra call
+
+        //cap the max entity size so only ~22 max-size entities (buffer / (0.9 * buffer / 20)) fit in one call
+        final int thriftBuffer = cassandraFig.getThriftBufferSize();
+
-        final MvccEntity loadedEntity = returned.next();
 
-        assertLargeEntity( expected, loadedEntity );
-    }
 
+        //divide by 20 rather than 2: with only 2 per call the local Cassandra instance runs out of memory (OOM)
 
-    protected void assertLargeEntity( final MvccEntity expected, final MvccEntity returned ) {
+        final int maxEntitySize = ( int ) ( ( thriftBuffer * .9 ) / 20  );
 
-        org.junit.Assert.assertEquals( "The loaded entity should match the stored entity", expected, returned );
 
-        EntityHelper.verifyDeepEquals( expected.getEntity().get(), returned.getEntity().get() );
+        SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", maxEntitySize + "" );
+
+
+        final int size = 100;
+
+        final HashMap<Id, MvccEntity> entities = new HashMap<>( size );
+
+        CollectionScope context =
+                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "parent" ), "tests" );
+
+
+        for ( int i = 0; i < size; i++ ) {
+            final Entity entity = EntityHelper.generateEntity( ( int ) (maxEntitySize*.4) );
+
+            //write each generated entity (sized at 40% of the max entity size) and remember it for verification
+
+            final Id id = entity.getId();
+            ValidationUtils.verifyIdentity( id );
+            final UUID version = UUIDGenerator.newTimeUUID();
+            final MvccEntity.Status status = MvccEntity.Status.COMPLETE;
+
+            final MvccEntity mvccEntity = new MvccEntityImpl( id, version, status, entity );
+
+
+            getMvccEntitySerializationStrategy().write( context, mvccEntity ).execute();
+
+            entities.put( id, mvccEntity );
+        }
+
+
+        //now load all 100 at once; their max sizes exceed one thrift buffer, so the load is split over several trips. Should all
+        //still load (note that users should not be encouraged to use this strategy, it's a bad idea!)
+        final EntitySet loaded =
+                getMvccEntitySerializationStrategy().load( context, entities.keySet(), UUIDGenerator.newTimeUUID() );
+
+        assertNotNull( "Entity set was loaded", loaded );
+
+
+        for ( Map.Entry<Id, MvccEntity> entry : entities.entrySet() ) {
+
+            final MvccEntity returned = loaded.getEntity( entry.getKey() );
+
+            assertLargeEntity( entry.getValue(), returned );
+        }
     }
 
 
 
 
+    protected void assertLargeEntity( final MvccEntity expected, final MvccEntity returned ) {
+
+        org.junit.Assert.assertEquals( "The loaded entity should match the stored entity", expected, returned );
+
+        EntityHelper.verifyDeepEquals( expected.getEntity().get(), returned.getEntity().get() );
+    }
 }