You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/18 01:24:01 UTC

[1/2] incubator-usergrid git commit: Revert "fix for list types changing from string to int or whatever"

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-405-merge 762e43e1e -> 5e6ed4a12


Revert "fix for list types changing from string to int or whatever"

This reverts commit e5cab76f8b8fc1e4a60ea23274e7a8a0cc79f759.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b09dc435
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b09dc435
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b09dc435

Branch: refs/heads/USERGRID-405-merge
Commit: b09dc43526cd27dcf75fa09dc5c40a288ca30225
Parents: 762e43e
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 17 17:30:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 17 17:30:27 2015 -0600

----------------------------------------------------------------------
 .../index/impl/EsEntityIndexBatchImpl.java      |  14 ++-
 .../persistence/index/impl/IndexingUtils.java   | 109 +++++++++----------
 .../persistence/index/impl/EntityIndexTest.java |  38 +------
 3 files changed, 70 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b09dc435/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 4c32cc1..92312d2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -56,7 +56,15 @@ import com.codahale.metrics.Timer;
 import rx.Observable;
 import rx.functions.Func1;
 
-import static org.apache.usergrid.persistence.index.impl.IndexingUtils.*;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ANALYZED_STRING_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITYID_ID_FIELDNAME;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITY_CONTEXT_FIELDNAME;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.GEO_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.NUMBER_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.STRING_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createContextName;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
 
 
 public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@@ -235,7 +243,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
             if ( f instanceof ListField ) {
                 List list = ( List ) field.getValue();
-                entityMap.put(LIST_PREFIX + field.getName().toLowerCase(),
+                entityMap.put( field.getName().toLowerCase(),
                         new ArrayList( processCollectionForMap( list ) ) );
 
                 if ( !list.isEmpty() ) {
@@ -247,7 +255,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
             }
             else if ( f instanceof ArrayField ) {
                 List list = ( List ) field.getValue();
-                entityMap.put(ARRAY_PREFIX + field.getName().toLowerCase(),
+                entityMap.put( field.getName().toLowerCase(),
                         new ArrayList( processCollectionForMap( list ) ) );
             }
             else if ( f instanceof SetField ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b09dc435/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 41a4680..ffd98e9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -34,9 +34,6 @@ public class IndexingUtils {
 
     public static final String STRING_PREFIX = "su_";
     public static final String ANALYZED_STRING_PREFIX = "sa_";
-    public static final String ARRAY_PREFIX = "ar_";
-    public static final String LIST_PREFIX = "li_";
-
     public static final String GEO_PREFIX = "go_";
     public static final String NUMBER_PREFIX = "nu_";
     public static final String BOOLEAN_PREFIX = "bu_";
@@ -156,75 +153,75 @@ public class IndexingUtils {
 
                     /**  add routing  "_routing":{ "required":false,  "path":"ug_entityId" **/
                      .startObject("_routing").field("required",true).field("path",ENTITYID_ID_FIELDNAME).endObject()
-            .startArray("dynamic_templates")
+                     .startArray("dynamic_templates")
                         // we need most specific mappings first since it's a stop on match algorithm
 
-            .startObject()
+                        .startObject()
 
-            .startObject("entity_id_template" )
-            .field("match", IndexingUtils.ENTITYID_ID_FIELDNAME )
-            .field("match_mapping_type", "string" )
-            .startObject( "mapping").field("type", "string" )
-            .field("index", "not_analyzed" )
-            .endObject()
-            .endObject()
-            .endObject()
+                            .startObject( "entity_id_template" )
+                                .field( "match", IndexingUtils.ENTITYID_ID_FIELDNAME )
+                                    .field( "match_mapping_type", "string" )
+                                            .startObject( "mapping" ).field( "type", "string" )
+                                                .field( "index", "not_analyzed" )
+                                            .endObject()
+                                    .endObject()
+                                .endObject()
 
-            .startObject()
-            .startObject("entity_context_template" )
+                            .startObject()
+                            .startObject( "entity_context_template" )
                                 .field( "match", IndexingUtils.ENTITY_CONTEXT_FIELDNAME )
-            .field("match_mapping_type", "string" )
-            .startObject( "mapping" )
+                                .field( "match_mapping_type", "string" )
+                                    .startObject( "mapping" )
                                         .field( "type", "string" )
-            .field("index", "not_analyzed").endObject()
-            .endObject()
-            .endObject()
-
-            .startObject()
-            .startObject("entity_version_template" )
-            .field("match", IndexingUtils.ENTITY_VERSION_FIELDNAME )
-            .field("match_mapping_type", "string" )
-            .startObject( "mapping").field("type", "long" )
-            .endObject()
-            .endObject()
-            .endObject()
+                                        .field( "index", "not_analyzed" ).endObject()
+                                    .endObject()
+                            .endObject()
+
+                            .startObject()
+                            .startObject( "entity_version_template" )
+                                .field( "match", IndexingUtils.ENTITY_VERSION_FIELDNAME )
+                                        .field( "match_mapping_type", "string" )
+                                            .startObject( "mapping" ).field( "type", "long" )
+                                            .endObject()
+                                        .endObject()
+                                    .endObject()
 
                             // any string with field name that starts with sa_ gets analyzed
-            .startObject()
-            .startObject("template_1" )
+                            .startObject()
+                                .startObject( "template_1" )
                                     .field( "match", ANALYZED_STRING_PREFIX + "*" )
-            .field("match_mapping_type", "string" ).startObject( "mapping" )
+                                    .field( "match_mapping_type", "string" ).startObject( "mapping" )
                                     .field( "type", "string" )
-            .field("index", "analyzed" )
-            .endObject()
-            .endObject()
+                                    .field( "index", "analyzed" )
+                                .endObject()
+                            .endObject()
 
-            .endObject()
+                        .endObject()
 
                         // all other strings are not analyzed
-            .startObject()
-            .startObject("template_2" )
+                        .startObject()
+                            .startObject( "template_2" )
                                 //todo, should be string prefix, remove 2 field mapping
-            .field("match", "*" )
-            .field("match_mapping_type", "string" )
-            .startObject( "mapping" )
-            .field("type", "string" )
-            .field("index", "not_analyzed" )
-            .endObject()
-            .endObject()
-            .endObject()
+                                .field( "match", "*" )
+                                .field( "match_mapping_type", "string" )
+                                .startObject( "mapping" )
+                                    .field( "type", "string" )
+                                        .field( "index", "not_analyzed" )
+                                .endObject()
+                            .endObject()
+                        .endObject()
 
                         // fields names starting with go_ get geo-indexed
-            .startObject()
-            .startObject("template_3" )
-            .field("match", GEO_PREFIX + "location" )
-            .startObject( "mapping" )
-            .field("type", "geo_point" )
-            .endObject()
-            .endObject()
-            .endObject()
-
-            .endArray()
+                        .startObject()
+                            .startObject( "template_3" )
+                                .field( "match", GEO_PREFIX + "location" )
+                                    .startObject( "mapping" )
+            .field( "type", "geo_point" )
+                                    .endObject()
+                            .endObject()
+                        .endObject()
+
+                    .endArray()
 
             .endObject();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b09dc435/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 9cb5297..ca9bf79 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -26,7 +26,9 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.index.*;
-import org.apache.usergrid.persistence.model.field.*;
+import org.apache.usergrid.persistence.model.field.ArrayField;
+import org.apache.usergrid.persistence.model.field.EntityObjectField;
+import org.apache.usergrid.persistence.model.field.UUIDField;
 import org.apache.usergrid.persistence.model.field.value.EntityObject;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -49,6 +51,7 @@ import org.apache.usergrid.persistence.index.utils.UUIDUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.StringField;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -96,7 +99,7 @@ public class EntityIndexTest extends BaseIT {
     }
 
     @Test
-//    @Ignore("this is a problem i will work on when i can breathe")
+    @Ignore("this is a problem i will work on when i can breathe")
     public void testIndexVariations() throws IOException {
         Id appId = new SimpleId( "application" );
 
@@ -116,8 +119,6 @@ public class EntityIndexTest extends BaseIT {
         batch.index(indexScope, entity);
         batch.execute().get();
 
-        entity = new Entity( entityType );
-        entity.setField(new UUIDField(IndexingUtils.ENTITYID_ID_FIELDNAME, UUID.randomUUID()));
         EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID());
         List<String> list = new ArrayList<>();
         list.add("test");
@@ -125,9 +126,6 @@ public class EntityIndexTest extends BaseIT {
         batch.index(indexScope, entity);
         batch.execute().get();
 
-
-        entity = new Entity( entityType );
-        entity.setField(new UUIDField(IndexingUtils.ENTITYID_ID_FIELDNAME, UUID.randomUUID()));
         EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID());
         EntityObject testObj = new EntityObject();
         testObj.setField(new StringField("test","testFiedl"));
@@ -135,32 +133,9 @@ public class EntityIndexTest extends BaseIT {
         batch.index(indexScope, entity);
         batch.execute().get();
 
-        entity = new Entity( entityType );
-        entity.setField(new UUIDField(IndexingUtils.ENTITYID_ID_FIELDNAME, UUID.randomUUID()));
-        EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID());
-        List<Integer> listint = new ArrayList<>();
-        listint.add(0);
-        entity.setField(new ArrayField<Integer>("testfield", listint));
-        batch.index(indexScope, entity);
-        batch.execute().get();
-
-        entity = new Entity( entityType );
-        entity.setField(new UUIDField(IndexingUtils.ENTITYID_ID_FIELDNAME, UUID.randomUUID()));
-        EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID());
-        List<EntityObject> listObj = new ArrayList<>();
-        EntityObject listObjField = new EntityObject();
-        listObjField.setField(new StringField("testasf","somevalue"));
-        listObj.add(listObjField);
-        listObjField = new EntityObject();
-        listObjField.setField(new IntegerField("testasf",0));
-        listObj.add(listObjField);
-        entity.setField(new ArrayField<EntityObject>("testfield", listObj));
-        batch.index(indexScope, entity);
-        batch.execute().get();
-
         entityIndex.refresh();
-        testQuery(indexScope, searchTypes, entityIndex, "select *", 5);
 
+        testQueries( indexScope, searchTypes,  entityIndex );
     }
 
     @Test
@@ -288,7 +263,6 @@ public class EntityIndexTest extends BaseIT {
         entityIndex.refresh();
 
         //Hilda Youn
-
         testQuery(indexScope, searchTypes, entityIndex, "name = 'Bowers Oneil'", 0);
 
     }


[2/2] incubator-usergrid git commit: Removed Hystrix. Causing thread OOM issues and not really used in production.

Posted by to...@apache.org.
Removed Hystrix. Causing thread OOM issues and not really used in production.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5e6ed4a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5e6ed4a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5e6ed4a1

Branch: refs/heads/USERGRID-405-merge
Commit: 5e6ed4a12c7aa154f1411d85c6fcce56fbcd4367
Parents: b09dc43
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 17 18:23:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 17 18:23:58 2015 -0600

----------------------------------------------------------------------
 .../core/astyanax/MultiRowColumnIterator.java   | 12 ++-
 .../core/hystrix/HystrixCassandra.java          | 94 --------------------
 .../persistence/core/task/TaskExecutor.java     |  4 +-
 .../graph/impl/GraphManagerImpl.java            | 30 +++++--
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  |  9 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    | 16 +++-
 .../impl/stage/NodeDeleteListenerImpl.java      | 16 +++-
 .../impl/NodeSerializationImpl.java             | 36 ++++----
 .../shard/count/NodeShardApproximationImpl.java |  4 +-
 .../NodeShardCounterSerializationImpl.java      | 25 +++---
 .../shard/impl/NodeShardAllocationImpl.java     | 16 +++-
 .../shard/impl/ShardGroupCompactionImpl.java    | 64 ++++---------
 12 files changed, 128 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index fdc4768..667992c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -33,10 +33,8 @@ import java.util.NoSuchElementException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
-
 import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.model.ColumnList;
@@ -184,7 +182,13 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
                 keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
                         .withColumnRange( rangeBuilder.build() );
 
-        final Rows<R, C> result = HystrixCassandra.user( query ).getResult();
+        final Rows<R, C> result;
+        try {
+            result = query.execute().getResult();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to casandra", e );
+        }
 
 
         //now aggregate them together

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
deleted file mode 100644
index ab71782..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.core.hystrix;
-
-
-import com.netflix.astyanax.Execution;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-import com.netflix.hystrix.HystrixThreadPoolProperties;
-
-
-/**
- * A utility class that creates graph observables wrapped in Hystrix for timeouts and circuit breakers.
- */
-public class HystrixCassandra {
-
-
-
-
-    /**
-     * Command group used for realtime user commands
-     */
-    public static final HystrixCommand.Setter
-            USER_GROUP = HystrixCommand.Setter.withGroupKey(   HystrixCommandGroupKey.Factory.asKey( "user" ) ).andThreadPoolPropertiesDefaults(
-            HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
-
-    /**
-     * Command group for asynchronous operations
-     */
-    public static final HystrixCommand.Setter
-            ASYNC_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "async" ) ).andThreadPoolPropertiesDefaults(
-            HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
-
-
-    /**
-     * Execute an user operation
-     */
-    public static <R> OperationResult<R> user( final Execution<R> execution) {
-        return new HystrixCommand<OperationResult<R>>( USER_GROUP ) {
-
-            @Override
-            protected OperationResult<R> run() {
-                try {
-                    return  execution.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( e );
-                }
-            }
-        }.execute();
-    }
-
-
-    /**
-     * Execute an an async operation
-     */
-    public static <R> OperationResult<R> async( final Execution<R> execution) {
-
-
-        return new HystrixCommand<OperationResult<R>>( ASYNC_GROUP ) {
-
-            @Override
-            protected OperationResult<R> run() {
-                try {
-                    return  execution.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( e );
-                }
-            }
-        }.execute();
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index 5e9aa4c..5728d2e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -25,7 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 /**
  * An interface for execution of tasks
  */
-public interface TaskExecutor {
+public interface  TaskExecutor {
 
     /**
      * Submit the task asynchronously
@@ -37,5 +37,5 @@ public interface TaskExecutor {
      * Stop the task executor without waiting for scheduled threads to run
      */
     public void shutdown();
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 246bbca..26d06ad 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -25,14 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -54,10 +51,12 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Notification;
 import rx.Observable;
@@ -202,7 +201,12 @@ public class GraphManagerImpl implements GraphManager {
 
                 mutation.mergeShallow( edgeMutation );
 
-                HystrixCassandra.user( mutation );
+                try {
+                    mutation.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation", e );
+                }
 
                 return edge;
             }
@@ -241,7 +245,12 @@ public class GraphManagerImpl implements GraphManager {
 
 
                 LOG.debug("Marking edge {} as deleted to commit log", edge);
-                HystrixCassandra.user(edgeMutation);
+                try {
+                    edgeMutation.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation", e );
+                }
 
 
                 //HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
@@ -285,7 +294,12 @@ public class GraphManagerImpl implements GraphManager {
 
 
                 LOG.debug( "Marking node {} as deleted to node mark", node );
-                HystrixCassandra.user( nodeMutation );
+                try {
+                    nodeMutation.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation", e );
+                }
 
 
                 //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp  )).subscribeOn(

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 0137ba4..7dca0ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -26,7 +26,6 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -39,6 +38,7 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.functions.Action1;
@@ -94,7 +94,12 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
 
 
                                     //remove from storage
-                                    HystrixCassandra.async(storageSerialization.deleteEdge( scope, edge, timestamp ));
+                                    try {
+                                        storageSerialization.deleteEdge( scope, edge, timestamp ).execute();
+                                    }
+                                    catch ( ConnectionException e ) {
+                                        throw new RuntimeException( "Unable to connect to casandra", e );
+                                    }
                                 }
                             }
                         } );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 0e1c4e2..ab141f7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -47,6 +46,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.functions.Action1;
@@ -188,7 +188,12 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                                                                                 + "Mutation has {} rows to mutate ",
                                                                         edgeType, batch.getRowCount() );
 
-                                                                HystrixCassandra.async( batch );
+                                                                try {
+                                                                    batch.execute();
+                                                                }
+                                                                catch ( ConnectionException e ) {
+                                                                    throw new RuntimeException( "Unable to connect to casandra", e );
+                                                                }
                                                             }
                                                         }
 
@@ -219,7 +224,12 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
 
                 LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}.  Deleting type.", edgeType,
                         maxTimestamp );
-                HystrixCassandra.async( serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ) );
+                try {
+                    serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ).execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to connect to casandra", e );
+                }
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index f167f0c..e8c224e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -47,6 +46,7 @@ import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.functions.Action0;
@@ -129,7 +129,12 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                                 .doOnCompleted( new Action0() {
                                     @Override
                                     public void call() {
-                                        HystrixCassandra.async(nodeSerialization.delete( scope, node, maxVersion.get() ));
+                                        try {
+                                            nodeSerialization.delete( scope, node, maxVersion.get()).execute();
+                                        }
+                                        catch ( ConnectionException e ) {
+                                            throw new RuntimeException( "Unable to connect to casandra", e );
+                                        }
                                     }
                                 } );
                     }
@@ -210,7 +215,12 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                             targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
                         }
 
-                        HystrixCassandra.async( batch );
+                        try {
+                            batch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( "Unable to connect to casandra", e );
+                        }
 
                         //now  delete meta data
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
index 2cc0391..18062c4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
@@ -38,7 +38,6 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -52,6 +51,7 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.Row;
@@ -150,22 +150,23 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
                 keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel( fig.getReadCL() );
 
 
+
+        Column<Boolean> result = null;
         try {
-            Column<Boolean> result = HystrixCassandra
-                    .user( query.getKey( ScopedRowKey.fromKey( scope.getApplication(), node ) ).getColumn( COLUMN_NAME ) )
+            result = query.getKey( ScopedRowKey.fromKey( scope.getApplication(), node ) ).getColumn( COLUMN_NAME ).execute()
                     .getResult();
-
-            return Optional.of( result.getLongValue() );
         }
-        catch (RuntimeException re ) {
-            if(re.getCause().getCause() instanceof   NotFoundException) {
-                //swallow, there's just no column
-                return Optional.absent();
-            }
-
-            throw re;
+        catch(NotFoundException nfe){
+             //swallow, there's just no column
+            return Optional.absent();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to casandra", e );
         }
 
+        return Optional.of( result.getLongValue() );
+
+
     }
 
 
@@ -193,9 +194,14 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
         }
 
 
-        final Rows<ScopedRowKey<Id>, Boolean> results = HystrixCassandra
-                .user( query.getRowSlice( keys ).withColumnSlice( Collections.singletonList( COLUMN_NAME ) ) )
-                .getResult();
+        final Rows<ScopedRowKey<Id>, Boolean> results;
+        try {
+            results = query.getRowSlice( keys ).withColumnSlice( Collections.singletonList( COLUMN_NAME )).execute()
+                    .getResult();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to casandra", e );
+        }
 
         for ( Row<ScopedRowKey<Id>, Boolean> row : results ) {
             Column<Boolean> column = row.getColumns().getColumnByName( COLUMN_NAME );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index 47243e5..a47d528 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
@@ -39,6 +38,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
 
 import rx.functions.Action0;
 import rx.schedulers.Schedulers;
@@ -229,7 +229,7 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
                 /**
                  * Execute the command in hystrix to avoid slamming cassandra
                  */
-                new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
+                new HystrixCommand( HystrixCommandGroupKey.Factory.asKey("BatchCounterRollup") ) {
 
                     @Override
                     protected Void run() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index 524a0cf..6934275 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -33,9 +33,8 @@ import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
 import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -50,6 +49,7 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.CompositeBuilder;
@@ -117,24 +117,19 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
         final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope.getApplication(), key );
 
 
+        OperationResult<Column<Boolean>> column = null;
         try {
-            OperationResult<Column<Boolean>> column = HystrixCassandra.user(
-                    keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ) );
-
-            return column.getResult().getLongValue();
+            column = keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute();
         }
         //column not found, return 0
-        catch ( RuntimeException re ) {
-
-            final Throwable cause = re.getCause();
-
-            if(cause != null && cause.getCause() instanceof NotFoundException) {
-                return 0;
-            }
-
-            throw  re;
+        catch ( NotFoundException nfe ) {
+            return 0;
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to read from cassandra", e );
         }
 
+        return column.getResult().getLongValue();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 0d34b63..ad64338 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -49,6 +48,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.util.TimeUUIDUtils;
 
 
@@ -110,7 +110,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
             final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta );
-            HystrixCassandra.user( batch  );
+            try {
+                batch.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to connect to casandra", e );
+            }
 
             existingShards = Collections.singleton( MIN_SHARD ).iterator();
         }
@@ -232,7 +237,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
         final MutationBatch batch = this.edgeShardSerialization.writeShardMeta( scope, newShard, directedEdgeMeta );
 
-        HystrixCassandra.user( batch );
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to casandra", e );
+        }
 
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 9f8efc8..6135121 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -29,13 +29,7 @@ import java.util.Iterator;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
@@ -43,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
@@ -74,6 +67,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
 /**
@@ -207,8 +201,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 if ( edgeCount % maxWorkSize == 0 ) {
 
                     try {
-                        HystrixCassandra.async( newRowBatch );
-                        HystrixCassandra.async( deleteRowBatch );
+                        newRowBatch.execute();
+                        deleteRowBatch.execute();
                     }
                     catch ( Throwable t ) {
                         LOG.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
@@ -219,8 +213,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
         try {
-            HystrixCassandra.async( newRowBatch );
-            HystrixCassandra.async( deleteRowBatch );
+            newRowBatch.execute();
+            deleteRowBatch.execute();
         }
         catch ( Throwable t ) {
             LOG.error( "Unable to move edges to target shard {}", targetShard );
@@ -260,7 +254,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             }
 
 
-            HystrixCassandra.async( shardRemovalRollup );
+            try {
+                shardRemovalRollup.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to connect to casandra", e );
+            }
 
 
             LOG.info( "Shard has been fully compacted.  Marking shard {} as compacted in Cassandra", targetShard );
@@ -268,7 +267,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             //Overwrite our shard index with a newly created one that has been marked as compacted
             Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
             final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
-            HystrixCassandra.async( updateMark );
+            try {
+                updateMark.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to connect to casandra", e );
+            }
 
             resultBuilder.withCompactedShard( compactedShard );
         }
@@ -530,40 +534,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    /**
-     * Create a thread pool that will reject work if our audit tasks become overwhelmed
-     */
-    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
-        public MaxSizeThreadPool( final int workerSize, final int queueLength ) {
-            super( 1, workerSize, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( queueLength ),
-                    new CompactionThreadFactory(), new RejectionLogger() );
-        }
-    }
-
-
-    private final class CompactionThreadFactory implements ThreadFactory {
-
-        private final AtomicLong threadCounter = new AtomicLong();
-
-
-        @Override
-        public Thread newThread( final Runnable r ) {
-            final long newValue = threadCounter.incrementAndGet();
-
-            return new Thread( r, "Graph-Shard-Compaction-" + newValue );
-        }
-    }
-
-
-    private final class RejectionLogger implements RejectedExecutionHandler {
-
-
-        @Override
-        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-            LOG.warn( "Audit queue full, rejecting audit task {}", r );
-        }
-    }
 
 
     public static final class CompactionResult {