You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/17 21:48:29 UTC
[22/38] usergrid git commit: Re-write SCOPED_CACHE serialization to
use Datastax driver and CQL.
Re-write SCOPED_CACHE serialization to use Datastax driver and CQL.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4475158f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4475158f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4475158f
Branch: refs/heads/master
Commit: 4475158f1b48aa485d6af7d90caa91948ac2f46f
Parents: 6a1fd22
Author: Michael Russo <mr...@apigee.com>
Authored: Sun May 1 16:56:56 2016 +0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun May 1 16:56:56 2016 +0800
----------------------------------------------------------------------
.../impl/ScopedCacheSerializationImpl.java | 311 ++++++++++---------
1 file changed, 170 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4475158f/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
index 1646d36..ffa5f1f 100644
--- a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
+++ b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
@@ -16,10 +16,16 @@
*/
package org.apache.usergrid.persistence.cache.impl;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Using;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
@@ -39,6 +45,7 @@ import com.netflix.astyanax.serializers.StringSerializer;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.usergrid.persistence.cache.CacheScope;
import org.apache.usergrid.persistence.core.astyanax.*;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
import org.apache.usergrid.persistence.core.shard.StringHashUtils;
@@ -46,9 +53,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.util.*;
import java.util.concurrent.Callable;
@@ -57,55 +63,50 @@ import java.util.concurrent.Callable;
*/
public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerialization<K,V> {
- // row-keys are application ID + consistent hash key
- // column names are K key toString()
- // column values are serialization of V value
-
public static final Logger logger = LoggerFactory.getLogger(ScopedCacheSerializationImpl.class);
+ // row-keys are (app UUID, application type, consistent hash int as bucket number, app UUID as string)
+ // column names are K key toString()
+ // column values are serialization of V value
- private static final CacheRowKeySerializer ROWKEY_SERIALIZER = new CacheRowKeySerializer();
-
- private static final BucketScopedRowKeySerializer<String> BUCKET_ROWKEY_SERIALIZER =
- new BucketScopedRowKeySerializer<>( ROWKEY_SERIALIZER );
-
- private static final Serializer<String> COLUMN_NAME_SERIALIZER = StringSerializer.get();
+ private static final String SCOPED_CACHE_TABLE = CQLUtils.quote("SCOPED_CACHE");
+ private static final Collection<String> SCOPED_CACHE_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> SCOPED_CACHE_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> SCOPED_CACHE_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.BLOB );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> SCOPED_CACHE_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
- private static final ObjectSerializer COLUMN_VALUE_SERIALIZER = ObjectSerializer.get();
- public static final MultiTenantColumnFamily<BucketScopedRowKey<String>, String> SCOPED_CACHE
- = new MultiTenantColumnFamily<>( "SCOPED_CACHE",
- BUCKET_ROWKEY_SERIALIZER, COLUMN_NAME_SERIALIZER, COLUMN_VALUE_SERIALIZER );
/** Number of buckets to hash across */
private static final int[] NUM_BUCKETS = {20};
/** How to funnel keys for buckets */
- private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
-
- @Override
- public void funnel( final String key, final PrimitiveSink into ) {
- into.putString(key, StringHashUtils.UTF8);
- }
- };
+ private static final Funnel<String> MAP_KEY_FUNNEL =
+ (Funnel<String>) (key, into) -> into.putString(key, StringHashUtils.UTF8);
- /**
- * Locator to get us all buckets
- */
+ /** Locator to get us all buckets */
private static final ExpandingShardLocator<String>
BUCKET_LOCATOR = new ExpandingShardLocator<>(MAP_KEY_FUNNEL, NUM_BUCKETS);
- private final Keyspace keyspace;
+ private final Session session;
+ private final CassandraConfig cassandraConfig;
private final ObjectMapper MAPPER = new ObjectMapper();
- //------------------------------------------------------------------------------------------
+
@Inject
- public ScopedCacheSerializationImpl( final Keyspace keyspace ) {
- this.keyspace = keyspace;
- //MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ public ScopedCacheSerializationImpl( final Session session,
+ final CassandraConfig cassandraConfig ) {
+ this.session = session;
+ this.cassandraConfig = cassandraConfig;
+
MAPPER.enableDefaultTyping();
MAPPER.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
@@ -115,77 +116,76 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati
@Override
public V readValue(CacheScope scope, K key, TypeReference typeRef ) {
+ return readValueCQL( scope, key, typeRef);
+
+ }
+
+
+ private V readValueCQL(CacheScope scope, K key, TypeReference typeRef){
+
Preconditions.checkNotNull(scope, "scope is required");
Preconditions.checkNotNull(key, "key is required");
- // determine bucketed row-key based application UUID
- String rowKeyString = scope.getApplication().getUuid().toString();
+ final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
- final BucketScopedRowKey<String> keyRowKey =
- BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
// determine column name based on K key to string
- String columnName = key.toString();
+ final String columnName = key.toString();
- try {
- try {
- Column<String> result = keyspace.prepareQuery(SCOPED_CACHE)
- .getKey(keyRowKey).getColumn( columnName ).execute().getResult();
-
- result.getByteBufferValue();
- //V value = MAPPER.readValue(result.getByteArrayValue(), new TypeReference<V>() {});
- V value = MAPPER.readValue(result.getByteArrayValue(), typeRef);
-
- logger.debug("Read cache item from scope {}\n key/value types {}/{}\n key:value: {}:{}",
- scope.getApplication().getUuid(),
- key.getClass().getSimpleName(),
- value.getClass().getSimpleName(),
- key,
- value);
-
- return value;
-
- } catch (NotFoundException nfe) {
- if(logger.isDebugEnabled()) {
- logger.debug("Value not found");
- }
-
- } catch (IOException ioe) {
- logger.error("Unable to read cached value", ioe);
- throw new RuntimeException("Unable to read cached value", ioe);
+ final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
+ final Clause inColumn = QueryBuilder.eq("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED) );
+
+ final Statement statement = QueryBuilder.select().all().from(SCOPED_CACHE_TABLE)
+ .where(inKey)
+ .and(inColumn)
+ .setConsistencyLevel(cassandraConfig.getDataStaxReadCl());
+
+ final ResultSet resultSet = session.execute(statement);
+ final com.datastax.driver.core.Row row = resultSet.one();
+
+ if (row == null){
+
+ if(logger.isDebugEnabled()){
+ logger.debug("Cache value not found for key {}", key );
}
- } catch (ConnectionException e) {
- throw new RuntimeException("Unable to connect to cassandra", e);
+ return null;
}
- if(logger.isDebugEnabled()){
- logger.debug("Cache value not found for key {}", key );
+ try {
+
+ return MAPPER.readValue(row.getBytes("value").array(), typeRef);
+
+ } catch (IOException ioe) {
+ logger.error("Unable to read cached value", ioe);
+ throw new RuntimeException("Unable to read cached value", ioe);
}
- return null;
+
}
@Override
public V writeValue(CacheScope scope, K key, V value, Integer ttl) {
+ return writeValueCQL( scope, key, value, ttl);
+
+ }
+
+ private V writeValueCQL(CacheScope scope, K key, V value, Integer ttl) {
+
Preconditions.checkNotNull( scope, "scope is required");
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required");
Preconditions.checkNotNull( ttl, "ttl is required");
- // determine bucketed row-key based application UUID
- String rowKeyString = scope.getApplication().getUuid().toString();
+ final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
- final BucketScopedRowKey<String> keyRowKey =
- BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
-
// determine column name based on K key to string
- String columnName = key.toString();
+ final String columnName = key.toString();
// serialize cache item
byte[] cacheBytes;
@@ -195,127 +195,156 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati
throw new RuntimeException("Unable to serialize cache value", jpe);
}
- // serialize to the entry
- final MutationBatch batch = keyspace.prepareMutationBatch();
- batch.withRow(SCOPED_CACHE, keyRowKey).putColumn(columnName, cacheBytes, ttl);
+ final Using timeToLive = QueryBuilder.ttl(ttl);
+
+
+ // convert to ByteBuffer for the blob DataType in Cassandra
+ final ByteBuffer bb = ByteBuffer.allocate(cacheBytes.length);
+ bb.put(cacheBytes);
+ bb.flip();
+
+ final Statement cacheEntry = QueryBuilder.insertInto(SCOPED_CACHE_TABLE)
+ .using(timeToLive)
+ .value("key", getPartitionKey(scope, rowKeyString, bucket))
+ .value("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED))
+ .value("value", bb);
+
- executeBatch(batch);
+ session.execute(cacheEntry);
logger.debug("Wrote cache item to scope {}\n key/value types {}/{}\n key:value: {}:{}",
- scope.getApplication().getUuid(),
- key.getClass().getSimpleName(),
- value.getClass().getSimpleName(),
- key,
- value);
+ scope.getApplication().getUuid(),
+ key.getClass().getSimpleName(),
+ value.getClass().getSimpleName(),
+ key,
+ value);
return value;
+
}
+
@Override
public void removeValue(CacheScope scope, K key) {
+ removeValueCQL(scope, key);
+
+ }
+
+
+ private void removeValueCQL(CacheScope scope, K key) {
+
Preconditions.checkNotNull( scope, "scope is required");
Preconditions.checkNotNull( key, "key is required" );
// determine bucketed row-key based on the application UUID
- String rowKeyString = scope.getApplication().getUuid().toString();
+ final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
- final BucketScopedRowKey<String> keyRowKey =
- BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
-
// determine column name based on K key to string
- String columnName = key.toString();
+ final String columnName = key.toString();
- final MutationBatch batch = keyspace.prepareMutationBatch();
- batch.withRow(SCOPED_CACHE, keyRowKey).deleteColumn(columnName);
- executeBatch(batch);
- }
+ final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
+ final Clause inColumn = QueryBuilder.eq("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED) );
+
+ final Statement statement = QueryBuilder.delete().from(SCOPED_CACHE_TABLE)
+ .where(inKey)
+ .and(inColumn);
+ session.execute(statement);
+ }
+
+
@Override
public void invalidate(CacheScope scope) {
+ invalidateCQL(scope);
+ logger.debug("Invalidated scope {}", scope.getApplication().getUuid());
+
+ }
+
+ private void invalidateCQL(CacheScope scope){
+
Preconditions.checkNotNull(scope, "scope is required");
// determine bucketed row-key based on the application UUID
- String rowKeyString = scope.getApplication().getUuid().toString();
+ final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
- final BucketScopedRowKey<String> keyRowKey =
- BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
- batch.withRow(SCOPED_CACHE, keyRowKey).delete();
+ final Statement statement = QueryBuilder.delete().from(SCOPED_CACHE_TABLE)
+ .where(inKey);
- final OperationResult<Void> result = executeBatch(batch);
+ session.execute(statement);
- logger.debug("Invalidated scope {}", scope.getApplication().getUuid());
}
+ @Override
+ public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
- private class MutationBatchExec implements Callable<Void> {
- private final MutationBatch myBatch;
- private MutationBatchExec(MutationBatch batch) {
- myBatch = batch;
- }
- @Override
- public Void call() throws Exception {
- myBatch.execute();
- return null;
- }
+ return Collections.emptyList();
}
+ @Override
+ public Collection<TableDefinition> getTables() {
- private OperationResult<Void> executeBatch(MutationBatch batch) {
- try {
- return batch.execute();
+ final TableDefinition scopedCache =
+ new TableDefinition( SCOPED_CACHE_TABLE, SCOPED_CACHE_PARTITION_KEYS, SCOPED_CACHE_COLUMN_KEYS,
+ SCOPED_CACHE_COLUMNS, TableDefinition.CacheOption.KEYS, SCOPED_CACHE_CLUSTERING_ORDER);
- } catch (ConnectionException e) {
- throw new RuntimeException("Unable to connect to cassandra", e);
- }
+ return Collections.singletonList(scopedCache);
}
- //------------------------------------------------------------------------------------------
- @Override
- public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
- final MultiTenantColumnFamilyDefinition scopedCache =
- new MultiTenantColumnFamilyDefinition( SCOPED_CACHE,
- BytesType.class.getSimpleName(),
- BytesType.class.getSimpleName(),
- BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+ private ByteBuffer getPartitionKey(CacheScope scope, String key, int bucketNumber){
+
+ return serializeKeys(scope.getApplication().getUuid(),
+ scope.getApplication().getType(), bucketNumber, key);
- return Collections.singletonList(scopedCache);
}
- @Override
- public Collection<TableDefinition> getTables() {
+ private static ByteBuffer serializeKeys(UUID ownerUUID, String ownerType, int bucketNumber, String rowKeyString ){
- return Collections.emptyList();
- }
+ List<Object> keys = new ArrayList<>(4);
+ keys.add(0, ownerUUID);
+ keys.add(1, ownerType);
+ keys.add(2, bucketNumber);
+ keys.add(3, rowKeyString);
+ // UUIDs are 16 bytes, allocate the buffer accordingly
+ int size = 16+ownerType.length()+rowKeyString.length();
- /**
- * Inner class to serialize cache key
- */
- private static class CacheRowKeySerializer implements CompositeFieldSerializer<String> {
+ // ints are 4 bytes, add for the bucket
+ size += 4;
+
+
+ // every key component is also framed by a 2 byte length (short) and a trailing 1 byte equality marker
+ size += keys.size()*3;
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (Object key : keys) {
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ stuff.putShort((short) kb.remaining());
+ stuff.put(kb.slice());
+ stuff.put((byte) 0);
- @Override
- public void toComposite( final CompositeBuilder builder, final String key ) {
- builder.addString(key);
- }
- @Override
- public String fromComposite( final CompositeParser composite ) {
- final String key = composite.readString();
- return key;
}
+ stuff.flip();
+ return stuff.duplicate();
+
}
}