Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/17 21:48:29 UTC

[22/38] usergrid git commit: Re-write SCOPED_CACHE serialization to use Datastax driver and CQL.

Re-write SCOPED_CACHE serialization to use Datastax driver and CQL.
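
At a high level the change swaps Astyanax column-family reads and MutationBatch writes for Datastax Statement objects executed on a Session. The following is a minimal, illustrative sketch of the new read path; the class name, method name, and the pre-built partition key parameter are hypothetical, and the real implementation in the diff below also handles bucketing, consistency level, and JSON deserialization of the cached value.

    // A minimal sketch of the new read path, assuming a Session already
    // connected to the keyspace that holds the "SCOPED_CACHE" table.
    // Error handling, bucketing and the composite partition key are
    // simplified here; see the full diff below for the real logic.
    import com.datastax.driver.core.*;
    import com.datastax.driver.core.querybuilder.Clause;
    import com.datastax.driver.core.querybuilder.QueryBuilder;

    import java.nio.ByteBuffer;

    public class ScopedCacheReadSketch {

        public static ByteBuffer readRawValue( Session session,
                                               ByteBuffer partitionKey,
                                               String columnName ) {

            // match the partition key and the clustering column ("column1")
            Clause inKey = QueryBuilder.eq( "key", partitionKey );
            Clause inColumn = QueryBuilder.eq( "column1",
                DataType.text().serialize( columnName, ProtocolVersion.NEWEST_SUPPORTED ) );

            Statement statement = QueryBuilder.select().all()
                .from( "\"SCOPED_CACHE\"" )
                .where( inKey )
                .and( inColumn );

            Row row = session.execute( statement ).one();

            // null means the cache entry does not exist (or has expired via TTL)
            return row == null ? null : row.getBytes( "value" );
        }
    }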


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4475158f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4475158f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4475158f

Branch: refs/heads/master
Commit: 4475158f1b48aa485d6af7d90caa91948ac2f46f
Parents: 6a1fd22
Author: Michael Russo <mr...@apigee.com>
Authored: Sun May 1 16:56:56 2016 +0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun May 1 16:56:56 2016 +0800

----------------------------------------------------------------------
 .../impl/ScopedCacheSerializationImpl.java      | 311 ++++++++++---------
 1 file changed, 170 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4475158f/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
index 1646d36..ffa5f1f 100644
--- a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
+++ b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
@@ -16,10 +16,16 @@
  */
 package org.apache.usergrid.persistence.cache.impl;
 
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Using;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.hash.Funnel;
@@ -39,6 +45,7 @@ import com.netflix.astyanax.serializers.StringSerializer;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.usergrid.persistence.cache.CacheScope;
 import org.apache.usergrid.persistence.core.astyanax.*;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
 import org.apache.usergrid.persistence.core.shard.StringHashUtils;
@@ -46,9 +53,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.util.*;
 import java.util.concurrent.Callable;
 
 
@@ -57,55 +63,50 @@ import java.util.concurrent.Callable;
  */
 public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerialization<K,V> {
 
-    // row-keys are application ID + consistent hash key
-    // column names are K key toString()
-    // column values are serialization of V value
-
     public static final Logger logger = LoggerFactory.getLogger(ScopedCacheSerializationImpl.class);
 
+    // row-keys are (app UUID, application type, app UUID as string, consistent hash int as bucket number)
+    // column names are K key toString()
+    // column values are serialization of V value
 
-    private static final CacheRowKeySerializer ROWKEY_SERIALIZER = new CacheRowKeySerializer();
-
-    private static final BucketScopedRowKeySerializer<String> BUCKET_ROWKEY_SERIALIZER =
-        new BucketScopedRowKeySerializer<>( ROWKEY_SERIALIZER );
-
-    private static final Serializer<String> COLUMN_NAME_SERIALIZER = StringSerializer.get();
+    private static final String SCOPED_CACHE_TABLE = CQLUtils.quote("SCOPED_CACHE");
+    private static final Collection<String> SCOPED_CACHE_PARTITION_KEYS = Collections.singletonList("key");
+    private static final Collection<String> SCOPED_CACHE_COLUMN_KEYS = Collections.singletonList("column1");
+    private static final Map<String, DataType.Name> SCOPED_CACHE_COLUMNS =
+        new HashMap<String, DataType.Name>() {{
+            put( "key", DataType.Name.BLOB );
+            put( "column1", DataType.Name.BLOB );
+            put( "value", DataType.Name.BLOB ); }};
+    private static final Map<String, String> SCOPED_CACHE_CLUSTERING_ORDER =
+        new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
 
-    private static final ObjectSerializer COLUMN_VALUE_SERIALIZER = ObjectSerializer.get();
 
-    public static final MultiTenantColumnFamily<BucketScopedRowKey<String>, String> SCOPED_CACHE
-        = new MultiTenantColumnFamily<>( "SCOPED_CACHE",
-            BUCKET_ROWKEY_SERIALIZER, COLUMN_NAME_SERIALIZER, COLUMN_VALUE_SERIALIZER );
 
     /** Number of buckets to hash across */
     private static final int[] NUM_BUCKETS = {20};
 
     /** How to funnel keys for buckets */
-    private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
-
-        @Override
-        public void funnel( final String key, final PrimitiveSink into ) {
-            into.putString(key, StringHashUtils.UTF8);
-        }
-    };
+    private static final Funnel<String> MAP_KEY_FUNNEL =
+        (Funnel<String>) (key, into) -> into.putString(key, StringHashUtils.UTF8);
 
-    /**
-     * Locator to get us all buckets
-     */
+    /** Locator to get us all buckets */
     private static final ExpandingShardLocator<String>
         BUCKET_LOCATOR = new ExpandingShardLocator<>(MAP_KEY_FUNNEL, NUM_BUCKETS);
 
-    private final Keyspace keyspace;
 
+    private final Session session;
+    private final CassandraConfig cassandraConfig;
     private final ObjectMapper MAPPER = new ObjectMapper();
 
 
-    //------------------------------------------------------------------------------------------
+
 
     @Inject
-    public ScopedCacheSerializationImpl( final Keyspace keyspace ) {
-        this.keyspace = keyspace;
-        //MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    public ScopedCacheSerializationImpl( final Session session,
+                                         final CassandraConfig cassandraConfig ) {
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+
         MAPPER.enableDefaultTyping();
         MAPPER.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
         MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
@@ -115,77 +116,76 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati
     @Override
     public V readValue(CacheScope scope, K key, TypeReference typeRef ) {
 
+        return readValueCQL( scope, key, typeRef);
+
+    }
+
+
+    private V readValueCQL(CacheScope scope, K key, TypeReference typeRef){
+
         Preconditions.checkNotNull(scope, "scope is required");
         Preconditions.checkNotNull(key, "key is required");
 
-        // determine bucketed row-key based application UUID
-        String rowKeyString = scope.getApplication().getUuid().toString();
+        final String rowKeyString = scope.getApplication().getUuid().toString();
         final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
-        final BucketScopedRowKey<String> keyRowKey =
-            BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
 
         // determine column name based on K key to string
-        String columnName = key.toString();
+        final String columnName = key.toString();
 
-        try {
-            try {
-                Column<String> result = keyspace.prepareQuery(SCOPED_CACHE)
-                    .getKey(keyRowKey).getColumn( columnName ).execute().getResult();
-
-                result.getByteBufferValue();
-                //V value = MAPPER.readValue(result.getByteArrayValue(), new TypeReference<V>() {});
-                V value = MAPPER.readValue(result.getByteArrayValue(), typeRef);
-
-                logger.debug("Read cache item from scope {}\n   key/value types {}/{}\n   key:value: {}:{}",
-                        scope.getApplication().getUuid(),
-                        key.getClass().getSimpleName(),
-                        value.getClass().getSimpleName(),
-                        key,
-                        value);
-
-                return value;
-
-            } catch (NotFoundException nfe) {
-                if(logger.isDebugEnabled()) {
-                    logger.debug("Value not found");
-                }
-
-            } catch (IOException ioe) {
-                logger.error("Unable to read cached value", ioe);
-                throw new RuntimeException("Unable to read cached value", ioe);
+        final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
+        final Clause inColumn = QueryBuilder.eq("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED) );
+
+        final Statement statement = QueryBuilder.select().all().from(SCOPED_CACHE_TABLE)
+            .where(inKey)
+            .and(inColumn)
+            .setConsistencyLevel(cassandraConfig.getDataStaxReadCl());
+
+        final ResultSet resultSet = session.execute(statement);
+        final com.datastax.driver.core.Row row = resultSet.one();
+
+        if (row == null){
+
+            if(logger.isDebugEnabled()){
+                logger.debug("Cache value not found for key {}", key );
             }
 
-        } catch (ConnectionException e) {
-            throw new RuntimeException("Unable to connect to cassandra", e);
+            return null;
         }
 
-        if(logger.isDebugEnabled()){
-            logger.debug("Cache value not found for key {}", key );
 
+        try {
+
+            return MAPPER.readValue(row.getBytes("value").array(), typeRef);
+
+        } catch (IOException ioe) {
+            logger.error("Unable to read cached value", ioe);
+            throw new RuntimeException("Unable to read cached value", ioe);
         }
 
-        return null;
+
     }
 
 
     @Override
     public V writeValue(CacheScope scope, K key, V value, Integer ttl) {
 
+        return writeValueCQL( scope, key, value, ttl);
+
+    }
+
+    private V writeValueCQL(CacheScope scope, K key, V value, Integer ttl) {
+
         Preconditions.checkNotNull( scope, "scope is required");
         Preconditions.checkNotNull( key, "key is required" );
         Preconditions.checkNotNull( value, "value is required");
         Preconditions.checkNotNull( ttl, "ttl is required");
 
-        // determine bucketed row-key based application UUID
 
-        String rowKeyString = scope.getApplication().getUuid().toString();
+        final String rowKeyString = scope.getApplication().getUuid().toString();
         final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
 
-        final BucketScopedRowKey<String> keyRowKey =
-            BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
-
         // determine column name based on K key to string
-        String columnName = key.toString();
+        final String columnName = key.toString();
 
         // serialize cache item
         byte[] cacheBytes;
@@ -195,127 +195,156 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati
             throw new RuntimeException("Unable to serialize cache value", jpe);
         }
 
-        // serialize to the entry
-        final MutationBatch batch = keyspace.prepareMutationBatch();
-        batch.withRow(SCOPED_CACHE, keyRowKey).putColumn(columnName, cacheBytes, ttl);
+        final Using timeToLive = QueryBuilder.ttl(ttl);
+
+
+        // convert to ByteBuffer for the blob DataType in Cassandra
+        final ByteBuffer bb = ByteBuffer.allocate(cacheBytes.length);
+        bb.put(cacheBytes);
+        bb.flip();
+
+        final Statement cacheEntry = QueryBuilder.insertInto(SCOPED_CACHE_TABLE)
+            .using(timeToLive)
+            .value("key", getPartitionKey(scope, rowKeyString, bucket))
+            .value("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED))
+            .value("value", bb);
+
 
-        executeBatch(batch);
+        session.execute(cacheEntry);
 
         logger.debug("Wrote cache item to scope {}\n   key/value types {}/{}\n   key:value: {}:{}",
-                scope.getApplication().getUuid(),
-                key.getClass().getSimpleName(),
-                value.getClass().getSimpleName(),
-                key,
-                value);
+            scope.getApplication().getUuid(),
+            key.getClass().getSimpleName(),
+            value.getClass().getSimpleName(),
+            key,
+            value);
 
         return value;
+
     }
 
 
+
     @Override
     public void removeValue(CacheScope scope, K key) {
 
+        removeValueCQL(scope, key);
+
+    }
+
+
+    private void removeValueCQL(CacheScope scope, K key) {
+
         Preconditions.checkNotNull( scope, "scope is required");
         Preconditions.checkNotNull( key, "key is required" );
 
         // determine bucketed row-key based application UUID
 
-        String rowKeyString = scope.getApplication().getUuid().toString();
+        final String rowKeyString = scope.getApplication().getUuid().toString();
         final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
 
-        final BucketScopedRowKey<String> keyRowKey =
-            BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
-
         // determine column name based on K key to string
-        String columnName = key.toString();
+        final String columnName = key.toString();
 
-        final MutationBatch batch = keyspace.prepareMutationBatch();
-        batch.withRow(SCOPED_CACHE, keyRowKey).deleteColumn(columnName);
 
-        executeBatch(batch);
-    }
+        final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
+        final Clause inColumn = QueryBuilder.eq("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED) );
+
+        final Statement statement = QueryBuilder.delete().from(SCOPED_CACHE_TABLE)
+            .where(inKey)
+            .and(inColumn);
 
+        session.execute(statement);
 
+    }
+
+    
     @Override
     public void invalidate(CacheScope scope) {
 
+        invalidateCQL(scope);
+        logger.debug("Invalidated scope {}", scope.getApplication().getUuid());
+
+    }
+
+    private void invalidateCQL(CacheScope scope){
+
         Preconditions.checkNotNull(scope, "scope is required");
 
         // determine bucketed row-key based application UUID
-        String rowKeyString = scope.getApplication().getUuid().toString();
+        final String rowKeyString = scope.getApplication().getUuid().toString();
         final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
-        final BucketScopedRowKey<String> keyRowKey =
-            BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
 
-        final MutationBatch batch = keyspace.prepareMutationBatch();
+        final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
 
-        batch.withRow(SCOPED_CACHE, keyRowKey).delete();
+        final Statement statement = QueryBuilder.delete().from(SCOPED_CACHE_TABLE)
+            .where(inKey);
 
-        final OperationResult<Void> result = executeBatch(batch);
+        session.execute(statement);
 
-        logger.debug("Invalidated scope {}", scope.getApplication().getUuid());
     }
 
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
 
-    private class MutationBatchExec implements Callable<Void> {
-        private final MutationBatch myBatch;
-        private MutationBatchExec(MutationBatch batch) {
-            myBatch = batch;
-        }
-        @Override
-        public Void call() throws Exception {
-            myBatch.execute();
-            return null;
-        }
+        return Collections.emptyList();
     }
 
+    @Override
+    public Collection<TableDefinition> getTables() {
 
-    private OperationResult<Void> executeBatch(MutationBatch batch) {
-        try {
-            return batch.execute();
+        final TableDefinition scopedCache =
+            new TableDefinition( SCOPED_CACHE_TABLE, SCOPED_CACHE_PARTITION_KEYS, SCOPED_CACHE_COLUMN_KEYS,
+                SCOPED_CACHE_COLUMNS, TableDefinition.CacheOption.KEYS, SCOPED_CACHE_CLUSTERING_ORDER);
 
-        } catch (ConnectionException e) {
-            throw new RuntimeException("Unable to connect to cassandra", e);
-        }
+        return Collections.singletonList(scopedCache);
     }
 
 
-    //------------------------------------------------------------------------------------------
 
-    @Override
-    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
-        final MultiTenantColumnFamilyDefinition scopedCache =
-            new MultiTenantColumnFamilyDefinition( SCOPED_CACHE,
-                BytesType.class.getSimpleName(),
-                BytesType.class.getSimpleName(),
-                BytesType.class.getSimpleName(),
-                MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+    private ByteBuffer getPartitionKey(CacheScope scope, String key, int bucketNumber){
+
+        return serializeKeys(scope.getApplication().getUuid(),
+            scope.getApplication().getType(), bucketNumber, key);
 
-        return Collections.singletonList(scopedCache);
     }
 
-    @Override
-    public Collection<TableDefinition> getTables() {
+    private static ByteBuffer serializeKeys(UUID ownerUUID, String ownerType, int bucketNumber, String rowKeyString ){
 
-        return Collections.emptyList();
-    }
+        List<Object> keys = new ArrayList<>(4);
+        keys.add(0, ownerUUID);
+        keys.add(1, ownerType);
+        keys.add(2, bucketNumber);
+        keys.add(3, rowKeyString);
 
+        // UUIDs are 16 bytes, allocate the buffer accordingly
+        int size = 16+ownerType.length()+rowKeyString.length();
 
-    /**
-     * Inner class to serialize cache key
-     */
-    private static class CacheRowKeySerializer implements CompositeFieldSerializer<String> {
+        // ints are 4 bytes, add for the bucket
+        size += 4;
+
+
+        // we always need to add length for the 2 byte short and 1 byte equality
+        size += keys.size()*3;
+
+        ByteBuffer stuff = ByteBuffer.allocate(size);
+
+        for (Object key : keys) {
+
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+
+            stuff.putShort((short) kb.remaining());
+            stuff.put(kb.slice());
+            stuff.put((byte) 0);
 
-        @Override
-        public void toComposite( final CompositeBuilder builder, final String key ) {
-            builder.addString(key);
-        }
 
-        @Override
-        public String fromComposite( final CompositeParser composite ) {
-            final String key = composite.readString();
-            return key;
         }
+        stuff.flip();
+        return stuff.duplicate();
+
     }
 
 }
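
For context, serializeKeys() above writes the composite partition key as, per component, a 2-byte big-endian length, the component bytes produced by the Datastax DataType codecs, and a terminating 0x00 byte, in the order (owner UUID, owner type, bucket number, row key string). The decoder below is a hypothetical sketch, not part of the commit, shown only to make that byte layout concrete; it assumes the same Datastax 2.1-era DataType serialization API the diff uses.

    // A hypothetical decoder for the composite partition key built by
    // serializeKeys() above, shown only to illustrate the byte layout:
    // each component is a 2-byte length, the component bytes, then 0x00.
    import com.datastax.driver.core.DataType;
    import com.datastax.driver.core.ProtocolVersion;

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class CompositeKeySketch {

        public static List<Object> deserializeKeys( ByteBuffer composite ) {

            ByteBuffer bb = composite.duplicate();
            List<Object> components = new ArrayList<>(4);

            // component order matches serializeKeys: UUID, String, int, String
            DataType[] types =
                { DataType.uuid(), DataType.text(), DataType.cint(), DataType.text() };

            for ( DataType type : types ) {

                int length = bb.getShort() & 0xFFFF;   // 2-byte length prefix

                ByteBuffer component = bb.slice();
                component.limit( length );
                components.add( type.deserialize( component, ProtocolVersion.NEWEST_SUPPORTED ) );

                bb.position( bb.position() + length );
                bb.get();                              // skip the end-of-component byte
            }

            return components;
        }
    }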