You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/27 18:20:49 UTC

[02/27] git commit: Pushed Hystrix down to the actual read execution of RowQuery. Still not the desired behavior

Pushed Hystrix down to the actual read execution of RowQuery.  Still not the desired behavior.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c1e7a33e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c1e7a33e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c1e7a33e

Branch: refs/heads/entity-manager
Commit: c1e7a33eba78d3308e67cd8b9a6fe1ea40b5721c
Parents: 5cadb9f
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 19 16:16:46 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 19 16:16:46 2014 -0700

----------------------------------------------------------------------
 .../graph/impl/EdgeDeleteListener.java          |  2 +-
 .../persistence/graph/impl/EdgeManagerImpl.java | 16 +++---
 .../graph/impl/EdgeWriteListener.java           |  2 +-
 .../graph/impl/NodeDeleteListener.java          |  8 +--
 .../graph/impl/stage/AbstractEdgeRepair.java    |  4 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |  8 +--
 .../impl/EdgeMetadataSerializationImpl.java     | 17 +++---
 .../impl/EdgeSerializationImpl.java             | 21 ++++----
 .../impl/parse/ColumnNameIterator.java          | 55 ++++++++++++--------
 .../impl/parse/ObservableIterator.java          | 48 +++++------------
 10 files changed, 88 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index d377882..a5715c9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -76,7 +76,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
 
 
                 //go through every version of this edge <= the current version and remove it
-                Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+                Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>( ) {
                     @Override
                     protected Iterator<MarkedEdge> getIterator() {
                         return edgeSerialization.getEdgeToTarget( scope,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 9e529c5..f825767 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -201,7 +201,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSource( scope, search );
@@ -220,7 +220,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTarget( scope, search );
@@ -240,7 +240,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSourceByTargetType( scope, search );
@@ -258,7 +258,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
@@ -276,7 +276,7 @@ public class EdgeManagerImpl implements EdgeManager {
     @Override
     public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>( ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
@@ -287,7 +287,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>( ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
@@ -299,7 +299,7 @@ public class EdgeManagerImpl implements EdgeManager {
     @Override
     public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>( ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
@@ -310,7 +310,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>( ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesToTarget( scope, search );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
index ed8e861..e146ba9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -52,7 +52,7 @@ public class EdgeWriteListener implements MessageListener<EdgeEvent<Edge>, EdgeE
         final OrganizationScope scope = write.getOrganizationScope();
         final UUID maxVersion = edge.getVersion();
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 631ac50..0927ba8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -210,7 +210,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>(  ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
@@ -224,7 +224,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<String> getEdgesTypesFromSource( final OrganizationScope scope, final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>(  ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
@@ -238,7 +238,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTarget( scope, search );
@@ -252,7 +252,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<MarkedEdge> loadEdgesFromSource( final OrganizationScope scope, final SearchByEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSource( scope, search );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
index e09d884..fb31a33 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -125,7 +125,7 @@ public abstract class AbstractEdgeRepair  {
      */
     private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
 
@@ -142,7 +142,7 @@ public abstract class AbstractEdgeRepair  {
      */
     private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index c54f485..0bbc91a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -251,7 +251,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         @Override
         public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
                                                     final String edgeType, final UUID version ) {
-            return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+            return Observable.create( new ObservableIterator<String>( ) {
                 @Override
                 protected Iterator<String> getIterator() {
                     return edgeMetadataSerialization
@@ -264,7 +264,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         @Override
         public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
                                                  final String subType, final UUID version ) {
-            return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+            return Observable.create( new ObservableIterator<MarkedEdge>( ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return edgeSerialization.getEdgesToTargetBySourceType( scope,
@@ -296,7 +296,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         @Override
         public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
                                                     final String edgeType, final UUID version ) {
-            return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+            return Observable.create( new ObservableIterator<String>( ) {
                 @Override
                 protected Iterator<String> getIterator() {
                     return edgeMetadataSerialization
@@ -309,7 +309,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         @Override
         public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
                                                  final String subType, final UUID version ) {
-            return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+            return Observable.create( new ObservableIterator<MarkedEdge>( ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return edgeSerialization.getEdgesFromSourceByTargetType( scope,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
index 713c7fe..b230399 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
@@ -37,6 +37,7 @@ import org.apache.usergrid.persistence.collection.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.collection.migration.Migration;
 import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.SearchEdgeType;
 import org.apache.usergrid.persistence.graph.SearchIdType;
 import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
@@ -50,7 +51,6 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.ByteBufferRange;
 import com.netflix.astyanax.model.CompositeBuilder;
 import com.netflix.astyanax.model.CompositeParser;
@@ -111,13 +111,16 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
 
 
     protected final Keyspace keyspace;
-    private final CassandraConfig graphFig;
+    private final CassandraConfig cassandraConfig;
+    private final GraphFig graphFig;
 
 
 
     @Inject
-    public EdgeMetadataSerializationImpl( final Keyspace keyspace, final CassandraConfig graphFig) {
+    public EdgeMetadataSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+                                          final GraphFig graphFig ) {
         this.keyspace = keyspace;
+        this.cassandraConfig = cassandraConfig;
         this.graphFig = graphFig;
     }
 
@@ -329,14 +332,14 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
 
 
         final RangeBuilder rangeBuilder =
-                new RangeBuilder().setLimit( graphFig.getScanPageSize() ).setStart( search.getLast().or( "" ) );
+                new RangeBuilder().setLimit( cassandraConfig.getScanPageSize() ).setStart( search.getLast().or( "" ) );
 
         RowQuery<ScopedRowKey<OrganizationScope, Id>, String> query =
                 keyspace.prepareQuery( cf ).getKey( sourceKey ).autoPaginate( true )
                         .withColumnRange( rangeBuilder.build() );
 
         return new ColumnNameIterator<String, String>( query, PARSER,
-                    search.getLast().isPresent() );
+                    search.getLast().isPresent(), graphFig.getReadTimeout() );
 
     }
 
@@ -367,14 +370,14 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
 
         //resume from the last if specified.  Also set the range
         final ByteBufferRange searchRange =
-                new RangeBuilder().setLimit( graphFig.getScanPageSize() ).setStart( search.getLast().or( "" ) ).build();
+                new RangeBuilder().setLimit( cassandraConfig.getScanPageSize() ).setStart( search.getLast().or( "" ) ).build();
 
         RowQuery<ScopedRowKey<OrganizationScope, EdgeIdTypeKey>, String> query =
                 keyspace.prepareQuery( cf ).getKey( sourceTypeKey ).autoPaginate( true ).withColumnRange( searchRange );
 
 
        return new ColumnNameIterator<String, String>( query, PARSER,
-                    search.getLast().isPresent() );
+                    search.getLast().isPresent(), graphFig.getReadTimeout() );
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 4dc3c8a..7e11206 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.collection.cassandra.ColumnTypes;
 import org.apache.usergrid.persistence.collection.migration.Migration;
 import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -59,7 +60,6 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.AbstractComposite;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.CompositeBuilder;
@@ -125,19 +125,22 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
     protected final Keyspace keyspace;
-    protected final CassandraConfig graphFig;
+    protected final CassandraConfig cassandraConfig;
+    protected final GraphFig graphFig;
 
 
     @Inject
-    public EdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig graphFig ) {
+    public EdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+                                  final GraphFig graphFig ) {
         this.keyspace = keyspace;
+        this.cassandraConfig = cassandraConfig;
         this.graphFig = graphFig;
     }
 
 
     @Override
     public MutationBatch writeEdge( final OrganizationScope scope, final Edge edge ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( graphFig.getWriteCL() );;
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );;
 
 
         doWrite( scope, edge, new RowOp() {
@@ -156,7 +159,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
     @Override
     public MutationBatch markEdge( final OrganizationScope scope, final Edge edge ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( graphFig.getWriteCL() );;
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );;
 
 
         doWrite( scope, edge, new RowOp() {
@@ -175,7 +178,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
     @Override
     public MutationBatch deleteEdge( final OrganizationScope scope, final Edge edge ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( graphFig.getWriteCL());
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );
 
 
         doWrite( scope, edge, new RowOp() {
@@ -477,7 +480,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
          * If the edge is present, we need to being seeking from this
          */
 
-        final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( graphFig.getScanPageSize() );
+        final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( cassandraConfig.getScanPageSize() );
 
 
         //set the range into the search
@@ -487,12 +490,12 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
         RowQuery<ScopedRowKey<OrganizationScope, R>, DirectedEdge> query =
-                keyspace.prepareQuery( cf ).setConsistencyLevel( graphFig.getReadCL() ).getKey( rowKey ).autoPaginate( true )
+                keyspace.prepareQuery( cf ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKey( rowKey ).autoPaginate( true )
                         .withColumnRange( rangeBuilder.build() );
 
 
         return new ColumnNameIterator<DirectedEdge, MarkedEdge>( query, searcher,
-                    searcher.hasPage() );
+                    searcher.hasPage(), graphFig.getReadTimeout() );
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
index 907586c..b6d17c6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
@@ -4,21 +4,24 @@ package org.apache.usergrid.persistence.graph.serialization.impl.parse;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.query.RowQuery;
+import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixCommandProperties;
 
 
 /**
- *
- * Simple iterator that wraps a Row query and will keep executing it's paging until there are no more
- * results to read from cassandra
- *
- *
+ * Simple iterator that wraps a Row query and will keep executing its paging until there are no more results to read
+ * from cassandra
  */
 public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
 
 
+    private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "CassRead" );
+
+    private final int executionTimeout;
+
 
     private final RowQuery<?, C> rowQuery;
     private final ColumnParser<C, T> parser;
@@ -26,21 +29,21 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
     private Iterator<Column<C>> sourceIterator;
 
 
-
-    public ColumnNameIterator( RowQuery<?, C> rowQuery, final ColumnParser<C, T> parser, final boolean skipFirst ) {
+    public ColumnNameIterator( RowQuery<?, C> rowQuery, final ColumnParser<C, T> parser, final boolean skipFirst,
+                               final int executionTimeout ) {
         this.rowQuery = rowQuery.autoPaginate( true );
         this.parser = parser;
+        this.executionTimeout = executionTimeout;
 
         advanceIterator();
 
         //if we are to skip the first element, we need to advance the iterator
-        if(skipFirst && sourceIterator.hasNext()){
+        if ( skipFirst && sourceIterator.hasNext() ) {
             sourceIterator.next();
         }
     }
 
 
-
     @Override
     public Iterator<T> iterator() {
         return this;
@@ -50,8 +53,8 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
     @Override
     public boolean hasNext() {
         //if we've exhausted this iterator, try to advance to the next set
-        if(sourceIterator.hasNext()){
-           return true;
+        if ( sourceIterator.hasNext() ) {
+            return true;
         }
 
         //advance the iterator, to the next page, there could be more
@@ -64,29 +67,37 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
     @Override
     public T next() {
 
-        if(!hasNext()){
+        if ( !hasNext() ) {
             throw new NoSuchElementException();
         }
 
-        return parser.parseColumn(sourceIterator.next());
+        return parser.parseColumn( sourceIterator.next() );
     }
 
 
     @Override
     public void remove() {
-       sourceIterator.remove();
+        sourceIterator.remove();
     }
 
 
     /**
      * Execute the query again and set the reuslts
      */
-    private void advanceIterator(){
-        try {
-            sourceIterator = rowQuery.execute().getResult().iterator();
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException("Unable to execute query", e);
-        }
+    private void advanceIterator() {
+
+        //run producing the values within a hystrix command.  This way we'll time out if the read takes too long
+        sourceIterator = new HystrixCommand<Iterator<Column<C>>>( HystrixCommand.Setter.withGroupKey( GROUP_KEY )
+                                                                                .andCommandPropertiesDefaults(
+                                                                                        HystrixCommandProperties
+                                                                                                .Setter()
+                                                                                                .withExecutionIsolationThreadTimeoutInMilliseconds(
+                                                                                                        executionTimeout ) ) ) {
+
+            @Override
+            protected Iterator<Column<C>> run() throws Exception {
+                return rowQuery.execute().getResult().iterator();
+            }
+        }.execute();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
index c70dc4c..2274868 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
@@ -3,65 +3,43 @@ package org.apache.usergrid.persistence.graph.serialization.impl.parse;
 
 import java.util.Iterator;
 
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-import com.netflix.hystrix.HystrixCommandProperties;
-
 import rx.Observable;
+import rx.Observer;
 import rx.Subscriber;
+import rx.Subscription;
+import rx.subscriptions.Subscriptions;
 
 
 /**
  * Converts an iterator to an observable.  Subclasses need to only implement getting the iterator from the data source.
- * This is used in favor of "Observable.just" when the initial fetch of the iterator will require I/O.  This allows us
- * to wrap the iterator in a deferred invocation to avoid the blocking on construction.
+ * This is used in favor of "Observable.just" when the initial fetch of the iterator will require I/O.  This allows
+ * us to wrap the iterator in a deferred invocation to avoid the blocking on construction.
  */
 public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> {
 
-    private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "CassRead" );
-
-    private final int executionTimeout;
-
-
-    protected ObservableIterator( final int executionTimeout ) {
-        this.executionTimeout = executionTimeout;
-    }
-
 
     @Override
     public void call( final Subscriber<? super T> subscriber ) {
 
 
         try {
-            //run producing the values within a hystrix command.  This way we'll time out if the read takes too long
-            new HystrixCommand<Void>( HystrixCommand.Setter.withGroupKey( GROUP_KEY ).andCommandPropertiesDefaults(
-                    HystrixCommandProperties.Setter()
-                                            .withExecutionIsolationThreadTimeoutInMilliseconds( executionTimeout ) ) ) {
+            //get our iterator and push data to the observer
+            Iterator<T> itr = getIterator();
 
 
-                @Override
-                protected Void run() throws Exception {
-                    //get our iterator and push data to the observer
-                    final Iterator<T> itr = getIterator();
+            //while we have items to emit and our subscriber is subscribed, we want to keep emitting items
+            while ( itr.hasNext() && !subscriber.isUnsubscribed()) {
+                subscriber.onNext( itr.next() );
+            }
 
-
-                    //while we have items to emit and our subscriber is subscribed, we want to keep emitting items
-                    while ( itr.hasNext() && !subscriber.isUnsubscribed() ) {
-                        subscriber.onNext( itr.next() );
-                    }
-
-
-                    subscriber.onCompleted();
-
-                    return null;
-                }
-            }.execute();
+            subscriber.onCompleted();
         }
 
         //if any error occurs, we need to notify the observer so it can perform it's own error handling
         catch ( Throwable t ) {
             subscriber.onError( t );
         }
+
     }