You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/11/21 22:44:32 UTC
[23/28] incubator-usergrid git commit: Added tests for when we have
to make multiple trips to Cassandra.
Added tests for when we have to make multiple trips to Cassandra.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/56ce7ce6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/56ce7ce6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/56ce7ce6
Branch: refs/heads/index-alias
Commit: 56ce7ce6df21b08282aa484ead6c3a19397f0d14
Parents: 3e625b1
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Nov 21 13:32:52 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Nov 21 13:32:52 2014 -0700
----------------------------------------------------------------------
.../MvccEntitySerializationStrategyImpl.java | 132 ++++++++++++++-----
.../MvccEntitySerializationStrategyV1Impl.java | 6 +-
.../MvccEntitySerializationStrategyV2Impl.java | 5 +-
.../MvccEntitySerializationStrategyV2Test.java | 101 +++++++++++---
4 files changed, 188 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index f3f4c13..c9ec9a8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -42,9 +42,9 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImp
import org.apache.usergrid.persistence.collection.serialization.EntityRepair;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
@@ -61,9 +61,15 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.Row;
+import com.netflix.astyanax.model.Rows;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.astyanax.serializers.AbstractSerializer;
-import com.netflix.astyanax.serializers.UUIDSerializer;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+import rx.functions.Func2;
+import rx.schedulers.Schedulers;
/**
@@ -76,14 +82,17 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS
protected final Keyspace keyspace;
protected final SerializationFig serializationFig;
+ protected final CassandraFig cassandraFig;
protected final EntityRepair repair;
private final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> columnFamily;
@Inject
- public MvccEntitySerializationStrategyImpl( final Keyspace keyspace, final SerializationFig serializationFig ) {
+ public MvccEntitySerializationStrategyImpl( final Keyspace keyspace, final SerializationFig serializationFig,
+ final CassandraFig cassandraFig ) {
this.keyspace = keyspace;
this.serializationFig = serializationFig;
+ this.cassandraFig = cassandraFig;
this.repair = new EntityRepairImpl( this, serializationFig );
this.columnFamily = getColumnFamily();
}
@@ -100,17 +109,8 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS
return doWrite( collectionScope, entityId, new RowOp() {
@Override
public void doOp( final ColumnListMutation<UUID> colMutation ) {
-// try {
colMutation.putColumn( colName, getEntitySerializer()
.toByteBuffer( new EntityWrapper( entity.getStatus(), entity.getEntity() ) ) );
-// }
-// catch ( Exception e ) {
-// // throw better exception if we can
-// if ( entity != null || entity.getEntity().get() != null ) {
-// throw new CollectionRuntimeException( entity, collectionScope, e );
-// }
-// throw e;
-// }
}
} );
}
@@ -152,46 +152,110 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS
rowKeys.add( rowKey );
}
+ /**
+ * Our settings may mean we exceed our maximum thrift buffer size. If we do, we have to make multiple requests, not just one.
+ * Perform the calculations and the appropriate request patterns
+ *
+ */
+
+ final int maxEntityResultSizeInBytes = serializationFig.getMaxEntitySize() * entityIds.size();
- final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns;
+ //if we're less than 1, set the number of requests to 1
+ final int numberRequests = Math.max(1, maxEntityResultSizeInBytes / cassandraFig.getThriftBufferSize());
+ final int entitiesPerRequest = entityIds.size() / numberRequests;
- try {
- latestEntityColumns = keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys )
- .withColumnRange( maxVersion, null, false, 1 ).execute().getResult()
- .iterator();
+
+ final Scheduler scheduler;
+
+ //if it's a single request, run it on the same thread
+ if(numberRequests == 1){
+ scheduler = Schedulers.immediate();
}
- catch ( ConnectionException e ) {
- throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra",
- e );
+ //if it's more than 1 request, run them on the I/O scheduler
+ else{
+ scheduler = Schedulers.io();
}
- final EntitySetImpl entitySetResults = new EntitySetImpl( entityIds.size() );
+ final EntitySetImpl entitySetResults = Observable.from( rowKeys )
+ //buffer our entities per request, then for that buffer, execute the query in parallel (if necessary)
+ .buffer(entitiesPerRequest )
+ .parallel( new Func1<Observable<List<ScopedRowKey
+ <CollectionPrefixedKey<Id>>>>, Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>>() {
+
+
+ @Override
+ public Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> call(
+ final Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>> listObservable ) {
+
+
+ //here, we execute our query then emit the items, either in parallel if we have more than 1 request, or on the current thread if we have only 1
+ return listObservable.map( new Func1<List<ScopedRowKey<CollectionPrefixedKey<Id>>>,
+ Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>() {
+
+
+ @Override
+ public Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> call(
+ final List<ScopedRowKey<CollectionPrefixedKey<Id>>> scopedRowKeys ) {
+
+ try {
+ return keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys )
+ .withColumnRange( maxVersion, null, false,
+ 1 ).execute().getResult();
+ }
+ catch ( ConnectionException e ) {
+ throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra",
+ e );
+ }
+ }
+ } );
- while ( latestEntityColumns.hasNext() ) {
- final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next();
- final ColumnList<UUID> columns = row.getColumns();
- if ( columns.size() == 0 ) {
- continue;
}
+ }, scheduler )
- final Id entityId = row.getKey().getKey().getSubKey();
+ //reduce all the output into a single Entity set
+ .reduce( new EntitySetImpl( entityIds.size() ),
+ new Func2<EntitySetImpl, Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>, EntitySetImpl>() {
+ @Override
+ public EntitySetImpl call( final EntitySetImpl entitySet,
+ final Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> rows ) {
- final Column<UUID> column = columns.getColumnByIndex( 0 );
+ final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns = rows.iterator();
- final MvccEntity parsedEntity =
- new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column );
+ while ( latestEntityColumns.hasNext() ) {
+ final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next();
- //we *might* need to repair, it's not clear so check before loading into result sets
- final MvccEntity maybeRepaired = repair.maybeRepair( collectionScope, parsedEntity );
+ final ColumnList<UUID> columns = row.getColumns();
- entitySetResults.addEntity( maybeRepaired );
- }
+ if ( columns.size() == 0 ) {
+ continue;
+ }
+
+ final Id entityId = row.getKey().getKey().getSubKey();
+
+ final Column<UUID> column = columns.getColumnByIndex( 0 );
+
+ final MvccEntity parsedEntity =
+ new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column );
+
+ //we *might* need to repair, it's not clear so check before loading into result sets
+ final MvccEntity maybeRepaired = repair.maybeRepair( collectionScope, parsedEntity );
+
+ entitySet.addEntity( maybeRepaired );
+ }
+
+
+
+ return entitySet;
+ }
+ } ).toBlocking().last();
return entitySetResults;
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
index b40243d..119fb6d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
@@ -26,6 +26,8 @@ import java.util.UUID;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
@@ -69,8 +71,8 @@ public class MvccEntitySerializationStrategyV1Impl extends MvccEntitySerializati
@Inject
- public MvccEntitySerializationStrategyV1Impl( final Keyspace keyspace, final SerializationFig serializationFig ) {
- super( keyspace, serializationFig );
+ public MvccEntitySerializationStrategyV1Impl( final Keyspace keyspace, final SerializationFig serializationFig, final CassandraFig cassandraFig ) {
+ super( keyspace, serializationFig, cassandraFig );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
index 923c399..cd46c1e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
import org.apache.usergrid.persistence.collection.exception.EntityTooLargeException;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.FieldBuffer;
import org.apache.usergrid.persistence.core.astyanax.FieldBufferBuilder;
import org.apache.usergrid.persistence.core.astyanax.FieldBufferParser;
@@ -72,8 +73,8 @@ public class MvccEntitySerializationStrategyV2Impl extends MvccEntitySerializati
@Inject
- public MvccEntitySerializationStrategyV2Impl( final Keyspace keyspace, final SerializationFig serializationFig ) {
- super( keyspace, serializationFig );
+ public MvccEntitySerializationStrategyV2Impl( final Keyspace keyspace, final SerializationFig serializationFig, final CassandraFig cassandraFig ) {
+ super( keyspace, serializationFig, cassandraFig );
entitySerializer = new EntitySerializer( serializationFig );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java
index a496519..5f633a1 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java
@@ -20,25 +20,25 @@
package org.apache.usergrid.persistence.collection.serialization.impl;
-import java.lang.annotation.Annotation;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.UUID;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.safehaus.guicyfig.Bypass;
-import org.safehaus.guicyfig.Env;
-import org.safehaus.guicyfig.Option;
import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.exception.EntityTooLargeException;
import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.util.EntityHelper;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.guicyfig.SetConfigTestBypass;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -49,6 +49,7 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.inject.Inject;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -58,21 +59,27 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe
@Inject
protected SerializationFig serializationFig;
+ @Inject
+ protected CassandraFig cassandraFig;
+
+
private int setMaxEntitySize;
@Before
- public void setUp(){
+ public void setUp() {
- setMaxEntitySize = serializationFig.getMaxEntitySize();
+ setMaxEntitySize = serializationFig.getMaxEntitySize();
}
+
@After
- public void tearDown(){
+ public void tearDown() {
SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", setMaxEntitySize + "" );
}
+
/**
* Tests an entity with more than 65535 bytes worth of data is successfully stored and retrieved
*/
@@ -83,7 +90,7 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe
//this is the size it works out to be when serialized, we want to allow this size
- SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", 65535*10+"");
+ SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", 65535 * 10 + "" );
final Entity entity = EntityHelper.generateEntity( setSize );
//now we have one massive, entity, save it and retrieve it.
@@ -106,7 +113,12 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe
getMvccEntitySerializationStrategy().loadDescendingHistory( context, id, version, 100 );
- assertLargeEntity( mvccEntity, loaded );
+ assertTrue( loaded.hasNext() );
+
+ final MvccEntity loadedEntity = loaded.next();
+
+ assertLargeEntity( mvccEntity, loadedEntity );
+
MvccEntity returned =
serializationStrategy.load( context, Collections.singleton( id ), version ).getEntity( id );
@@ -115,7 +127,6 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe
}
-
/**
* Tests an entity with more than 65535 bytes worth of data is successfully stored and retrieved
*/
@@ -142,23 +153,77 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe
}
- protected void assertLargeEntity( final MvccEntity expected, final Iterator<MvccEntity> returned ) {
- assertTrue( returned.hasNext() );
+ /**
+ * Tests that many entities, loaded in a single call with a max entity size small relative to the thrift buffer (forcing multiple trips to Cassandra), are all successfully stored and retrieved
+ */
+ @Test
+ public void largeEntityReadWrite() throws ConnectionException {
+
+ //this is the size it works out to be when serialized, we want to allow this size
+
+ //extreme edge case, we can only get 2 entities per call
+ final int thriftBuffer = cassandraFig.getThriftBufferSize();
- final MvccEntity loadedEntity = returned.next();
- assertLargeEntity( expected, loadedEntity );
- }
+ //we use 20, using 2 causes cassandra to OOM. We don't have a large enough instance running locally
- protected void assertLargeEntity( final MvccEntity expected, final MvccEntity returned ) {
+ final int maxEntitySize = ( int ) ( ( thriftBuffer * .9 ) / 20 );
- org.junit.Assert.assertEquals( "The loaded entity should match the stored entity", expected, returned );
- EntityHelper.verifyDeepEquals( expected.getEntity().get(), returned.getEntity().get() );
+ SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", maxEntitySize + "" );
+
+
+ final int size = 100;
+
+ final HashMap<Id, MvccEntity> entities = new HashMap<>( size );
+
+ CollectionScope context =
+ new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "parent" ), "tests" );
+
+
+ for ( int i = 0; i < size; i++ ) {
+ final Entity entity = EntityHelper.generateEntity( ( int ) (maxEntitySize*.4) );
+
+ //now we have one massive, entity, save it and retrieve it.
+
+ final Id id = entity.getId();
+ ValidationUtils.verifyIdentity( id );
+ final UUID version = UUIDGenerator.newTimeUUID();
+ final MvccEntity.Status status = MvccEntity.Status.COMPLETE;
+
+ final MvccEntity mvccEntity = new MvccEntityImpl( id, version, status, entity );
+
+
+ getMvccEntitySerializationStrategy().write( context, mvccEntity ).execute();
+
+ entities.put( id, mvccEntity );
+ }
+
+
+ //now load it, we ask for 100 and are only allowed a fraction of them (roughly 20, given the divisor used above)
+ //per trip due to our max size constraints. Should all still load (note that users should not be encouraged to use this strategy, it's a bad idea!)
+ final EntitySet loaded =
+ getMvccEntitySerializationStrategy().load( context, entities.keySet(), UUIDGenerator.newTimeUUID() );
+
+ assertNotNull( "Entity set was loaded", loaded );
+
+
+ for ( Map.Entry<Id, MvccEntity> entry : entities.entrySet() ) {
+
+ final MvccEntity returned = loaded.getEntity( entry.getKey() );
+
+ assertLargeEntity( entry.getValue(), returned );
+ }
}
+ protected void assertLargeEntity( final MvccEntity expected, final MvccEntity returned ) {
+
+ org.junit.Assert.assertEquals( "The loaded entity should match the stored entity", expected, returned );
+
+ EntityHelper.verifyDeepEquals( expected.getEntity().get(), returned.getEntity().get() );
+ }
}