You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/31 03:03:27 UTC

[3/3] git commit: working out multiget

working out multiget


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/94a92c39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/94a92c39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/94a92c39

Branch: refs/heads/datastax-driver
Commit: 94a92c394a5bfb8e04545076a13e2921c090236c
Parents: 6de7cbc
Author: Todd Nine <to...@apache.org>
Authored: Sat Aug 30 14:36:36 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Sat Aug 30 19:03:06 2014 -0600

----------------------------------------------------------------------
 .../collection/guice/MigrationManagerRule.java  |  11 +-
 .../core/astyanax/AstyanaxKeyspaceProvider.java |   1 +
 .../persistence/core/astyanax/CassandraFig.java |   6 +-
 .../persistence/core/guice/CommonModule.java    |  15 +-
 .../ArchaiusHystrixPoolConfiguration.java       |  86 ---
 .../core/javadriver/BatchStatementUtils.java    |  72 +++
 .../javadriver/DatastaxSessionProvider.java     |  68 +--
 .../javadriver/GuicyFigPoolConfiguration.java   | 106 ++++
 .../core/javadriver/RowIterator.java            |  98 ++++
 .../persistence/core/javadriver/RowParser.java  |  40 ++
 .../core/migration/DatastaxMigration.java       |  39 ++
 .../migration/DatastaxMigrationManager.java     |  36 ++
 .../migration/DatastaxMigrationManagerImpl.java | 127 +++++
 .../cass-templates/cassandra-template-ug.yaml   | 250 ++++-----
 .../persistence/core/astyanax/TestUtils.java    |   2 +-
 .../core/javadriver/HystrixSessionTest.java     |   2 +-
 .../persistence/graph/guice/GraphModule.java    |  11 +-
 .../graph/impl/GraphManagerImpl.java            |  34 +-
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  |  17 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |  64 ++-
 .../impl/stage/NodeDeleteListenerImpl.java      |  23 +-
 .../EdgeMetadataSerialization.java              |  23 +-
 .../graph/serialization/EdgeSerialization.java  |   6 +-
 .../graph/serialization/NodeSerialization.java  |   8 +-
 .../impl/EdgeMetadataSerializationImpl.java     | 531 ++++++++++---------
 .../impl/EdgeSerializationImpl.java             |   9 +-
 .../impl/NodeSerializationImpl.java             | 221 +++++---
 .../impl/shard/ShardEntryGroup.java             |   4 +-
 .../persistence/graph/GraphManagerIT.java       |  25 +-
 .../graph/impl/EdgeDeleteListenerTest.java      |  22 +-
 .../graph/impl/NodeDeleteListenerTest.java      |  11 +-
 .../graph/impl/stage/EdgeDeleteRepairTest.java  |  10 +-
 .../graph/impl/stage/EdgeMetaRepairTest.java    |  80 +--
 .../EdgeMetadataSerializationTest.java          | 285 +++++-----
 .../EdgeSerializationChopTest.java              |   7 +-
 .../serialization/EdgeSerializationTest.java    | 179 +++----
 .../serialization/NodeSerializationTest.java    |  27 +-
 .../graph/test/util/EdgeTestUtils.java          |   2 +-
 stack/corepersistence/pom.xml                   |   5 +-
 39 files changed, 1563 insertions(+), 1000 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
index 2d92adb..27c0b43 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
@@ -5,6 +5,8 @@ import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.core.migration.DatastaxMigration;
+import org.apache.usergrid.persistence.core.migration.DatastaxMigrationManager;
 import org.apache.usergrid.persistence.core.migration.MigrationException;
 import org.apache.usergrid.persistence.core.migration.MigrationManager;
 
@@ -18,12 +20,13 @@ import com.google.inject.Singleton;
 public class MigrationManagerRule extends ExternalResource {
     private static final Logger LOG = LoggerFactory.getLogger( MigrationManagerRule.class );
 
-    private MigrationManager migrationManager;
+
+    private DatastaxMigrationManager datastaxMigration;
 
 
     @Inject
-    public void setMigrationManager( MigrationManager migrationManager )  {
-        this.migrationManager = migrationManager;
+    public void setDsMigrationManager(final DatastaxMigrationManager datastaxMigration){
+        this.datastaxMigration = datastaxMigration;
     }
 
 
@@ -31,7 +34,7 @@ public class MigrationManagerRule extends ExternalResource {
     protected void before() throws MigrationException {
         LOG.info( "Starting migration" );
 
-        migrationManager.migrate();
+        datastaxMigration.migrate();
 
         LOG.info( "Migration complete" );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
index 7caeaeb..fff471d 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
@@ -59,6 +59,7 @@ public class AstyanaxKeyspaceProvider implements Provider<Keyspace> {
         AstyanaxConfiguration config = new AstyanaxConfigurationImpl()
                 .setDiscoveryType( NodeDiscoveryType.valueOf( cassandraFig.getDiscoveryType() ) )
                 .setTargetCassandraVersion( cassandraFig.getVersion() )
+                .setCqlVersion( "3.0.0" )
                 .setDefaultReadConsistencyLevel( cassandraConfig.getReadCL() )
                 .setDefaultWriteConsistencyLevel( cassandraConfig.getWriteCL() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 6e18a9a..630dece 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -41,7 +41,7 @@ public interface CassandraFig extends GuicyFig {
     String getHosts();
 
     @Key( "cassandra.version" )
-    @Default( "1.2" )
+    @Default( "2.1" )
     String getVersion();
 
     @Key( "cassandra.cluster_name" )
@@ -76,6 +76,10 @@ public interface CassandraFig extends GuicyFig {
     @Default( "false" )
     boolean isEmbedded();
 
+    @Key("cassandra.concurrent.invocations")
+    @Default( "100" )
+    int getConcurrentInvocations();
+
 
     @Default("CL_QUORUM")
     @Key(READ_CL)

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index a4cc98a..849dc52 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -27,10 +27,15 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfigImpl;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
+import org.apache.usergrid.persistence.core.javadriver.DatastaxSessionProvider;
+import org.apache.usergrid.persistence.core.migration.DatastaxMigration;
+import org.apache.usergrid.persistence.core.migration.DatastaxMigrationManager;
+import org.apache.usergrid.persistence.core.migration.DatastaxMigrationManagerImpl;
 import org.apache.usergrid.persistence.core.migration.MigrationManager;
 import org.apache.usergrid.persistence.core.migration.MigrationManagerFig;
 import org.apache.usergrid.persistence.core.migration.MigrationManagerImpl;
 
+import com.datastax.driver.core.Session;
 import com.google.inject.AbstractModule;
 import com.netflix.astyanax.Keyspace;
 
@@ -52,13 +57,21 @@ public class CommonModule extends AbstractModule {
              // bind our keyspace to the AstyanaxKeyspaceProvider
         bind( Keyspace.class ).toProvider( AstyanaxKeyspaceProvider.class ).asEagerSingleton();
 
+
+        bind( Session.class).toProvider( DatastaxSessionProvider.class ).asEagerSingleton();
+
+
+        //bind our datastax migration manager
+        bind( DatastaxMigrationManager.class ).to( DatastaxMigrationManagerImpl.class ).asEagerSingleton();
+
         // bind our migration manager
-        bind( MigrationManager.class ).to( MigrationManagerImpl.class );
+//        bind( MigrationManager.class ).to( MigrationManagerImpl.class );
 
         bind( TimeService.class ).to( TimeServiceImpl.class );
 
         bind( CassandraConfig.class ).to( CassandraConfigImpl.class );
 
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/ArchaiusHystrixPoolConfiguration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/ArchaiusHystrixPoolConfiguration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/ArchaiusHystrixPoolConfiguration.java
deleted file mode 100644
index 441fabc..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/ArchaiusHystrixPoolConfiguration.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
- *
- */
-package org.apache.usergrid.persistence.core.javadriver;
-
-
-import java.util.Collection;
-
-
-/**
- *
- */
-public class ArchaiusHystrixPoolConfiguration implements HystrixPoolConfiguration {
-
-    //TODO implement this
-
-    @Override
-    public String getServiceName() {
-        return null;
-    }
-
-
-    @Override
-    public int getPort() {
-        return 0;
-    }
-
-
-    @Override
-    public void addPortLister( final PropertyChangeListener<Integer> listener ) {
-
-    }
-
-
-    @Override
-    public Collection<String> getSeedNodes() {
-        return null;
-    }
-
-
-    @Override
-    public void addSeedNodesLister( final PropertyChangeListener<Collection<String>> listener ) {
-
-    }
-
-
-    @Override
-    public int getMaxConnectionsPerHost() {
-        return 0;
-    }
-
-
-    @Override
-    public void addMaxConnectionsLister( final PropertyChangeListener<Integer> listener ) {
-
-    }
-
-
-    @Override
-    public int getDefaultConnectionsPerIdentity() {
-        return 0;
-    }
-
-
-    @Override
-    public void addDefaultConnectionsPerIdentity( final PropertyChangeListener<Integer> listener ) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/BatchStatementUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/BatchStatementUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/BatchStatementUtils.java
new file mode 100644
index 0000000..feedd6e
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/BatchStatementUtils.java
@@ -0,0 +1,72 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.javadriver;
+
+
+import java.util.Collection;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+
+
+/**
+ * Utility class for rolling up multiple statements as a batch
+ *
+ */
+public class BatchStatementUtils {
+
+
+    /**
+     * Roll up the statements into batches (non atomic) and execute them
+     * @param session
+     * @param statements
+     */
+    public static ResultSet runBatches( final Session session,
+                                                                 Collection<? extends Statement>... statements ){
+
+
+        final BatchStatement batch = fastStatement();
+
+        for(Collection<? extends Statement> statement: statements){
+            batch.addAll( statement );
+        }
+
+
+        return session.execute( batch );
+    }
+
+
+    /**
+     * We're running on an eventually consistent system, don't use logged statement (paxos & atomic) unless you
+     * absolutely must!
+     *
+     * @return
+     */
+    public static BatchStatement fastStatement(){
+        return new BatchStatement( BatchStatement.Type.UNLOGGED );
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/DatastaxSessionProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/DatastaxSessionProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/DatastaxSessionProvider.java
index 776793d..168d781 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/DatastaxSessionProvider.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/DatastaxSessionProvider.java
@@ -21,20 +21,12 @@
 package org.apache.usergrid.persistence.core.javadriver;
 
 
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 
+import com.datastax.driver.core.Session;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
-import com.netflix.astyanax.AstyanaxConfiguration;
-import com.netflix.astyanax.AstyanaxContext;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
-import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
-import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
-import com.netflix.astyanax.connectionpool.impl.Slf4jConnectionPoolMonitorImpl;
-import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
-import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+import com.google.inject.Singleton;
 
 
 /**
@@ -43,56 +35,24 @@ import com.netflix.astyanax.thrift.ThriftFamilyFactory;
  *
  * @author tnine
  */
-public class DatastaxSessionProvider implements Provider<Keyspace> {
-    private final CassandraFig cassandraFig;
-    private final CassandraConfig cassandraConfig;
+@Singleton
+public class DatastaxSessionProvider implements Provider<Session> {
 
+    private final HystrixPool pool;
+    private final Session session;
 
-    @Inject
-    public DatastaxSessionProvider( final CassandraFig cassandraFig, final CassandraConfig cassandraConfig ) {
-        this.cassandraFig = cassandraFig;
-        this.cassandraConfig = cassandraConfig;
-    }
-
-
-    @Override
-    public Keyspace get() {
-
-        AstyanaxConfiguration config = new AstyanaxConfigurationImpl()
-                .setDiscoveryType( NodeDiscoveryType.valueOf( cassandraFig.getDiscoveryType() ) )
-                .setTargetCassandraVersion( cassandraFig.getVersion() )
-                .setDefaultReadConsistencyLevel( cassandraConfig.getReadCL() )
-                .setDefaultWriteConsistencyLevel( cassandraConfig.getWriteCL() );
-
-        ConnectionPoolConfiguration connectionPoolConfiguration =
-                new ConnectionPoolConfigurationImpl( "UsergridConnectionPool" )
-                        .setPort( cassandraFig.getThriftPort() )
-                        .setMaxConnsPerHost( cassandraFig.getConnections() )
-                        .setSeeds( cassandraFig.getHosts() )
-                        .setSocketTimeout( cassandraFig.getTimeout() );
 
-        AstyanaxContext<Keyspace> context =
-                new AstyanaxContext.Builder().forCluster( cassandraFig.getClusterName() )
-                        .forKeyspace( cassandraFig.getKeyspaceName() )
-
-                        /*
-                         * TODO tnine Filter this by adding a host supplier.  We will get token discovery from cassandra
-                         * but only connect
-                         * to nodes that have been specified.  Good for real time updates of the cass system without
-                         * adding
-                         * load to them during runtime
-                         */
-
-                        .withAstyanaxConfiguration( config )
-                        .withConnectionPoolConfiguration( connectionPoolConfiguration )
-                        .withConnectionPoolMonitor( new Slf4jConnectionPoolMonitorImpl() )
-                        .buildKeyspace( ThriftFamilyFactory.getInstance() );
-
-        context.start();
+    @Inject
+    public DatastaxSessionProvider( final CassandraFig cassandraFig ) {
 
+        pool = new HystrixPoolImpl( new GuicyFigPoolConfiguration( cassandraFig ) );
+        session = pool.getSession( "usergrid" );
 
-        return context.getClient();
     }
 
 
+    @Override
+    public Session get() {
+        return session;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/GuicyFigPoolConfiguration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/GuicyFigPoolConfiguration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/GuicyFigPoolConfiguration.java
new file mode 100644
index 0000000..1bd003f
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/GuicyFigPoolConfiguration.java
@@ -0,0 +1,106 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.core.javadriver;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ *
+ */
+@Singleton
+public class GuicyFigPoolConfiguration implements HystrixPoolConfiguration {
+
+    private final CassandraFig cassandraFig;
+
+
+    @Inject
+    public GuicyFigPoolConfiguration( final CassandraFig cassandraFig ) {this.cassandraFig = cassandraFig;}
+
+
+    @Override
+    public String getServiceName() {
+        return "Usergrid";
+    }
+
+
+    @Override
+    public int getPort() {
+        return cassandraFig.getNativePort();
+    }
+
+
+    @Override
+    public void addPortLister( final PropertyChangeListener<Integer> listener ) {
+
+    }
+
+
+    @Override
+    public Collection<String> getSeedNodes() {
+
+        String hosts = cassandraFig.getHosts();
+
+        String[] splitHosts = hosts.split( "," );
+
+        return Arrays.asList( splitHosts );
+    }
+
+
+    @Override
+    public void addSeedNodesLister( final PropertyChangeListener<Collection<String>> listener ) {
+
+    }
+
+
+    @Override
+    public int getMaxConnectionsPerHost() {
+        return this.cassandraFig.getConnections();
+    }
+
+
+    @Override
+    public void addMaxConnectionsLister( final PropertyChangeListener<Integer> listener ) {
+
+    }
+
+
+    @Override
+    public int getDefaultConnectionsPerIdentity() {
+        /**
+         * This is arbitrary and SUPER high.  TODO T.N. Make this come from a
+         */
+        return this.cassandraFig.getConcurrentInvocations();
+    }
+
+
+    @Override
+    public void addDefaultConnectionsPerIdentity( final PropertyChangeListener<Integer> listener ) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowIterator.java
new file mode 100644
index 0000000..f47ffa3
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowIterator.java
@@ -0,0 +1,98 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.core.javadriver;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.query.RowQuery;
+
+
+/**
+ * Simple iterator that wraps a Row query and will keep executing it's paging until there are no more results to read
+ * from cassandra
+ */
+public class RowIterator<T> implements Iterable<T>, Iterator<T> {
+
+
+    private final RowParser<T> parser;
+    private final Session session;
+    private final Statement statement;
+
+    private Iterator<Row> iterator;
+
+
+
+    public RowIterator( final Session session, final Statement statement, final RowParser<T> parser ) {
+        this.session = session;
+        this.statement = statement;
+        this.parser = parser;
+    }
+
+
+    @Override
+    public Iterator<T> iterator() {
+        return this;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+
+        if ( iterator == null ) {
+          startIterator();
+        }
+
+        return iterator.hasNext();
+    }
+
+
+    @Override
+    public T next() {
+
+        if ( !hasNext() ) {
+            throw new NoSuchElementException();
+        }
+
+        return parser.parseRow( iterator.next());
+    }
+
+
+    @Override
+    public void remove() {
+       throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Execute the query again and set the results
+     */
+    private void startIterator() {
+
+        iterator = session.execute( statement ).iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowParser.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowParser.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowParser.java
new file mode 100644
index 0000000..a620244
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/javadriver/RowParser.java
@@ -0,0 +1,40 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.javadriver;
+
+
+import com.datastax.driver.core.Row;
+import com.netflix.astyanax.model.Column;
+
+
+/**
+ * Column parser to be used in iterators
+ */
+public interface RowParser<T> {
+
+    /**
+     * Parse the row and return the object
+     * @param row The row to parse
+     * @return
+     */
+    public T parseRow( Row row );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigration.java
new file mode 100644
index 0000000..928278b
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigration.java
@@ -0,0 +1,39 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.core.migration;
+
+
+import java.util.Collection;
+
+
+
+public interface DatastaxMigration {
+
+    /**
+     * Get the column families required for this implementation.  If one does not exist it will be created.
+     */
+    public Collection<String> getColumnFamilies();
+
+    /**
+     * Create all prepared statements required for the implementation
+     */
+    public void prepareStatements();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManager.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManager.java
new file mode 100644
index 0000000..a2ef364
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManager.java
@@ -0,0 +1,36 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.core.migration;
+
+
+/**
+ * A manager that will perform any migrations necessary.  Setup code should invoke the implementation of this interface
+ *
+ * @author tnine
+ */
+public interface DatastaxMigrationManager {
+
+    /**
+     * Perform any migration necessary in the application.  Will only create keyspaces and column families if they do
+     * not exist
+     */
+    public void migrate() throws MigrationException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManagerImpl.java
new file mode 100644
index 0000000..3a3e2db
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/DatastaxMigrationManagerImpl.java
@@ -0,0 +1,127 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration;
+
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * Implementation of the migration manager to set up keyspace
+ *
+ * @author tnine
+ */
+@Singleton
+public class DatastaxMigrationManagerImpl implements DatastaxMigrationManager {
+
+
+    private static final Logger logger = LoggerFactory.getLogger( DatastaxMigrationManagerImpl.class );
+
+    private final Set<DatastaxMigration> migrations;
+    private final Session session;
+
+    private final CassandraFig cassandraFig;
+
+
+    @Inject
+    public DatastaxMigrationManagerImpl( final Session session, final CassandraFig cassandraFig,
+                                         final Set<DatastaxMigration> migrations ) {
+        this.session = session;
+        this.migrations = migrations;
+        this.cassandraFig = cassandraFig;
+    }
+
+
+    @Override
+    public void migrate() throws MigrationException {
+
+
+        testAndCreateKeyspace();
+
+        for ( DatastaxMigration migration : migrations ) {
+
+            final Collection<String> columnFamilies = migration.getColumnFamilies();
+
+
+            if ( columnFamilies == null || columnFamilies.size() == 0 ) {
+                logger.warn( "Class {} implements {} but returns null column families for migration.  Either implement"
+                                + " this method or remove the interface from the class", migration.getClass(),
+                        Migration.class );
+                continue;
+            }
+
+            for ( String cf : columnFamilies ) {
+                testAndCreateColumnFamilyDef( cf );
+            }
+
+            /**
+             * Now that all the column families have been created intilaize the prepared statements
+             */
+            migration.prepareStatements();
+        }
+    }
+
+
+    /**
+     * Check if the column family exists.  If it dosn't create it
+     */
+    private void testAndCreateColumnFamilyDef( String columnFamilyExecute ) throws MigrationException {
+
+        try {
+            session.execute( columnFamilyExecute );
+        }
+        catch ( Throwable t ) {
+            final String message =
+                    String.format( "Unable to create column family.  Input string was \"%s\"  ", columnFamilyExecute );
+            logger.error( message, t );
+            throw new MigrationException( message, t );
+        }
+    }
+
+
+    /**
+     * Check if they keyspace exists.  If it doesn't create it
+     */
+    private void testAndCreateKeyspace() {
+
+
+        final String keyspaceName = cassandraFig.getKeyspaceName();
+
+
+        final String createStatement = "CREATE KEYSPACE IF NOT EXISTS " + keyspaceName
+                + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
+        session.execute( createStatement );
+
+        //set our keyspace
+        session.execute( "USE " + keyspaceName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/test/cass-templates/cassandra-template-ug.yaml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/cass-templates/cassandra-template-ug.yaml b/stack/corepersistence/common/src/test/cass-templates/cassandra-template-ug.yaml
index 5e0e7d6..03e27db 100644
--- a/stack/corepersistence/common/src/test/cass-templates/cassandra-template-ug.yaml
+++ b/stack/corepersistence/common/src/test/cass-templates/cassandra-template-ug.yaml
@@ -9,7 +9,6 @@
 # one logical cluster from joining another.
 cluster_name: '$CLUSTER$'
 
-
 # This defines the number of tokens randomly assigned to this node on the ring
 # The more tokens, relative to other nodes, the larger the proportion of data
 # that this node will store. You probably want all nodes to have the same number
@@ -20,23 +19,19 @@ cluster_name: '$CLUSTER$'
 #
 # Specifying initial_token will override this setting.
 #
-# If you already have a cluster with 1 token per node, and wish to migrate to
+# If you already have a cluster with 1 token per node, and wish to migrate to 
 # multiple tokens per node, see http://wiki.apache.org/cassandra/Operations
-# num_tokens: 256
+num_tokens: 256
 
-# If you haven't specified num_tokens, or have set it to the default of 1 then
-# you should always specify InitialToken when setting up a production
-# cluster for the first time, and often when adding capacity later.
-# The principle is that each node should be given an equal slice of
-# the token ring; see http://wiki.apache.org/cassandra/Operations
-# for more details.
-#
-# If blank, Cassandra will request a token bisecting the range of
-# the heaviest-loaded existing node.  If there is no load information
-# available, such as is the case with a new cluster, it will pick
-# a random token, which will lead to hot spots.
-initial_token:
+# initial_token allows you to specify tokens manually.  While you can use # it with
+# vnodes (num_tokens > 1, above) -- in which case you should provide a 
+# comma-separated list -- it's primarily used when adding nodes # to legacy clusters 
+# that do not have vnodes enabled.
+# initial_token:
 
+# May either be "true" or "false" to enable globally, or contain a list
+# of data centers to enable per-datacenter.
+# hinted_handoff_enabled: DC1,DC2
 # See http://wiki.apache.org/cassandra/HintedHandoff
 hinted_handoff_enabled: true
 # this defines the maximum amount of time a dead host will have hints
@@ -58,11 +53,6 @@ max_hints_delivery_threads: 2
 # reduced proportionally to the number of nodes in the cluster.
 batchlog_replay_throttle_in_kb: 1024
 
-# The following setting populates the page cache on memtable flush and compaction
-# WARNING: Enable this setting only when the whole node's data fits in memory.
-# Defaults to: false
-# populate_io_cache_on_flush: false
-
 # Authentication backend, implementing IAuthenticator; used to identify users
 # Out of the box, Cassandra provides org.apache.cassandra.auth.{AllowAllAuthenticator,
 # PasswordAuthenticator}.
@@ -88,27 +78,16 @@ authorizer: AllowAllAuthorizer
 # Will be disabled automatically for AllowAllAuthorizer.
 permissions_validity_in_ms: 2000
 
-# The partitioner is responsible for distributing rows (by key) across
-# nodes in the cluster.  Any IPartitioner may be used, including your
-# own as long as it is on the classpath.  Out of the box, Cassandra
-# provides org.apache.cassandra.dht.{Murmur3Partitioner, RandomPartitioner
-# ByteOrderedPartitioner, OrderPreservingPartitioner (deprecated)}.
-#
-# - RandomPartitioner distributes rows across the cluster evenly by md5.
-#   This is the default prior to 1.2 and is retained for compatibility.
-# - Murmur3Partitioner is similar to RandomPartioner but uses Murmur3_128
-#   Hash Function instead of md5.  When in doubt, this is the best option.
-# - ByteOrderedPartitioner orders rows lexically by key bytes.  BOP allows
-#   scanning rows in key order, but the ordering can generate hot spots
-#   for sequential insertion workloads.
-# - OrderPreservingPartitioner is an obsolete form of BOP, that stores
-# - keys in a less-efficient format and only works with keys that are
-#   UTF8-encoded Strings.
-# - CollatingOPP collates according to EN,US rules rather than lexical byte
-#   ordering.  Use this as an example if you need custom collation.
-#
-# See http://wiki.apache.org/cassandra/Operations for more on
-# partitioners and token selection.
+# The partitioner is responsible for distributing groups of rows (by
+# partition key) across nodes in the cluster.  You should leave this
+# alone for new clusters.  The partitioner can NOT be changed without
+# reloading all data, so when upgrading you should set this to the
+# same partitioner you were already using.
+#
+# Besides Murmur3Partitioner, partitioners included for backwards
+# compatibility include RandomPartitioner, ByteOrderedPartitioner, and
+# OrderPreservingPartitioner.
+#
 partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 
 # Directories where Cassandra should store data on disk.  Cassandra
@@ -121,6 +100,7 @@ data_file_directories:
 commitlog_directory: $DIR$/commitlog
 
 # policy for data disk failures:
+# stop_paranoid: shut down gossip and Thrift even for single-sstable errors.
 # stop: shut down gossip and Thrift, leaving the node effectively dead, but
 #       can still be inspected via JMX.
 # best_effort: stop using the failed disk and respond to requests based on
@@ -129,6 +109,14 @@ commitlog_directory: $DIR$/commitlog
 # ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
 disk_failure_policy: stop
 
+# policy for commit disk failures:
+# stop: shut down gossip and Thrift, leaving the node effectively dead, but
+#       can still be inspected via JMX.
+# stop_commit: shutdown the commit log, letting writes collect but 
+#              continuing to service reads, as in pre-2.0.5 Cassandra
+# ignore: ignore fatal errors and let the batches fail
+commit_failure_policy: stop
+
 # Maximum size of the key cache in memory.
 #
 # Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the
@@ -179,28 +167,23 @@ row_cache_save_period: 0
 # Disabled by default, meaning all keys are going to be saved
 # row_cache_keys_to_save: 100
 
-# The provider for the row cache to use.
-#
-# Supported values are: ConcurrentLinkedHashCacheProvider, SerializingCacheProvider
+# The off-heap memory allocator.  Affects storage engine metadata as
+# well as caches.  Experiments show that JEMAlloc saves some memory
+# than the native GCC allocator (i.e., JEMalloc is more
+# fragmentation-resistant).
+# 
+# Supported values are: NativeAllocator, JEMallocAllocator
 #
-# SerializingCacheProvider serialises the contents of the row and stores
-# it in native memory, i.e., off the JVM Heap. Serialized rows take
-# significantly less memory than "live" rows in the JVM, so you can cache
-# more rows in a given memory footprint.  And storing the cache off-heap
-# means you can use smaller heap sizes, reducing the impact of GC pauses.
-# Note however that when a row is requested from the row cache, it must be
-# deserialized into the heap for use.
+# If you intend to use JEMallocAllocator you have to install JEMalloc as library and
+# modify cassandra-env.sh as directed in the file.
 #
-# It is also valid to specify the fully-qualified class name to a class
-# that implements org.apache.cassandra.cache.IRowCacheProvider.
-#
-# Defaults to SerializingCacheProvider
-row_cache_provider: SerializingCacheProvider
+# Defaults to NativeAllocator
+# memory_allocator: NativeAllocator
 
 # saved caches
 saved_caches_directory: $DIR$/saved_caches
 
-# commitlog_sync may be either "periodic" or "batch."
+# commitlog_sync may be either "periodic" or "batch." 
 # When in batch mode, Cassandra won't ack writes until the commit log
 # has been fsynced to disk.  It will wait up to
 # commitlog_sync_batch_window_in_ms milliseconds for other writes, before
@@ -222,7 +205,7 @@ commitlog_sync_period_in_ms: 10000
 # The size of the individual commitlog file segments.  A commitlog
 # segment may be archived, deleted, or recycled once all the data
 # in it (potentially from each columnfamily in the system) has been
-# flushed to sstables.
+# flushed to sstables.  
 #
 # The default size is 32, which is almost always fine, but if you are
 # archiving commitlog segments (see commitlog_archiving.properties),
@@ -233,7 +216,7 @@ commitlog_segment_size_in_mb: 32
 # any class that implements the SeedProvider interface and has a
 # constructor that takes a Map<String, String> of parameters will do.
 seed_provider:
-    # Addresses of hosts that are deemed contact points.
+    # Addresses of hosts that are deemed contact points. 
     # Cassandra nodes use this list of hosts to find each other and learn
     # the topology of the ring.  You must change this if you are running
     # multiple nodes!
@@ -243,31 +226,6 @@ seed_provider:
           # Ex: "<ip1>,<ip2>,<ip3>"
           - seeds: "127.0.0.1"
 
-# emergency pressure valve: each time heap usage after a full (CMS)
-# garbage collection is above this fraction of the max, Cassandra will
-# flush the largest memtables.
-#
-# Set to 1.0 to disable.  Setting this lower than
-# CMSInitiatingOccupancyFraction is not likely to be useful.
-#
-# RELYING ON THIS AS YOUR PRIMARY TUNING MECHANISM WILL WORK POORLY:
-# it is most effective under light to moderate load, or read-heavy
-# workloads; under truly massive write load, it will often be too
-# little, too late.
-flush_largest_memtables_at: 0.75
-
-# emergency pressure valve #2: the first time heap usage after a full
-# (CMS) garbage collection is above this fraction of the max,
-# Cassandra will reduce cache maximum _capacity_ to the given fraction
-# of the current _size_.  Should usually be set substantially above
-# flush_largest_memtables_at, since that will have less long-term
-# impact on the system.
-#
-# Set to 1.0 to disable.  Setting this lower than
-# CMSInitiatingOccupancyFraction is not likely to be useful.
-reduce_cache_sizes_at: 0.85
-reduce_cache_capacity_to: 0.6
-
 # For workloads with more data than can fit in memory, Cassandra's
 # bottleneck will be reads that need to fetch data from
 # disk. "concurrent_reads" should be set to (16 * number_of_drives) in
@@ -280,9 +238,13 @@ reduce_cache_capacity_to: 0.6
 concurrent_reads: 32
 concurrent_writes: 32
 
+# Total memory to use for sstable-reading buffers.  Defaults to
+# the smaller of 1/4 of heap or 512MB.
+# file_cache_size_in_mb: 512
+
 # Total memory to use for memtables.  Cassandra will flush the largest
 # memtable when this much memory is used.
-# If omitted, Cassandra will set it to 1/3 of the heap.
+# If omitted, Cassandra will set it to 1/4 of the heap.
 # memtable_total_space_in_mb: 2048
 
 # Total space to use for commitlogs.  Since commitlog segments are
@@ -325,7 +287,7 @@ ssl_storage_port: 7001
 # Address to bind to and tell other Cassandra nodes to connect to. You
 # _must_ change this if you want multiple nodes to be able to
 # communicate!
-#
+# 
 # Leaving it blank leaves it up to InetAddress.getLocalHost(). This
 # will always do the Right Thing _if_ the node is properly configured
 # (hostname, name resolution, etc), and the Right Thing is to use the
@@ -348,13 +310,15 @@ listen_address: localhost
 start_native_transport: true
 # port for the CQL native transport to listen for clients on
 native_transport_port: $NATIVE_PORT$
-# The minimum and maximum threads for handling requests when the native
-# transport is used. They are similar to rpc_min_threads and rpc_max_threads,
-# though the defaults differ slightly.
-# NOTE: native_transport_min_threads is now deprecated and ignored (but kept
-# in the 1.2.x series for compatibility sake).
-# native_transport_min_threads: 16
+# The maximum threads for handling requests when the native transport is used.
+# This is similar to rpc_max_threads though the default differs slightly (and
+# there is no native_transport_min_threads, idle threads will always be stopped
+# after 30 seconds).
 # native_transport_max_threads: 128
+#
+# The maximum size of allowed frame. Frame (requests) larger than this will
+# be rejected as invalid. The default is 256MB.
+# native_transport_max_frame_size_in_mb: 256
 
 # Whether to start the thrift rpc server.
 start_rpc: true
@@ -366,16 +330,16 @@ start_rpc: true
 # (i.e. it will be based on the configured hostname of the node).
 #
 # Note that unlike ListenAddress above, it is allowed to specify 0.0.0.0
-# here if you want to listen on all interfaces, but that will break clients
+# here if you want to listen on all interfaces, but that will break clients 
 # that rely on node auto-discovery.
 rpc_address: localhost
 # port for Thrift to listen for clients on
 rpc_port: $THRIFT_PORT$
 
-# enable or disable keepalive on rpc connections
+# enable or disable keepalive on rpc/native connections
 rpc_keepalive: true
 
-# Cassandra provides three out-of-the-box options for the RPC Server:
+# Cassandra provides two out-of-the-box options for the RPC Server:
 #
 # sync  -> One thread per thrift connection. For a very large number of clients, memory
 #          will be your limiting factor. On a 64 bit JVM, 180KB is the minimum stack size
@@ -439,15 +403,22 @@ incremental_backups: false
 snapshot_before_compaction: false
 
 # Whether or not a snapshot is taken of the data before keyspace truncation
-# or dropping of column families. The STRONGLY advised default of true
+# or dropping of column families. The STRONGLY advised default of true 
 # should be used to provide data safety. If you set this flag to false, you will
 # lose data on truncation or drop.
 auto_snapshot: true
 
-# Log a debug message if more than this many tombstones are scanned
-# in a single-partition query.  Set the threshold on SliceQueryFilter
-# to debug to enable.
-tombstone_debug_threshold: 10000
+# When executing a scan, within or across a partition, we need to keep the
+# tombstones seen in memory so we can return them to the coordinator, which
+# will use them to make sure other replicas also know about the deleted rows.
+# With workloads that generate a lot of tombstones, this can cause performance
+# problems and even exaust the server heap.
+# (http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets)
+# Adjust the thresholds here if you understand the dangers and want to
+# scan more tombstones anyway.  These thresholds may also be adjusted at runtime
+# using the StorageService mbean.
+tombstone_warn_threshold: 1000
+tombstone_failure_threshold: 100000
 
 # Add column indexes to a row after its contents reach this size.
 # Increase if your column values are large, or if you have a very large
@@ -458,6 +429,11 @@ tombstone_debug_threshold: 10000
 # that wastefully either.
 column_index_size_in_kb: 64
 
+
+# Log WARN on any batch size exceeding this value. 5kb per batch by default.
+# Caution should be taken on increasing the size of this threshold as it can lead to node instability.
+batch_size_warn_threshold_in_kb: 5
+
 # Size limit for rows being compacted in memory.  Larger rows will spill
 # over to disk and use a slower two-pass compaction process.  A message
 # will be logged specifying the row key.
@@ -478,7 +454,7 @@ in_memory_compaction_limit_in_mb: 64
 
 # Multi-threaded compaction. When enabled, each compaction will use
 # up to one thread per core, plus one thread per sstable being merged.
-# This is usually only useful for SSD-based hardware: otherwise,
+# This is usually only useful for SSD-based hardware: otherwise, 
 # your concern is usually to get compaction to do LESS i/o (see:
 # compaction_throughput_mb_per_sec), not more.
 multithreaded_compaction: false
@@ -504,11 +480,14 @@ compaction_preheat_key_cache: true
 # stream_throughput_outbound_megabits_per_sec: 200
 
 # How long the coordinator should wait for read operations to complete
-read_request_timeout_in_ms: 10000
+read_request_timeout_in_ms: 5000
 # How long the coordinator should wait for seq or index scans to complete
 range_request_timeout_in_ms: 10000
 # How long the coordinator should wait for writes to complete
-write_request_timeout_in_ms: 10000
+write_request_timeout_in_ms: 2000
+# How long a coordinator should continue to retry a CAS operation
+# that contends with other proposals for the same row
+cas_contention_timeout_in_ms: 1000
 # How long the coordinator should wait for truncates to complete
 # (This can be much longer, because unless auto_snapshot is disabled
 # we need to flush first so we can snapshot before removing the data.)
@@ -517,8 +496,10 @@ truncate_request_timeout_in_ms: 60000
 request_timeout_in_ms: 10000
 
 # Enable operation timeout information exchange between nodes to accurately
-# measure request timeouts, If disabled cassandra will assuming the request
-# was forwarded to the replica instantly by the coordinator
+# measure request timeouts.  If disabled, replicas will assume that requests
+# were forwarded to them instantly by the coordinator, which means that
+# under overload conditions we will waste that much extra time processing 
+# already-timed-out requests.
 #
 # Warning: before enabling this property make sure to ntp is installed
 # and the times are synchronized between the nodes.
@@ -551,23 +532,18 @@ cross_node_timeout: false
 #
 # Out of the box, Cassandra provides
 #  - SimpleSnitch:
-#    Treats Strategy order as proximity. This improves cache locality
-#    when disabling read repair, which can further improve throughput.
-#    Only appropriate for single-datacenter deployments.
+#    Treats Strategy order as proximity. This can improve cache
+#    locality when disabling read repair.  Only appropriate for
+#    single-datacenter deployments.
+#  - GossipingPropertyFileSnitch
+#    This should be your go-to snitch for production use.  The rack
+#    and datacenter for the local node are defined in
+#    cassandra-rackdc.properties and propagated to other nodes via
+#    gossip.  If cassandra-topology.properties exists, it is used as a
+#    fallback, allowing migration from the PropertyFileSnitch.
 #  - PropertyFileSnitch:
 #    Proximity is determined by rack and data center, which are
 #    explicitly configured in cassandra-topology.properties.
-#  - GossipingPropertyFileSnitch
-#    The rack and datacenter for the local node are defined in
-#    cassandra-rackdc.properties and propagated to other nodes via gossip.  If
-#    cassandra-topology.properties exists, it is used as a fallback, allowing
-#    migration from the PropertyFileSnitch.
-#  - RackInferringSnitch:
-#    Proximity is determined by rack and data center, which are
-#    assumed to correspond to the 3rd and 2nd octet of each node's
-#    IP address, respectively.  Unless this happens to match your
-#    deployment conventions (as it did Facebook's), this is best used
-#    as an example of writing a custom Snitch class.
 #  - Ec2Snitch:
 #    Appropriate for EC2 deployments in a single Region. Loads Region
 #    and Availability Zone information from the EC2 API. The Region is
@@ -581,6 +557,12 @@ cross_node_timeout: false
 #    ssl_storage_port on the public IP firewall.  (For intra-Region
 #    traffic, Cassandra will switch to the private IP after
 #    establishing a connection.)
+#  - RackInferringSnitch:
+#    Proximity is determined by rack and data center, which are
+#    assumed to correspond to the 3rd and 2nd octet of each node's IP
+#    address, respectively.  Unless this happens to match your
+#    deployment conventions, this is best used as an example of
+#    writing a custom Snitch class and is provided in that spirit.
 #
 # You can use a custom Snitch by setting this to the full class name
 # of the snitch, which will be assumed to be on your classpath.
@@ -588,7 +570,7 @@ endpoint_snitch: SimpleSnitch
 
 # controls how often to perform the more expensive part of host score
 # calculation
-dynamic_snitch_update_interval_in_ms: 100
+dynamic_snitch_update_interval_in_ms: 100 
 # controls how often to reset all host scores, allowing a bad host to
 # possibly recover
 dynamic_snitch_reset_interval_in_ms: 600000
@@ -618,7 +600,7 @@ request_scheduler: org.apache.cassandra.scheduler.NoScheduler
 # NoScheduler - Has no options
 # RoundRobin
 #  - throttle_limit -- The throttle_limit is the number of in-flight
-#                      requests per client.  Requests beyond
+#                      requests per client.  Requests beyond 
 #                      that limit are queued up until
 #                      running requests can complete.
 #                      The value of 80 here is twice the number of
@@ -641,22 +623,11 @@ request_scheduler: org.apache.cassandra.scheduler.NoScheduler
 # the request scheduling. Currently the only valid option is keyspace.
 # request_scheduler_id: keyspace
 
-# index_interval controls the sampling of entries from the primrary
-# row index in terms of space versus time.  The larger the interval,
-# the smaller and less effective the sampling will be.  In technicial
-# terms, the interval coresponds to the number of index entries that
-# are skipped between taking each sample.  All the sampled entries
-# must fit in memory.  Generally, a value between 128 and 512 here
-# coupled with a large key cache size on CFs results in the best trade
-# offs.  This value is not often changed, however if you have many
-# very small rows (many to an OS page), then increasing this will
-# often lower memory usage without a impact on performance.
-index_interval: 128
-
 # Enable or disable inter-node encryption
 # Default settings are TLS v1, RSA 1024-bit keys (it is imperative that
 # users generate their own keys) TLS_RSA_WITH_AES_128_CBC_SHA as the cipher
 # suite for authentication, key exchange and encryption of the actual data transfers.
+# Use the DHE/ECDHE ciphers if running in FIPS 140 compliant mode.
 # NOTE: No custom encryption options are enabled at the moment
 # The available internode options are : all, none, dc, rack
 #
@@ -677,7 +648,7 @@ server_encryption_options:
     # protocol: TLS
     # algorithm: SunX509
     # store_type: JKS
-    # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]
+    # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA]
     # require_client_auth: false
 
 # enable or disable client/server encryption.
@@ -693,17 +664,24 @@ client_encryption_options:
     # protocol: TLS
     # algorithm: SunX509
     # store_type: JKS
-    # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]
+    # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA]
 
 # internode_compression controls whether traffic between nodes is
 # compressed.
 # can be:  all  - all traffic is compressed
 #          dc   - traffic between different datacenters is compressed
 #          none - nothing is compressed.
+# WE have to shut this off to avoid SNAPPY errors when testing
 internode_compression: none
 
 # Enable or disable tcp_nodelay for inter-dc communication.
 # Disabling it will result in larger (but fewer) network packets being sent,
 # reducing overhead from the TCP protocol itself, at the cost of increasing
 # latency if you block for cross-datacenter responses.
-inter_dc_tcp_nodelay: true
+inter_dc_tcp_nodelay: false
+
+# Enable or disable kernel page cache preheating from contents of the key cache after compaction.
+# When enabled it would preheat only first "page" (4KB) of each row to optimize
+# for sequential access. Note: This could be harmful for fat rows, see CASSANDRA-4937
+# for further details on that topic.
+preheat_kernel_page_cache: false

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java
index 1ede643..c9b1e39 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java
@@ -68,7 +68,7 @@ public class TestUtils {
 
     public static <K, C> void createColumnFamiliy(final Keyspace keyspace, final ColumnFamily<K, C> columnFamily, final Map<String, Object> options){
         try{
-            keyspace.createColumnFamily( columnFamily, new HashMap<String, Object>() );
+            keyspace.createColumnFamily( columnFamily, options );
         }catch(Exception e){
            log.error( "Error on creating column family, ignoring" , e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/javadriver/HystrixSessionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/javadriver/HystrixSessionTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/javadriver/HystrixSessionTest.java
index 3588268..7ec37c9 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/javadriver/HystrixSessionTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/javadriver/HystrixSessionTest.java
@@ -133,7 +133,7 @@ public class HystrixSessionTest {
 
         final String keyspaceCreate =
                 "CREATE KEYSPACE " + keyspaceName + " WITH REPLICATION = { 'class' : 'SimpleStrategy', "
-                        + "'replication_factor' : 3 };";
+                        + "'replication_factor' : 1 };";
 
         final String createColumnFamily = "CREATE TABLE " + tableName + " ( " +
                 "entry_id uuid, " +

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index f0e954b..278c866 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -23,6 +23,7 @@ import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
+import org.apache.usergrid.persistence.core.migration.DatastaxMigration;
 import org.apache.usergrid.persistence.core.migration.Migration;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.GraphManager;
@@ -135,14 +136,20 @@ public class GraphModule extends AbstractModule {
 
         //do multibindings for migrations
         Multibinder<Migration> migrationBinding = Multibinder.newSetBinder( binder(), Migration.class );
-        migrationBinding.addBinding().to( Key.get( NodeSerialization.class ) );
-        migrationBinding.addBinding().to( Key.get( EdgeMetadataSerialization.class ) );
+
 
         //bind each singleton to the multi set.  Otherwise we won't migrate properly
         migrationBinding.addBinding().to( Key.get( EdgeColumnFamilies.class ) );
 
         migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
         migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
+
+
+        Multibinder<DatastaxMigration>
+                dsMigrationBinding = Multibinder.newSetBinder( binder(), DatastaxMigration.class );
+
+        dsMigrationBinding.addBinding().to( Key.get( NodeSerialization.class ) );
+        dsMigrationBinding.addBinding().to( Key.get( EdgeMetadataSerialization.class ) );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index ae38372..246d549 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.graph.impl;
 
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -28,7 +29,7 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.javadriver.BatchStatementUtils;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -50,11 +51,12 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Observer;
@@ -87,10 +89,11 @@ public class GraphManagerImpl implements GraphManager {
 
 
     private final GraphFig graphFig;
+    private final Session session;
 
 
     @Inject
-    public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
+    public GraphManagerImpl( final Session session, final EdgeMetadataSerialization edgeMetadataSerialization,
                              final EdgeSerialization storageEdgeSerialization,
                              final NodeSerialization nodeSerialization, final GraphFig graphFig,
                              @Assisted final ApplicationScope scope, final EdgeDeleteListener edgeDeleteListener,
@@ -98,12 +101,14 @@ public class GraphManagerImpl implements GraphManager {
 
 
         ValidationUtils.validateApplicationScope( scope );
+        Preconditions.checkNotNull( session, "session must not be null" );
         Preconditions.checkNotNull( edgeMetadataSerialization, "edgeMetadataSerialization must not be null" );
         Preconditions.checkNotNull( storageEdgeSerialization, "storageEdgeSerialization must not be null" );
         Preconditions.checkNotNull( nodeSerialization, "nodeSerialization must not be null" );
         Preconditions.checkNotNull( graphFig, "consistencyFig must not be null" );
         Preconditions.checkNotNull( scope, "scope must not be null" );
 
+        this.session = session;
         this.scope = scope;
         this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.storageEdgeSerialization = storageEdgeSerialization;
@@ -130,13 +135,13 @@ public class GraphManagerImpl implements GraphManager {
                 final UUID timestamp = UUIDGenerator.newTimeUUID();
 
 
-                final MutationBatch mutation = edgeMetadataSerialization.writeEdge( scope, edge );
+                final Collection<? extends Statement> mutation = edgeMetadataSerialization.writeEdge( scope, edge );
 
-                final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp );
+                final Collection<? extends Statement> edgeMutation =
+                        storageEdgeSerialization.writeEdge( scope, edge, timestamp );
 
-                mutation.mergeShallow( edgeMutation );
 
-                HystrixCassandra.user( mutation );
+                BatchStatementUtils.runBatches(session, mutation, edgeMutation );
 
                 return edge;
             }
@@ -158,12 +163,12 @@ public class GraphManagerImpl implements GraphManager {
                 final UUID timestamp = UUIDGenerator.newTimeUUID();
 
 
-                final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp );
+                final Collection<? extends Statement> edgeMutation =
+                        storageEdgeSerialization.writeEdge( scope, edge, timestamp );
 
 
-                LOG.debug( "Marking edge {} as deleted to commit log", edge );
-                HystrixCassandra.user( edgeMutation );
 
+                BatchStatementUtils.runBatches(session, edgeMutation, edgeMutation );
 
                 //HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
                 // timestamp )).subscribeOn( Schedulers.io() ).subscribe( edgeDeleteSubcriber );
@@ -188,11 +193,14 @@ public class GraphManagerImpl implements GraphManager {
 
                 final UUID eventTimestamp = UUIDGenerator.newTimeUUID();
 
-                final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp );
+                final Collection<? extends Statement> nodeMutation = nodeSerialization.mark( scope, id, timestamp );
+
+
 
 
                 LOG.debug( "Marking node {} as deleted to node mark", node );
-                HystrixCassandra.user( nodeMutation );
+
+                BatchStatementUtils.runBatches(session, nodeMutation);
 
 
                 //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp  )).subscribeOn(

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 0137ba4..323db30 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.graph.impl.stage;
 
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.UUID;
 
@@ -27,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.javadriver.BatchStatementUtils;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -36,6 +38,8 @@ import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
@@ -56,21 +60,21 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
 
     protected final EdgeSerialization storageSerialization;
     protected final GraphFig graphFig;
-    protected final Keyspace keyspace;
+    protected final Session session;
 
 
     @Inject
     public EdgeDeleteRepairImpl( final EdgeSerialization storageSerialization,
-                                 final GraphFig graphFig, final Keyspace keyspace ) {
+                                 final GraphFig graphFig, final Session session ) {
 
         Preconditions.checkNotNull( "storageSerialization is required", storageSerialization );
         Preconditions.checkNotNull( "consistencyFig is required", graphFig );
-        Preconditions.checkNotNull( "keyspace is required", keyspace );
+        Preconditions.checkNotNull( "session is required", session );
 
 
         this.storageSerialization = storageSerialization;
         this.graphFig = graphFig;
-        this.keyspace = keyspace;
+        this.session = session;
     }
 
 
@@ -93,8 +97,11 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
                                     //remove from the commit log
 
 
+                                    final Collection<? extends Statement>
+                                            deletes = storageSerialization.deleteEdge( scope, edge, timestamp );
+
                                     //remove from storage
-                                    HystrixCassandra.async(storageSerialization.deleteEdge( scope, edge, timestamp ));
+                                    BatchStatementUtils.runBatches(session, deletes  );
                                 }
                             }
                         } );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94a92c39/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 7e09eca..5c6f4a1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.graph.impl.stage;
 
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -28,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.javadriver.BatchStatementUtils;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -41,6 +43,9 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -65,23 +70,22 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
 
     private final EdgeMetadataSerialization edgeMetadataSerialization;
     private final EdgeSerialization storageEdgeSerialization;
-    private final Keyspace keyspace;
+    private final Session session;
     private final GraphFig graphFig;
 
 
     @Inject
-    public EdgeMetaRepairImpl( final EdgeMetadataSerialization edgeMetadataSerialization, final Keyspace keyspace,
+    public EdgeMetaRepairImpl( final EdgeMetadataSerialization edgeMetadataSerialization, final Session session,
                                final GraphFig graphFig, final EdgeSerialization storageEdgeSerialization ) {
 
 
-        Preconditions.checkNotNull( "edgeMetadataSerialization is required", edgeMetadataSerialization );
-        Preconditions.checkNotNull( "storageEdgeSerialization is required", storageEdgeSerialization );
-        Preconditions.checkNotNull( "consistencyFig is required", graphFig );
-        Preconditions.checkNotNull( "cassandraConfig is required", graphFig );
-        Preconditions.checkNotNull( "keyspace is required", keyspace );
+        Preconditions.checkNotNull( edgeMetadataSerialization, "edgeMetadataSerialization is required" );
+        Preconditions.checkNotNull( storageEdgeSerialization, "storageEdgeSerialization is required" );
+        Preconditions.checkNotNull( graphFig, "graphFig is required" );
+        Preconditions.checkNotNull( session, "session is required" );
 
         this.edgeMetadataSerialization = edgeMetadataSerialization;
-        this.keyspace = keyspace;
+        this.session = session;
         this.graphFig = graphFig;
         this.storageEdgeSerialization = storageEdgeSerialization;
     }
@@ -122,8 +126,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                     public Observable<Integer> call( final List<String> types ) {
 
 
-                        final MutationBatch batch = keyspace.prepareMutationBatch();
-
+                        final BatchStatement batch = BatchStatementUtils.fastStatement();
                         final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
 
                         //for each id type, check if the exist in parallel to increase processing speed
@@ -161,9 +164,11 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                                                          LOG.debug( "No edges with nodeId {}, type {}, "
                                                                          + "and subtype {}. Removing subtype.", node,
                                                                  edgeType, subType );
-                                                         batch.mergeShallow( serialization
-                                                                 .removeEdgeSubType( scope, node, edgeType, subType,
-                                                                         maxTimestamp ) );
+
+                                                         final Collection<? extends Statement> statements =  serialization.removeEdgeSubType(
+                                                                 scope, node, edgeType, subType, maxTimestamp );
+
+                                                         batch.addAll( statements );
                                                      }
                                                  } );
 
@@ -185,9 +190,9 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                                                                         "Executing batch for subtype deletion with " +
                                                                                 "type {}.  "
                                                                                 + "Mutation has {} rows to mutate ",
-                                                                        edgeType, batch.getRowCount() );
+                                                                        edgeType, batch.getStatements().size() );
 
-                                                                HystrixCassandra.async( batch );
+                                                                session.execute( batch );
                                                             }
                                                         }
 
@@ -218,7 +223,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
 
                 LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}.  Deleting type.", edgeType,
                         maxTimestamp );
-                HystrixCassandra.async( serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ) );
+                BatchStatementUtils.runBatches(session, serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ) );
             }
         } );
     }
@@ -245,14 +250,15 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         /**
          * Remove the sub type specified
          */
-        MutationBatch removeEdgeSubType( final ApplicationScope scope, final Id nodeId, final String edgeType,
-                                         final String subType, final long maxTimestamp );
+        java.util.Collection<? extends com.datastax.driver.core.Statement> removeEdgeSubType(
+                final ApplicationScope scope, final Id nodeId, final String type, final String subType,
+                final long maxTimestamp );
 
         /**
          * Remove the edge type
          */
-        MutationBatch removeEdgeType( final ApplicationScope scope, final Id nodeId, final String type,
-                                      final long maxTimestamp );
+        Collection<? extends Statement> removeEdgeType( final ApplicationScope scope, final Id nodeId,
+                                                        final String type, final long maxTimestamp );
     }
 
 
@@ -292,15 +298,16 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
 
 
         @Override
-        public MutationBatch removeEdgeSubType( final ApplicationScope scope, final Id nodeId, final String type,
-                                                final String subType, final long maxTimestamp ) {
+        public Collection<? extends Statement> removeEdgeSubType(
+                final ApplicationScope scope, final Id nodeId, final String type, final String subType,
+                final long maxTimestamp ) {
             return edgeMetadataSerialization.removeIdTypeToTarget( scope, nodeId, type, subType, maxTimestamp );
         }
 
 
         @Override
-        public MutationBatch removeEdgeType( final ApplicationScope scope, final Id nodeId, final String type,
-                                             final long maxTimestamp ) {
+        public Collection<? extends Statement> removeEdgeType( final ApplicationScope scope, final Id nodeId,
+                                                               final String type, final long maxTimestamp ) {
             return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, nodeId, type, maxTimestamp );
         }
     };
@@ -338,15 +345,16 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
 
 
         @Override
-        public MutationBatch removeEdgeSubType( final ApplicationScope scope, final Id nodeId, final String type,
-                                                final String subType, final long maxTimestamp ) {
+        public java.util.Collection<? extends com.datastax.driver.core.Statement> removeEdgeSubType(
+                final ApplicationScope scope, final Id nodeId, final String type, final String subType,
+                final long maxTimestamp ) {
             return edgeMetadataSerialization.removeIdTypeFromSource( scope, nodeId, type, subType, maxTimestamp );
         }
 
 
         @Override
-        public MutationBatch removeEdgeType( final ApplicationScope scope, final Id nodeId, final String type,
-                                             final long maxTimestamp ) {
+        public Collection<? extends Statement> removeEdgeType( final ApplicationScope scope, final Id nodeId,
+                                                               final String type, final long maxTimestamp ) {
             return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, nodeId, type, maxTimestamp );
         }
     };