You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/17 21:48:12 UTC

[05/38] usergrid git commit: Convert map manager serialization to read and write with CQL while keeping the original Astyanax data model in Cassandra (i.e., no migration required).

Convert map manager serialization to read and write with CQL while keeping the original Astyanax data model in Cassandra (i.e., no migration required).


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2203f433
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2203f433
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2203f433

Branch: refs/heads/master
Commit: 2203f43336ae86ebcad823fb740207b4eecc36bb
Parents: ad4a337
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Feb 11 14:49:44 2016 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Feb 11 14:49:44 2016 -0800

----------------------------------------------------------------------
 .../core/astyanax/CassandraConfig.java          |  19 +
 .../core/astyanax/CassandraConfigImpl.java      |  27 ++
 .../persistence/core/datastax/CQLUtils.java     |  77 +++-
 .../core/datastax/TableDefinition.java          |  22 +-
 .../core/astyanax/ColumnNameIteratorTest.java   |  16 +
 .../MultiKeyColumnNameIteratorTest.java         |  16 +
 .../astyanax/MultiRowColumnIteratorTest.java    |  16 +
 .../persistence/core/datastax/CQLUtilsTest.java |  15 +-
 .../core/datastax/TableDefinitionTest.java      |   8 +-
 .../persistence/map/impl/MapManagerImpl.java    |   1 -
 .../map/impl/MapSerializationImpl.java          | 391 ++++++++++---------
 .../persistence/map/MapManagerTest.java         |  16 +
 12 files changed, 407 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
index c506d2d..8cb96ac 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
@@ -48,6 +48,25 @@ public interface CassandraConfig {
      */
     ConsistencyLevel getWriteCL();
 
+
+    /**
+     * Get the currently configured read CL for DataStax driver
+     * @return
+     */
+    com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl();
+
+    /**
+     * Get the currently configured write CL for DataStax driver
+     * @return
+     */
+    com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl();
+
+    /**
+     * Get the currently configured consistent read CL for DataStax driver
+     * @return
+     */
+    com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl();
+
     /**
      * Return the number of shards that has been set in the property file
      * @return

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
index a9b37fd..15f434c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
@@ -41,6 +41,12 @@ public class CassandraConfigImpl implements CassandraConfig {
     private int[] shardSettings;
     private ConsistencyLevel consistentCl;
 
+    // DataStax driver's CL
+    private com.datastax.driver.core.ConsistencyLevel dataStaxReadCl;
+    private com.datastax.driver.core.ConsistencyLevel dataStaxWriteCl;
+    private com.datastax.driver.core.ConsistencyLevel dataStaxReadConsistentCl;
+
+
 
     @Inject
     public CassandraConfigImpl( final CassandraFig cassandraFig ) {
@@ -53,6 +59,12 @@ public class CassandraConfigImpl implements CassandraConfig {
 
         this.consistentCl = ConsistencyLevel.valueOf(cassandraFig.getAstyanaxConsistentReadCL());
 
+        this.dataStaxReadCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadCl());
+
+        this.dataStaxReadConsistentCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadCl());
+
+        this.dataStaxWriteCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getWriteCl() );
+
         //add the listeners to update the values
         cassandraFig.addPropertyChangeListener( new PropertyChangeListener() {
             @Override
@@ -89,6 +101,21 @@ public class CassandraConfigImpl implements CassandraConfig {
         return writeCl;
     }
 
+    @Override
+    public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() {
+        return dataStaxReadCl;
+    }
+
+    @Override
+    public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() {
+        return dataStaxWriteCl;
+    }
+
+    @Override
+    public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() {
+        return dataStaxReadConsistentCl;
+    }
+
 
     @Override
     public int[] getShardSettings() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
index 0a7408a..7dee9c8 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
@@ -18,14 +18,14 @@
  */
 package org.apache.usergrid.persistence.core.datastax;
 
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.usergrid.persistence.core.util.StringUtils;
 
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.StringJoiner;
+import java.nio.ByteBuffer;
+import java.util.*;
 
 public class CQLUtils {
 
@@ -45,6 +45,10 @@ public class CQLUtils {
     static String PRIMARY_KEY = "PRIMARY KEY";
     static String COMPACT_STORAGE = "COMPACT STORAGE";
     static String CLUSTERING_ORDER_BY = "CLUSTERING ORDER BY";
+    static String COMMA = ",";
+    static String PAREN_LEFT = "(";
+    static String PAREN_RIGHT = ")";
+
 
     private final static ObjectMapper mapper = new ObjectMapper();
 
@@ -80,22 +84,28 @@ public class CQLUtils {
             throw new Exception("Invalid Action specified.  Must of of type CQLUtils.Action");
         }
 
-        cql.add( "\""+tableDefinition.getTableName()+"\"" );
-
+        cql.add( quote( tableDefinition.getTableName() ) );
 
-        StringJoiner columnsString = new StringJoiner(",");
-        Map<String, String> columns = tableDefinition.getColumns();
-        columns.forEach( (key, value) -> columnsString.add(key+" "+value));
-        columnsString.add(PRIMARY_KEY +" ( "+StringUtils.join(tableDefinition.getPrimaryKeys(), ",") + " )");
 
-        StringJoiner orderingString = new StringJoiner(" ");
-        Map<String, String> ordering = tableDefinition.getClusteringOrder();
-        ordering.forEach( (key, value) -> orderingString.add(key+" "+value));
 
         if ( tableAction.equals(ACTION.CREATE) ){
-            cql.add("(").add(columnsString.toString()).add(")")
+
+            cql.add(PAREN_LEFT).add( spaceSeparatedKeyValue(tableDefinition.getColumns()) ).add(COMMA)
+                .add(PRIMARY_KEY)
+                .add(PAREN_LEFT).add(PAREN_LEFT)
+                .add( StringUtils.join(tableDefinition.getPartitionKeys(), COMMA) ).add(PAREN_RIGHT);
+
+            if ( tableDefinition.getColumnKeys() != null && !tableDefinition.getColumnKeys().isEmpty() ){
+
+                cql.add(COMMA).add( StringUtils.join(tableDefinition.getColumnKeys(), COMMA) );
+            }
+
+            cql.add(PAREN_RIGHT).add(PAREN_RIGHT)
                 .add(WITH)
-                .add(CLUSTERING_ORDER_BY).add("(").add(orderingString.toString()).add(")")
+                .add(CLUSTERING_ORDER_BY)
+                .add(PAREN_LEFT)
+                .add( spaceSeparatedKeyValue(tableDefinition.getClusteringOrder()) )
+                .add(PAREN_RIGHT)
                 .add(AND)
                 .add(COMPACT_STORAGE)
                 .add(AND);
@@ -118,5 +128,42 @@ public class CQLUtils {
 
     }
 
+    public static String quote( String value){
+
+        return "\"" + value + "\"";
+
+    }
+
+    public static String spaceSeparatedKeyValue(Map<String, String> columns){
+
+        StringJoiner columnsSchema = new StringJoiner(",");
+        columns.forEach( (key, value) -> columnsSchema.add(key+" "+value));
+
+        return columnsSchema.toString();
+
+    }
+
+
+    /**
+     * The functions below are borrowed from Astyanax until the schema is rewritten to be more CQL-friendly.
+     */
+
+    public static int getShortLength(ByteBuffer bb) {
+        int length = (bb.get() & 255) << 8;
+        return length | bb.get() & 255;
+    }
+
+    public static ByteBuffer getBytes(ByteBuffer bb, int length) {
+        ByteBuffer copy = bb.duplicate();
+        copy.limit(copy.position() + length);
+        bb.position(bb.position() + length);
+        return copy;
+    }
+
+    public static ByteBuffer getWithShortLength(ByteBuffer bb) {
+        int length = getShortLength(bb);
+        return getBytes(bb, length);
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java
index 801eaa7..0635b93 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java
@@ -53,7 +53,8 @@ public class TableDefinition {
 
 
     private final String tableName;
-    private final Collection<String> primaryKeys;
+    private final Collection<String> partitionKeys;
+    private final Collection<String> columnKeys;
     private final Map<String, String> columns;
     private final CacheOption cacheOption;
     private final Map<String, Object> compaction;
@@ -63,18 +64,19 @@ public class TableDefinition {
     private final String gcGraceSeconds;
     private final Map<String, String> clusteringOrder;
 
-    public TableDefinition( final String tableName, final Collection<String> primaryKeys,
-                            final Map<String, String> columns, final CacheOption cacheOption,
-                            final Map<String, String> clusteringOrder){
+    public TableDefinition( final String tableName, final Collection<String> partitionKeys,
+                            final Collection<String> columnKeys, final Map<String, String> columns,
+                            final CacheOption cacheOption, final Map<String, String> clusteringOrder){
 
         Preconditions.checkNotNull(tableName, "Table name cannot be null");
-        Preconditions.checkNotNull(primaryKeys, "Primary Key(s) cannot be null");
+        Preconditions.checkNotNull(partitionKeys, "Primary Key(s) cannot be null");
         Preconditions.checkNotNull(columns, "Columns cannot be null");
         Preconditions.checkNotNull(cacheOption, "CacheOption cannot be null");
 
 
         this.tableName = tableName;
-        this.primaryKeys = primaryKeys;
+        this.partitionKeys = partitionKeys;
+        this.columnKeys = columnKeys;
         this.columns = columns;
         this.cacheOption = cacheOption;
         this.clusteringOrder = clusteringOrder;
@@ -97,8 +99,12 @@ public class TableDefinition {
         return tableName;
     }
 
-    public Collection<String> getPrimaryKeys() {
-        return primaryKeys;
+    public Collection<String> getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    public Collection<String> getColumnKeys() {
+        return columnKeys;
     }
 
     public Map<String, String> getColumns() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
index 18c9327..dccbd45 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
@@ -93,6 +93,22 @@ public class ColumnNameIteratorTest {
                 return ConsistencyLevel.CL_QUORUM;
             }
 
+            @Override
+            public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() {
+                return com.datastax.driver.core.ConsistencyLevel.LOCAL_ONE;
+            }
+
+            @Override
+            public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() {
+                return com.datastax.driver.core.ConsistencyLevel.ALL;
+            }
+
+
+            @Override
+            public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() {
+                return com.datastax.driver.core.ConsistencyLevel.QUORUM;
+            }
+
 
             @Override
             public int[] getShardSettings() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
index bd1ea55..d020949 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -98,6 +98,22 @@ public class MultiKeyColumnNameIteratorTest {
                 return ConsistencyLevel.CL_QUORUM;
             }
 
+            @Override
+            public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() {
+                return com.datastax.driver.core.ConsistencyLevel.LOCAL_ONE;
+            }
+
+            @Override
+            public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() {
+                return com.datastax.driver.core.ConsistencyLevel.ALL;
+            }
+
+
+            @Override
+            public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() {
+                return com.datastax.driver.core.ConsistencyLevel.QUORUM;
+            }
+
 
             @Override
             public int[] getShardSettings() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index 9f5741b..8bcdcb2 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -101,6 +101,22 @@ public class MultiRowColumnIteratorTest {
                 return ConsistencyLevel.CL_QUORUM;
             }
 
+            @Override
+            public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() {
+                return com.datastax.driver.core.ConsistencyLevel.LOCAL_ONE;
+            }
+
+            @Override
+            public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() {
+                return com.datastax.driver.core.ConsistencyLevel.ALL;
+            }
+
+
+            @Override
+            public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() {
+                return com.datastax.driver.core.ConsistencyLevel.QUORUM;
+            }
+
 
             @Override
             public int[] getShardSettings() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java
index 8ddfa3f..76fcefe 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java
@@ -43,9 +43,11 @@ public class CQLUtilsTest {
         columns.put("column1", "text");
         columns.put("value", "blob");
 
-        List<String> primaryKeys = new ArrayList<>();
-        primaryKeys.add("key");
-        primaryKeys.add("column1");
+        List<String> partitionKeys = new ArrayList<>();
+        partitionKeys.add("key");
+
+        List<String> columnKeys = new ArrayList<>();
+        columnKeys.add("column1");
 
         Map<String, String> clusteringOrder = new HashMap<>();
         clusteringOrder.put("column1", "DESC");
@@ -54,7 +56,8 @@ public class CQLUtilsTest {
 
         TableDefinition table1 = new TableDefinition(
             "table1",
-            primaryKeys,
+            partitionKeys,
+            columnKeys,
             columns,
             TableDefinition.CacheOption.KEYS,
             clusteringOrder
@@ -65,8 +68,8 @@ public class CQLUtilsTest {
 
         assertTrue( createCQL.contains( CQLUtils.CREATE_TABLE ) && !createCQL.contains( CQLUtils.ALTER_TABLE ) );
         assertTrue( updateCQL.contains( CQLUtils.ALTER_TABLE ) && !updateCQL.contains( CQLUtils.CREATE_TABLE ) );
-        //logger.info("CREATE: {}", createCQL);
-        //logger.info("UPDATE: {}", updateCQL);
+        logger.info("CREATE: {}", createCQL);
+        logger.info("UPDATE: {}", updateCQL);
 
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java
index 792864b..3acce69 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java
@@ -32,7 +32,7 @@ public class TableDefinitionTest {
     public void testNullTableName(){
 
         try{
-            TableDefinition table1 = new TableDefinition(null, null, null, null, null);
+            TableDefinition table1 = new TableDefinition(null, null, null, null, null, null);
         } catch (NullPointerException npe){
             assertEquals("Table name cannot be null", npe.getMessage());
         }
@@ -44,7 +44,7 @@ public class TableDefinitionTest {
     public void testNullPrimaryKeys(){
 
         try{
-            TableDefinition table1 = new TableDefinition("table1", null, null, null, null);
+            TableDefinition table1 = new TableDefinition("table1", null, null, null, null, null);
         } catch (NullPointerException npe){
             assertEquals("Primary Key(s) cannot be null", npe.getMessage());
         }
@@ -57,7 +57,7 @@ public class TableDefinitionTest {
 
         try{
             TableDefinition table1 = new TableDefinition("table1",
-                new ArrayList<>(), null, null, null);
+                new ArrayList<>(), null, null, null, null);
         } catch (NullPointerException npe){
             assertEquals("Columns cannot be null", npe.getMessage());
         }
@@ -71,7 +71,7 @@ public class TableDefinitionTest {
         try{
             TableDefinition table1 = new TableDefinition("table1",
                 new ArrayList<>(),
-                new HashMap<>(), null, null);
+                new ArrayList<>(), new HashMap<>(), null, null);
         } catch (NullPointerException npe){
             assertEquals("CacheOption cannot be null", npe.getMessage());
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index 501ade7..ae05057 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -51,7 +51,6 @@ public class MapManagerImpl implements MapManager {
         return mapSerialization.getString( scope, key );
     }
 
-
     @Override
     public String getStringHighConsistency( final String key ) {
         return mapSerialization.getStringHighConsistency(scope, key);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index f90f80c..5fc6ee1 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.map.impl;
 
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,6 +29,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Using;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 
@@ -39,6 +44,7 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
 import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
 import org.apache.usergrid.persistence.core.shard.StringHashUtils;
 import org.apache.usergrid.persistence.map.MapScope;
@@ -47,17 +53,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.hash.Funnel;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.netflix.astyanax.ColumnListMutation;
 import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
-import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.CompositeBuilder;
 import com.netflix.astyanax.model.CompositeParser;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.model.Row;
-import com.netflix.astyanax.model.Rows;
 import com.netflix.astyanax.serializers.BooleanSerializer;
 import com.netflix.astyanax.serializers.StringSerializer;
 
@@ -65,6 +63,9 @@ import com.netflix.astyanax.serializers.StringSerializer;
 @Singleton
 public class MapSerializationImpl implements MapSerialization {
 
+    private static final String MAP_KEYS_TABLE = CQLUtils.quote("Map_Keys");
+    private static final String MAP_ENTRIES_TABLE = CQLUtils.quote("Map_Entries");
+
     private static final MapKeySerializer KEY_SERIALIZER = new MapKeySerializer();
 
     private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
@@ -81,7 +82,7 @@ public class MapSerializationImpl implements MapSerialization {
     private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
 
 
-    private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder();
+    private static final StringResultsBuilderCQL STRING_RESULTS_BUILDER_CQL = new StringResultsBuilderCQL();
 
 
     /**
@@ -117,137 +118,111 @@ public class MapSerializationImpl implements MapSerialization {
     private final Keyspace keyspace;
     private final CassandraConfig cassandraConfig;
 
+    private final Session session;
+
 
     @Inject
-    public MapSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig ) {
+    public MapSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+                                 final Session session ) {
         this.keyspace = keyspace;
+        this.session = session;
         this.cassandraConfig = cassandraConfig;
     }
 
 
     @Override
     public String getString( final MapScope scope, final String key ) {
-        Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL()  );
-        return ( col != null ) ? col.getStringValue() : null;
+
+        ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadCl() ) ;
+        return value != null ? (String)DataType.text().deserialize(value,ProtocolVersion.NEWEST_SUPPORTED ): null;
     }
 
 
     @Override
     public String getStringHighConsistency( final MapScope scope, final String key ) {
-        Column<Boolean> col = getValue( scope, key, cassandraConfig.getConsistentReadCL() ); // TODO: why boolean?
-        return ( col != null ) ? col.getStringValue() : null;
+
+        ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadConsistentCl() ) ;
+        return value != null ? (String)DataType.text().deserialize(value,ProtocolVersion.NEWEST_SUPPORTED ): null;
     }
 
 
     @Override
     public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys ) {
-        return getValues( scope, keys, STRING_RESULTS_BUILDER );
+        return getValuesCQL( scope, keys, STRING_RESULTS_BUILDER_CQL );
     }
 
 
     @Override
     public void putString( final MapScope scope, final String key, final String value ) {
-        final RowOp op = new RowOp() {
-            @Override
-            public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
-                columnListMutation.putColumn( true, value );
-            }
-
 
-            @Override
-            public void putKey( final ColumnListMutation<String> keysMutation ) {
-                keysMutation.putColumn( key, true );
-            }
-        };
-
-
-        writeString( scope, key, value, op );
+        writeStringCQL( scope, key, value, -1 );
     }
 
 
     @Override
     public void putString( final MapScope scope, final String key, final String value, final int ttl ) {
-        Preconditions.checkArgument( ttl > 0, "ttl must be > than 0" );
-
-        final RowOp op = new RowOp() {
-            @Override
-            public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
-                columnListMutation.putColumn( true, value, ttl );
-            }
 
-
-            @Override
-            public void putKey( final ColumnListMutation<String> keysMutation ) {
-                keysMutation.putColumn( key, true, ttl );
-            }
-        };
-
-
-        writeString( scope, key, value, op );
+        Preconditions.checkArgument( ttl > 0, "ttl must be > than 0" );
+        writeStringCQL( scope, key, value, ttl );
     }
 
 
     /**
      * Write our string index with the specified row op
      */
-    private void writeString( final MapScope scope, final String key, final String value, final RowOp rowOp ) {
+    private void writeStringCQL( final MapScope scope, final String key, final String value, int ttl ) {
 
         Preconditions.checkNotNull( scope, "mapscope is required" );
         Preconditions.checkNotNull( key, "key is required" );
         Preconditions.checkNotNull( value, "value is required" );
 
-        final MutationBatch batch = keyspace.prepareMutationBatch();
-
-        //add it to the entry
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+        Statement mapEntry;
+        Statement mapKey;
+        if (ttl > 0){
+            Using timeToLive = QueryBuilder.ttl(ttl);
 
-        //serialize to the
-        // entry
+            mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
+                .using(timeToLive)
+                .value("key", getMapEntryPartitionKey(scope, key))
+                .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
+                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
 
 
-        rowOp.putValue( batch.withRow( MAP_ENTRIES, entryRowKey ) );
+            final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
+            mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
+                .using(timeToLive)
+                .value("key", getMapKeyPartitionKey(scope, key, bucket))
+                .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
+                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
+        }else{
 
+            mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
+                .value("key", getMapEntryPartitionKey(scope, key))
+                .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
+                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
 
-        //add it to the keys
+            // get a bucket number for the map keys table
+            final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
 
-        final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
-
-        final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
-
-        //serialize to the entry
+            mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
+                .value("key", getMapKeyPartitionKey(scope, key, bucket))
+                .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
+                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
 
-        rowOp.putKey( batch.withRow( MAP_KEYS, keyRowKey ) );
+        }
 
+        session.execute(mapEntry);
+        session.execute(mapKey);
 
-        executeBatch( batch );
     }
 
 
-    /**
-     * Callbacks for performing row operations
-     */
-    private static interface RowOp {
-
-        /**
-         * Callback to do the row
-         *
-         * @param columnListMutation The column mutation
-         */
-        void putValue( final ColumnListMutation<Boolean> columnListMutation );
-
-
-        /**
-         * Write the key
-         */
-        void putKey( final ColumnListMutation<String> keysMutation );
-    }
-
 
     @Override
     public UUID getUuid( final MapScope scope, final String key ) {
 
-        Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
-        return ( col != null ) ? col.getUUIDValue() : null;
+        ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadCl() );
+        return value != null ? (UUID)DataType.uuid().deserialize(value, ProtocolVersion.NEWEST_SUPPORTED ) : null;
     }
 
 
@@ -258,31 +233,34 @@ public class MapSerializationImpl implements MapSerialization {
         Preconditions.checkNotNull( key, "key is required" );
         Preconditions.checkNotNull( putUuid, "value is required" );
 
-        final MutationBatch batch = keyspace.prepareMutationBatch();
 
-        //add it to the entry
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+        Statement mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
+            .value("key", getMapEntryPartitionKey(scope, key))
+            .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
+            .value("value", DataType.uuid().serialize(putUuid, ProtocolVersion.NEWEST_SUPPORTED));
 
-        //serialize to the entry
-        batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, putUuid );
+        session.execute(mapEntry);
 
-        //add it to the keys
 
         final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
+        Statement mapKey;
+        mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
+            .value("key", getMapKeyPartitionKey(scope, key, bucket))
+            .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
+            .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED));
+
+        session.execute(mapKey);
+    }
 
-        final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
 
-        //serialize to the entry
-        batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
 
-        executeBatch( batch );
-    }
 
 
     @Override
     public Long getLong( final MapScope scope, final String key ) {
-        Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
-        return ( col != null ) ? col.getLongValue() : null;
+
+        ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadCl());
+        return value != null ? (Long)DataType.bigint().deserialize(value, ProtocolVersion.NEWEST_SUPPORTED ) : null;
     }
 
 
@@ -293,46 +271,50 @@ public class MapSerializationImpl implements MapSerialization {
         Preconditions.checkNotNull( key, "key is required" );
         Preconditions.checkNotNull( value, "value is required" );
 
-        final MutationBatch batch = keyspace.prepareMutationBatch();
+        Statement mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
+            .value("key", getMapEntryPartitionKey(scope, key))
+            .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
+            .value("value", DataType.bigint().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
 
-        //add it to the entry
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+        session.execute(mapEntry);
 
-        //serialize to the entry
-        batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, value );
 
-        //add it to the keys
         final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
+        Statement mapKey;
+        mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
+            .value("key", getMapKeyPartitionKey(scope, key, bucket))
+            .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
+            .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED));
 
-        final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
-
-        //serialize to the entry
-        batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
-
-        executeBatch( batch );
+        session.execute(mapKey);
     }
 
 
     @Override
     public void delete( final MapScope scope, final String key ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch();
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
 
-        //serialize to the entry
-        batch.withRow( MAP_ENTRIES, entryRowKey ).delete();
-
-        //add it to the keys, we're not sure which one it may have come from
-        final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
+        Statement deleteMapEntry;
+        Clause equalsEntryKey = QueryBuilder.eq("key", getMapEntryPartitionKey(scope, key));
+        deleteMapEntry = QueryBuilder.delete().from(MAP_ENTRIES_TABLE)
+            .where(equalsEntryKey);
+        session.execute(deleteMapEntry);
 
 
-        final List<BucketScopedRowKey<String>> rowKeys =
-            BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
 
-        for ( BucketScopedRowKey<String> rowKey : rowKeys ) {
-            batch.withRow( MAP_KEYS, rowKey ).deleteColumn( key );
+        // not sure which bucket the value is in, execute a delete against them all
+        final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
+        List<ByteBuffer> mapKeys = new ArrayList<>();
+        for( int bucket :  buckets){
+            mapKeys.add( getMapKeyPartitionKey(scope, key, bucket));
         }
 
-        executeBatch( batch );
+        Statement deleteMapKey;
+        Clause inKey = QueryBuilder.in("key", mapKeys);
+        deleteMapKey = QueryBuilder.delete().from(MAP_KEYS_TABLE)
+            .where(inKey);
+        session.execute(deleteMapKey);
+
+
     }
 
 
@@ -353,72 +335,38 @@ public class MapSerializationImpl implements MapSerialization {
     }
 
 
-    private Column<Boolean> getValue( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) {
+    private ByteBuffer getValueCQL( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) {
 
-        //add it to the entry
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+        Clause in = QueryBuilder.in("key", getMapEntryPartitionKey(scope, key) );
+        Statement statement = QueryBuilder.select().all().from(MAP_ENTRIES_TABLE)
+            .where(in)
+            .setConsistencyLevel(consistencyLevel);
 
-        //now get all columns, including the "old row key value"
-        try {
-            final Column<Boolean> result =
-                keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( consistencyLevel ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
+        ResultSet resultSet = session.execute(statement);
+        com.datastax.driver.core.Row row = resultSet.one();
 
-            return result;
-        }
-        catch ( NotFoundException nfe ) {
-            //nothing to return
-            return null;
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to connect to cassandra", e );
-        }
+        return row != null ? row.getBytes("value") : null;
     }
 
 
-    /**
-     * Get multiple values, using the string builder
-     */
-    private <T> T getValues( final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder ) {
-
 
-        final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() );
+    private <T> T getValuesCQL( final MapScope scope, final Collection<String> keys, final ResultsBuilderCQL<T> builder ) {
 
-        for ( final String key : keys ) {
-            //add it to the entry
-            final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+        final List<ByteBuffer> serializedKeys = new ArrayList<>( keys.size() );
 
-            rowKeys.add( entryRowKey );
-        }
+        keys.forEach(key -> serializedKeys.add(getMapEntryPartitionKey(scope,key)));
 
+        // read at the configured consistency level, as the replaced Astyanax query and the getValueCQL callers do
+        Statement statement = QueryBuilder.select().all().from(MAP_ENTRIES_TABLE)
+            .where(QueryBuilder.in("key", serializedKeys )).setConsistencyLevel(cassandraConfig.getDataStaxReadCl());
 
-        //now get all columns, including the "old row key value"
-        try {
-            final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows =
-                keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKeySlice(
-                    rowKeys ).withColumnSlice( true ).execute()
-                        .getResult();
 
+        ResultSet resultSet = session.execute(statement);
 
-            return builder.buildResults( rows );
-        }
-        catch ( NotFoundException nfe ) {
-            //nothing to return
-            return null;
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to connect to cassandra", e );
-        }
+        return builder.buildResultsCQL( resultSet );
     }
 
 
-    private void executeBatch( MutationBatch batch ) {
-        try {
-            batch.execute();
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to connect to cassandra", e );
-        }
-    }
 
 
     /**
@@ -491,37 +439,114 @@ public class MapSerializationImpl implements MapSerialization {
     }
 
 
+
     /**
      * Build the results from the row keys
      */
-    private static interface ResultsBuilder<T> {
 
-        public T buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows );
+    private interface ResultsBuilderCQL<T> {
+
+        T buildResultsCQL( final ResultSet resultSet );
     }
 
 
-    public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>> {
+    public static class StringResultsBuilderCQL implements ResultsBuilderCQL<Map<String, String>> {
 
         @Override
-        public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) {
-            final int size = rows.size();
+        public Map<String, String> buildResultsCQL( final ResultSet resultSet ) {
 
-            final Map<String, String> results = new HashMap<>( size );
 
-            for ( int i = 0; i < size; i++ ) {
+            final Map<String, String> results = new HashMap<>();
 
-                final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i );
+            resultSet.all().forEach( row -> {
 
-                final String value = row.getColumns().getStringValue( true, null );
+                @SuppressWarnings("unchecked")
+                List<Object> keys = (List) deserializeMapEntryKey(row.getBytes("key"));
+                String value = (String)DataType.text().deserialize( row.getBytes("value"),
+                    ProtocolVersion.NEWEST_SUPPORTED );
 
-                if ( value == null ) {
-                    continue;
-                }
+                // the actual string key value is the last element
+                results.put((String)keys.get(keys.size() -1), value);
 
-                results.put( row.getKey().getKey().key, value );
-            }
+            });
 
             return results;
         }
     }
+
+    /**
+     * Decode a map entry partition key into its components: the first component is read as a UUID and
+     * every following component as text. The passed buffer is consumed while it is being read.
+     */
+    private static Object deserializeMapEntryKey(ByteBuffer bb){
+        final List<Object> components = new ArrayList<>();
+        while(bb.hasRemaining()){
+            final ByteBuffer component = CQLUtils.getWithShortLength(bb).slice();
+            // only the leading component is a UUID, the rest are strings
+            final DataType type = components.isEmpty() ? DataType.uuid() : DataType.text();
+            components.add(type.deserialize(component, ProtocolVersion.NEWEST_SUPPORTED));
+
+            // the trailing equality byte is unused, but it has to be taken off the buffer
+            bb.get();
+        }
+        return components;
+    }
+
+    /**
+     * Serialize the owner, map name, map key and optional bucket into a single partition key buffer.
+     * Each component is written as a 2 byte short length, the serialized bytes of the component, and
+     * then a 1 byte equality marker that is always 0.
+     *
+     * @param bucketNumber appended as a trailing component only when it is greater than 0
+     * @return a buffer whose readable content (position 0 up to limit) is the serialized key
+     */
+    public static ByteBuffer serializeKeys(UUID ownerUUID, String ownerType, String mapName, String mapKey,
+                                           int bucketNumber ){
+
+        List<Object> keys = new ArrayList<>(5);
+        keys.add(ownerUUID);
+        keys.add(ownerType);
+        keys.add(mapName);
+        keys.add(mapKey);
+        if( bucketNumber > 0 ){
+            keys.add(bucketNumber);
+        }
+
+        // Serialize first and size the buffer from the real encoded lengths: String.length() counts chars,
+        // not encoded bytes, so it under-allocates as soon as a value contains a multi-byte character.
+        List<ByteBuffer> parts = new ArrayList<>(keys.size());
+        int size = 0;
+        for (Object key : keys) {
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+            parts.add(kb);
+            size += kb.remaining() + 3; // 2 byte short length + 1 byte equality
+        }
+
+        ByteBuffer stuff = ByteBuffer.allocate(size);
+        for (ByteBuffer part : parts) {
+            stuff.putShort((short) part.remaining());
+            stuff.put(part.slice());
+            stuff.put((byte) 0);
+        }
+        stuff.flip();
+        return stuff.duplicate();
+    }
+
+
+    // Partition key for a map entry; -1 fails the bucketNumber > 0 check in serializeKeys, so no bucket is added.
+    private ByteBuffer getMapEntryPartitionKey(MapScope scope, String key){
+        // components, in order: application uuid, application type, map name, map key
+        return serializeKeys(scope.getApplication().getUuid(),
+            scope.getApplication().getType(), scope.getName(), key, -1);
+    }
+
+    // Partition key for the map keys table: the map entry key components followed by the bucket number.
+    private ByteBuffer getMapKeyPartitionKey(MapScope scope, String key, int bucketNumber){
+        // NOTE(review): serializeKeys only appends the bucket when it is > 0 - confirm bucket 0 is never used
+        return serializeKeys(scope.getApplication().getUuid(),
+            scope.getApplication().getType(), scope.getName(), key, bucketNumber);
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
index 41286ab..2a68247 100644
--- a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
+++ b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
@@ -82,6 +82,22 @@ public class MapManagerTest {
         assertEquals( value, returned );
     }
 
+    /** Writes a string under a key of more than 250 characters and expects to read the same value back. */
+    @Test
+    public void writeReadStringWithLongKey() {
+        final MapManager mapManager = mmf.createMapManager( this.scope );
+
+        final String longKey = "key1234567890123456789012345678901234567890123456789012345678901234567890" +
+            "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" +
+            "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890";
+        final String expected = "value";
+
+        mapManager.putString( longKey, expected );
+        final String readBack = mapManager.getString( longKey );
+
+        assertEquals( expected, readBack );
+    }
+
 
     @Test
     public void multiReadNoKey() {