You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/20 00:21:01 UTC

[1/4] git commit: Pushed Hystrix down to actual read execution of RowQuery. Still not desired behavior

Repository: incubator-usergrid
Updated Branches:
  refs/heads/hystrix-integration 5cadb9f58 -> c1e7a33eb
  refs/pull/76/head fe2b3d6b4 -> a1b6124cc
  refs/pull/76/merge 515b8da9e -> 81ecfec85 (forced update)


Pushed Hystrix down to actual read execution of RowQuery.  Still not desired behavior


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c1e7a33e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c1e7a33e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c1e7a33e

Branch: refs/heads/hystrix-integration
Commit: c1e7a33eba78d3308e67cd8b9a6fe1ea40b5721c
Parents: 5cadb9f
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 19 16:16:46 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 19 16:16:46 2014 -0700

----------------------------------------------------------------------
 .../graph/impl/EdgeDeleteListener.java          |  2 +-
 .../persistence/graph/impl/EdgeManagerImpl.java | 16 +++---
 .../graph/impl/EdgeWriteListener.java           |  2 +-
 .../graph/impl/NodeDeleteListener.java          |  8 +--
 .../graph/impl/stage/AbstractEdgeRepair.java    |  4 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |  8 +--
 .../impl/EdgeMetadataSerializationImpl.java     | 17 +++---
 .../impl/EdgeSerializationImpl.java             | 21 ++++----
 .../impl/parse/ColumnNameIterator.java          | 55 ++++++++++++--------
 .../impl/parse/ObservableIterator.java          | 48 +++++------------
 10 files changed, 88 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index d377882..a5715c9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -76,7 +76,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
 
 
                 //go through every version of this edge <= the current version and remove it
-                Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+                Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>( ) {
                     @Override
                     protected Iterator<MarkedEdge> getIterator() {
                         return edgeSerialization.getEdgeToTarget( scope,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 9e529c5..f825767 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -201,7 +201,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSource( scope, search );
@@ -220,7 +220,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTarget( scope, search );
@@ -240,7 +240,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSourceByTargetType( scope, search );
@@ -258,7 +258,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
@@ -276,7 +276,7 @@ public class EdgeManagerImpl implements EdgeManager {
     @Override
     public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>( ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
@@ -287,7 +287,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>( ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
@@ -299,7 +299,7 @@ public class EdgeManagerImpl implements EdgeManager {
     @Override
     public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>( ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
@@ -310,7 +310,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>( ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesToTarget( scope, search );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
index ed8e861..e146ba9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -52,7 +52,7 @@ public class EdgeWriteListener implements MessageListener<EdgeEvent<Edge>, EdgeE
         final OrganizationScope scope = write.getOrganizationScope();
         final UUID maxVersion = edge.getVersion();
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 631ac50..0927ba8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -210,7 +210,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>(  ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
@@ -224,7 +224,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<String> getEdgesTypesFromSource( final OrganizationScope scope, final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<String>(  ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
@@ -238,7 +238,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTarget( scope, search );
@@ -252,7 +252,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<MarkedEdge> loadEdgesFromSource( final OrganizationScope scope, final SearchByEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSource( scope, search );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
index e09d884..fb31a33 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -125,7 +125,7 @@ public abstract class AbstractEdgeRepair  {
      */
     private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
 
@@ -142,7 +142,7 @@ public abstract class AbstractEdgeRepair  {
      */
     private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index c54f485..0bbc91a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -251,7 +251,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         @Override
         public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
                                                     final String edgeType, final UUID version ) {
-            return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+            return Observable.create( new ObservableIterator<String>( ) {
                 @Override
                 protected Iterator<String> getIterator() {
                     return edgeMetadataSerialization
@@ -264,7 +264,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         @Override
         public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
                                                  final String subType, final UUID version ) {
-            return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+            return Observable.create( new ObservableIterator<MarkedEdge>( ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return edgeSerialization.getEdgesToTargetBySourceType( scope,
@@ -296,7 +296,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         @Override
         public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
                                                     final String edgeType, final UUID version ) {
-            return Observable.create( new ObservableIterator<String>( graphFig.getReadTimeout() ) {
+            return Observable.create( new ObservableIterator<String>( ) {
                 @Override
                 protected Iterator<String> getIterator() {
                     return edgeMetadataSerialization
@@ -309,7 +309,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         @Override
         public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
                                                  final String subType, final UUID version ) {
-            return Observable.create( new ObservableIterator<MarkedEdge>( graphFig.getReadTimeout() ) {
+            return Observable.create( new ObservableIterator<MarkedEdge>( ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return edgeSerialization.getEdgesFromSourceByTargetType( scope,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
index 713c7fe..b230399 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
@@ -37,6 +37,7 @@ import org.apache.usergrid.persistence.collection.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.collection.migration.Migration;
 import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.SearchEdgeType;
 import org.apache.usergrid.persistence.graph.SearchIdType;
 import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
@@ -50,7 +51,6 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.ByteBufferRange;
 import com.netflix.astyanax.model.CompositeBuilder;
 import com.netflix.astyanax.model.CompositeParser;
@@ -111,13 +111,16 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
 
 
     protected final Keyspace keyspace;
-    private final CassandraConfig graphFig;
+    private final CassandraConfig cassandraConfig;
+    private final GraphFig graphFig;
 
 
 
     @Inject
-    public EdgeMetadataSerializationImpl( final Keyspace keyspace, final CassandraConfig graphFig) {
+    public EdgeMetadataSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+                                          final GraphFig graphFig ) {
         this.keyspace = keyspace;
+        this.cassandraConfig = cassandraConfig;
         this.graphFig = graphFig;
     }
 
@@ -329,14 +332,14 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
 
 
         final RangeBuilder rangeBuilder =
-                new RangeBuilder().setLimit( graphFig.getScanPageSize() ).setStart( search.getLast().or( "" ) );
+                new RangeBuilder().setLimit( cassandraConfig.getScanPageSize() ).setStart( search.getLast().or( "" ) );
 
         RowQuery<ScopedRowKey<OrganizationScope, Id>, String> query =
                 keyspace.prepareQuery( cf ).getKey( sourceKey ).autoPaginate( true )
                         .withColumnRange( rangeBuilder.build() );
 
         return new ColumnNameIterator<String, String>( query, PARSER,
-                    search.getLast().isPresent() );
+                    search.getLast().isPresent(), graphFig.getReadTimeout() );
 
     }
 
@@ -367,14 +370,14 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
 
         //resume from the last if specified.  Also set the range
         final ByteBufferRange searchRange =
-                new RangeBuilder().setLimit( graphFig.getScanPageSize() ).setStart( search.getLast().or( "" ) ).build();
+                new RangeBuilder().setLimit( cassandraConfig.getScanPageSize() ).setStart( search.getLast().or( "" ) ).build();
 
         RowQuery<ScopedRowKey<OrganizationScope, EdgeIdTypeKey>, String> query =
                 keyspace.prepareQuery( cf ).getKey( sourceTypeKey ).autoPaginate( true ).withColumnRange( searchRange );
 
 
        return new ColumnNameIterator<String, String>( query, PARSER,
-                    search.getLast().isPresent() );
+                    search.getLast().isPresent(), graphFig.getReadTimeout() );
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 4dc3c8a..7e11206 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.collection.cassandra.ColumnTypes;
 import org.apache.usergrid.persistence.collection.migration.Migration;
 import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -59,7 +60,6 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.AbstractComposite;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.CompositeBuilder;
@@ -125,19 +125,22 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
     protected final Keyspace keyspace;
-    protected final CassandraConfig graphFig;
+    protected final CassandraConfig cassandraConfig;
+    protected final GraphFig graphFig;
 
 
     @Inject
-    public EdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig graphFig ) {
+    public EdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+                                  final GraphFig graphFig ) {
         this.keyspace = keyspace;
+        this.cassandraConfig = cassandraConfig;
         this.graphFig = graphFig;
     }
 
 
     @Override
     public MutationBatch writeEdge( final OrganizationScope scope, final Edge edge ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( graphFig.getWriteCL() );;
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );;
 
 
         doWrite( scope, edge, new RowOp() {
@@ -156,7 +159,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
     @Override
     public MutationBatch markEdge( final OrganizationScope scope, final Edge edge ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( graphFig.getWriteCL() );;
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );;
 
 
         doWrite( scope, edge, new RowOp() {
@@ -175,7 +178,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
     @Override
     public MutationBatch deleteEdge( final OrganizationScope scope, final Edge edge ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( graphFig.getWriteCL());
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );
 
 
         doWrite( scope, edge, new RowOp() {
@@ -477,7 +480,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
          * If the edge is present, we need to being seeking from this
          */
 
-        final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( graphFig.getScanPageSize() );
+        final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( cassandraConfig.getScanPageSize() );
 
 
         //set the range into the search
@@ -487,12 +490,12 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
         RowQuery<ScopedRowKey<OrganizationScope, R>, DirectedEdge> query =
-                keyspace.prepareQuery( cf ).setConsistencyLevel( graphFig.getReadCL() ).getKey( rowKey ).autoPaginate( true )
+                keyspace.prepareQuery( cf ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKey( rowKey ).autoPaginate( true )
                         .withColumnRange( rangeBuilder.build() );
 
 
         return new ColumnNameIterator<DirectedEdge, MarkedEdge>( query, searcher,
-                    searcher.hasPage() );
+                    searcher.hasPage(), graphFig.getReadTimeout() );
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
index 907586c..b6d17c6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
@@ -4,21 +4,24 @@ package org.apache.usergrid.persistence.graph.serialization.impl.parse;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.query.RowQuery;
+import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixCommandProperties;
 
 
 /**
- *
- * Simple iterator that wraps a Row query and will keep executing it's paging until there are no more
- * results to read from cassandra
- *
- *
+ * Simple iterator that wraps a Row query and will keep executing its paging until there are no more results to read
+ * from cassandra
  */
 public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
 
 
+    private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "CassRead" );
+
+    private final int executionTimeout;
+
 
     private final RowQuery<?, C> rowQuery;
     private final ColumnParser<C, T> parser;
@@ -26,21 +29,21 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
     private Iterator<Column<C>> sourceIterator;
 
 
-
-    public ColumnNameIterator( RowQuery<?, C> rowQuery, final ColumnParser<C, T> parser, final boolean skipFirst ) {
+    public ColumnNameIterator( RowQuery<?, C> rowQuery, final ColumnParser<C, T> parser, final boolean skipFirst,
+                               final int executionTimeout ) {
         this.rowQuery = rowQuery.autoPaginate( true );
         this.parser = parser;
+        this.executionTimeout = executionTimeout;
 
         advanceIterator();
 
         //if we are to skip the first element, we need to advance the iterator
-        if(skipFirst && sourceIterator.hasNext()){
+        if ( skipFirst && sourceIterator.hasNext() ) {
             sourceIterator.next();
         }
     }
 
 
-
     @Override
     public Iterator<T> iterator() {
         return this;
@@ -50,8 +53,8 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
     @Override
     public boolean hasNext() {
         //if we've exhausted this iterator, try to advance to the next set
-        if(sourceIterator.hasNext()){
-           return true;
+        if ( sourceIterator.hasNext() ) {
+            return true;
         }
 
         //advance the iterator, to the next page, there could be more
@@ -64,29 +67,37 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
     @Override
     public T next() {
 
-        if(!hasNext()){
+        if ( !hasNext() ) {
             throw new NoSuchElementException();
         }
 
-        return parser.parseColumn(sourceIterator.next());
+        return parser.parseColumn( sourceIterator.next() );
     }
 
 
     @Override
     public void remove() {
-       sourceIterator.remove();
+        sourceIterator.remove();
     }
 
 
     /**
      * Execute the query again and set the reuslts
      */
-    private void advanceIterator(){
-        try {
-            sourceIterator = rowQuery.execute().getResult().iterator();
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException("Unable to execute query", e);
-        }
+    private void advanceIterator() {
+
+        //run producing the values within a hystrix command.  This way we'll time out if the read takes too long
+        sourceIterator = new HystrixCommand<Iterator<Column<C>>>( HystrixCommand.Setter.withGroupKey( GROUP_KEY )
+                                                                                .andCommandPropertiesDefaults(
+                                                                                        HystrixCommandProperties
+                                                                                                .Setter()
+                                                                                                .withExecutionIsolationThreadTimeoutInMilliseconds(
+                                                                                                        executionTimeout ) ) ) {
+
+            @Override
+            protected Iterator<Column<C>> run() throws Exception {
+                return rowQuery.execute().getResult().iterator();
+            }
+        }.execute();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1e7a33e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
index c70dc4c..2274868 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
@@ -3,65 +3,43 @@ package org.apache.usergrid.persistence.graph.serialization.impl.parse;
 
 import java.util.Iterator;
 
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-import com.netflix.hystrix.HystrixCommandProperties;
-
 import rx.Observable;
+import rx.Observer;
 import rx.Subscriber;
+import rx.Subscription;
+import rx.subscriptions.Subscriptions;
 
 
 /**
  * Converts an iterator to an observable.  Subclasses need to only implement getting the iterator from the data source.
- * This is used in favor of "Observable.just" when the initial fetch of the iterator will require I/O.  This allows us
- * to wrap the iterator in a deferred invocation to avoid the blocking on construction.
+ * This is used in favor of "Observable.just" when the initial fetch of the iterator will require I/O.  This allows
+ * us to wrap the iterator in a deferred invocation to avoid the blocking on construction.
  */
 public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> {
 
-    private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "CassRead" );
-
-    private final int executionTimeout;
-
-
-    protected ObservableIterator( final int executionTimeout ) {
-        this.executionTimeout = executionTimeout;
-    }
-
 
     @Override
     public void call( final Subscriber<? super T> subscriber ) {
 
 
         try {
-            //run producing the values within a hystrix command.  This way we'll time out if the read takes too long
-            new HystrixCommand<Void>( HystrixCommand.Setter.withGroupKey( GROUP_KEY ).andCommandPropertiesDefaults(
-                    HystrixCommandProperties.Setter()
-                                            .withExecutionIsolationThreadTimeoutInMilliseconds( executionTimeout ) ) ) {
+            //get our iterator and push data to the observer
+            Iterator<T> itr = getIterator();
 
 
-                @Override
-                protected Void run() throws Exception {
-                    //get our iterator and push data to the observer
-                    final Iterator<T> itr = getIterator();
+            //while we have items to emit and our subscriber is subscribed, we want to keep emitting items
+            while ( itr.hasNext() && !subscriber.isUnsubscribed()) {
+                subscriber.onNext( itr.next() );
+            }
 
-
-                    //while we have items to emit and our subscriber is subscribed, we want to keep emitting items
-                    while ( itr.hasNext() && !subscriber.isUnsubscribed() ) {
-                        subscriber.onNext( itr.next() );
-                    }
-
-
-                    subscriber.onCompleted();
-
-                    return null;
-                }
-            }.execute();
+            subscriber.onCompleted();
         }
 
         //if any error occurs, we need to notify the observer so it can perform its own error handling
         catch ( Throwable t ) {
             subscriber.onError( t );
         }
+
     }
 
 


[4/4] git commit: Merge a1b6124ccf6aed8de351693f184302381a777994 into 24425d5e61fabb08e632fcfbdfbb423b7ff6acaa

Posted by sn...@apache.org.
Merge a1b6124ccf6aed8de351693f184302381a777994 into 24425d5e61fabb08e632fcfbdfbb423b7ff6acaa


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/81ecfec8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/81ecfec8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/81ecfec8

Branch: refs/pull/76/merge
Commit: 81ecfec85c551d9ff33f309c6f66df1206f24675
Parents: 24425d5 a1b6124
Author: GERey <gr...@apigee.com>
Authored: Wed Mar 19 23:18:18 2014 +0000
Committer: GERey <gr...@apigee.com>
Committed: Wed Mar 19 23:18:18 2014 +0000

----------------------------------------------------------------------
 .../applications/ApplicationResource.java       |   1 -
 stack/services/pom.xml                          |  16 ++
 .../management/export/ExportServiceImpl.java    |  59 +----
 .../management/export/S3ExportImpl.java         |  10 +-
 .../cassandra/ManagementServiceIT.java          | 243 ++++++++++++++++---
 .../management/cassandra/MockS3ExportImpl.java  |   1 -
 6 files changed, 245 insertions(+), 85 deletions(-)
----------------------------------------------------------------------



[3/4] git commit: Added amazon s3 dependencies to get the file back out of s3 to make sure it was transferred over. Removed smoke from smoke tests, they now properly get the file back from s3. Json validity is handled in other tests so I only verify that

Posted by sn...@apache.org.
Added amazon s3 dependencies to get the file back out of s3 to make sure it was transferred over.
Removed smoke from smoke tests; they now properly get the file back from s3. Json validity is handled in other tests, so I only verify that the object exists.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a1b6124c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a1b6124c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a1b6124c

Branch: refs/pull/76/merge
Commit: a1b6124ccf6aed8de351693f184302381a777994
Parents: fe2b3d6
Author: grey <gr...@apigee.com>
Authored: Wed Mar 19 16:18:15 2014 -0700
Committer: grey <gr...@apigee.com>
Committed: Wed Mar 19 16:18:15 2014 -0700

----------------------------------------------------------------------
 stack/services/pom.xml                          | 13 +++++++
 .../management/export/S3ExportImpl.java         | 10 +++--
 .../cassandra/ManagementServiceIT.java          | 39 +++++++++++++++++++-
 .../management/cassandra/MockS3ExportImpl.java  |  1 -
 4 files changed, 58 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b6124c/stack/services/pom.xml
----------------------------------------------------------------------
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index 5c31d43..de02659 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -553,5 +553,18 @@
       <artifactId>jul-to-slf4j</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.3.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk</artifactId>
+      <version>1.7.3</version>
+    </dependency>
+
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b6124c/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
index 863a8f8..c6cfc37 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
@@ -44,9 +44,12 @@ import com.google.inject.Module;
  */
 public class S3ExportImpl implements S3Export {
 
+    String fn;
+
     @Override
     public void copyToS3( final InputStream inputStream, final Map<String,Object> exportInfo, String filename ) {
 
+        fn = filename;
 
         Logger logger = LoggerFactory.getLogger( ExportServiceImpl.class );
         /*won't need any of the properties as I have the export info*/
@@ -87,13 +90,14 @@ public class S3ExportImpl implements S3Export {
         try {
             AsyncBlobStore blobStore = context.getAsyncBlobStore();
             BlobBuilder blobBuilder =
-                    blobStore.blobBuilder( filename ).payload( inputStream ).calculateMD5().contentType( "text/plain" );
+                    blobStore.blobBuilder( fn ).payload( inputStream ).calculateMD5().contentType( "text/plain" );
 
 
             Blob blob = blobBuilder.build();
 
             ListenableFuture<String> futureETag = blobStore.putBlob( bucketName, blob, PutOptions.Builder.multipart() );
 
+
             logger.info( "Uploaded file etag=" + futureETag.get() );
         }
         catch ( Exception e ) {
@@ -102,8 +106,8 @@ public class S3ExportImpl implements S3Export {
     }
 
     @Override
-    public String getFilename () {return "";}
+    public String getFilename () {return fn;}
 
     @Override
-    public void setFilename(String givenName) {;}
+    public void setFilename(String givenName) {fn = givenName;}
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b6124c/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ManagementServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ManagementServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ManagementServiceIT.java
index 6c0431b..310d46a 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ManagementServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ManagementServiceIT.java
@@ -65,6 +65,13 @@ import org.apache.usergrid.security.tokens.exceptions.InvalidTokenException;
 import org.apache.usergrid.utils.JsonUtils;
 import org.apache.usergrid.utils.UUIDUtils;
 
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+
 import static org.apache.commons.codec.binary.Base64.encodeBase64URLSafeString;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CREDENTIALS;
 import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEMENT_APPLICATION_ID;
@@ -1473,6 +1480,9 @@ public class ManagementServiceIT {
     @Test //For this test please input your s3 credentials into payload builder.
     public void testIntegration100EntitiesOn() throws Exception {
 
+        //s3client.putObject(new PutObjectRequest(bucketName, keyName, file));
+
+
         S3Export s3Export = new S3ExportImpl();
         ExportService exportService = setup.getExportService();
         HashMap<String, Object> payload = payloadBuilder();
@@ -1507,9 +1517,24 @@ public class ManagementServiceIT {
         when( jobExecution.getJobData() ).thenReturn( jobData );
 
         exportService.doExport( jobExecution );
+        Thread.sleep(1000);
+
+        AWSCredentials myCredentials = new BasicAWSCredentials(
+                System.getProperty( "accessKey" ), System.getProperty("secretKey"));
+        AmazonS3 s3client = new AmazonS3Client(myCredentials);
+        S3Object sd = null;
+        try {
+            sd = s3client.getObject( new GetObjectRequest( System.getProperty( "bucketName" ),
+                s3Export.getFilename()) );
+        }catch(Exception e) {
+            assert(false);
+        }
+        assertNotNull( sd );
+
     }
 
-    @Test //For this test please input your s3 credentials into payload builder.
+    //@Ignore("I'd have to add some overhead such that I loop through all files or keep track of them all ")
+    @Test
     public void testIntegration100EntitiesOnOneOrg() throws Exception {
 
         S3Export s3Export = new S3ExportImpl();
@@ -1567,6 +1592,18 @@ public class ManagementServiceIT {
         when( jobExecution.getJobData() ).thenReturn( jobData );
 
         exportService.doExport( jobExecution );
+
+        AWSCredentials myCredentials = new BasicAWSCredentials(
+                System.getProperty( "accessKey" ), System.getProperty("secretKey"));
+        AmazonS3 s3client = new AmazonS3Client(myCredentials);
+        S3Object sd = null;
+        try {
+            sd = s3client.getObject( new GetObjectRequest( System.getProperty( "bucketName" ),
+                    s3Export.getFilename()) );
+        }catch(Exception e) {
+            assert(false);
+        }
+        assertNotNull( sd );
     }
 
     /*Creates fake payload for testing purposes.*/

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b6124c/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ExportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ExportImpl.java b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ExportImpl.java
index 8ef1c19..a1814fd 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ExportImpl.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ExportImpl.java
@@ -42,7 +42,6 @@ public class MockS3ExportImpl implements S3Export {
         int read = 0;
         byte[] bytes = new byte[1024];
         OutputStream outputStream = null;
-        //FileInputStream fis = new PrintWriter( inputStream );
 
         try {
             outputStream = new FileOutputStream( new File( getFilename() ) );


[2/4] git commit: Added amazon s3 dependencies to get the file back out of s3 to make sure it was transferred over. Removed smoke from smoke tests, they now properly get the file back from s3. Json validity is handled in other tests so I only verify that

Posted by sn...@apache.org.
Added amazon s3 dependencies to get the file back out of s3 to make sure it was transferred over.
Removed smoke from smoke tests; they now properly get the file back from s3. Json validity is handled in other tests, so I only verify that the object exists.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a1b6124c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a1b6124c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a1b6124c

Branch: refs/pull/76/head
Commit: a1b6124ccf6aed8de351693f184302381a777994
Parents: fe2b3d6
Author: grey <gr...@apigee.com>
Authored: Wed Mar 19 16:18:15 2014 -0700
Committer: grey <gr...@apigee.com>
Committed: Wed Mar 19 16:18:15 2014 -0700

----------------------------------------------------------------------
 stack/services/pom.xml                          | 13 +++++++
 .../management/export/S3ExportImpl.java         | 10 +++--
 .../cassandra/ManagementServiceIT.java          | 39 +++++++++++++++++++-
 .../management/cassandra/MockS3ExportImpl.java  |  1 -
 4 files changed, 58 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b6124c/stack/services/pom.xml
----------------------------------------------------------------------
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index 5c31d43..de02659 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -553,5 +553,18 @@
       <artifactId>jul-to-slf4j</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.3.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk</artifactId>
+      <version>1.7.3</version>
+    </dependency>
+
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b6124c/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
index 863a8f8..c6cfc37 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
@@ -44,9 +44,12 @@ import com.google.inject.Module;
  */
 public class S3ExportImpl implements S3Export {
 
+    String fn;
+
     @Override
     public void copyToS3( final InputStream inputStream, final Map<String,Object> exportInfo, String filename ) {
 
+        fn = filename;
 
         Logger logger = LoggerFactory.getLogger( ExportServiceImpl.class );
         /*won't need any of the properties as I have the export info*/
@@ -87,13 +90,14 @@ public class S3ExportImpl implements S3Export {
         try {
             AsyncBlobStore blobStore = context.getAsyncBlobStore();
             BlobBuilder blobBuilder =
-                    blobStore.blobBuilder( filename ).payload( inputStream ).calculateMD5().contentType( "text/plain" );
+                    blobStore.blobBuilder( fn ).payload( inputStream ).calculateMD5().contentType( "text/plain" );
 
 
             Blob blob = blobBuilder.build();
 
             ListenableFuture<String> futureETag = blobStore.putBlob( bucketName, blob, PutOptions.Builder.multipart() );
 
+
             logger.info( "Uploaded file etag=" + futureETag.get() );
         }
         catch ( Exception e ) {
@@ -102,8 +106,8 @@ public class S3ExportImpl implements S3Export {
     }
 
     @Override
-    public String getFilename () {return "";}
+    public String getFilename () {return fn;}
 
     @Override
-    public void setFilename(String givenName) {;}
+    public void setFilename(String givenName) {fn = givenName;}
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b6124c/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ManagementServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ManagementServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ManagementServiceIT.java
index 6c0431b..310d46a 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ManagementServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ManagementServiceIT.java
@@ -65,6 +65,13 @@ import org.apache.usergrid.security.tokens.exceptions.InvalidTokenException;
 import org.apache.usergrid.utils.JsonUtils;
 import org.apache.usergrid.utils.UUIDUtils;
 
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+
 import static org.apache.commons.codec.binary.Base64.encodeBase64URLSafeString;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CREDENTIALS;
 import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEMENT_APPLICATION_ID;
@@ -1473,6 +1480,9 @@ public class ManagementServiceIT {
     @Test //For this test please input your s3 credentials into payload builder.
     public void testIntegration100EntitiesOn() throws Exception {
 
+        //s3client.putObject(new PutObjectRequest(bucketName, keyName, file));
+
+
         S3Export s3Export = new S3ExportImpl();
         ExportService exportService = setup.getExportService();
         HashMap<String, Object> payload = payloadBuilder();
@@ -1507,9 +1517,24 @@ public class ManagementServiceIT {
         when( jobExecution.getJobData() ).thenReturn( jobData );
 
         exportService.doExport( jobExecution );
+        Thread.sleep(1000);
+
+        AWSCredentials myCredentials = new BasicAWSCredentials(
+                System.getProperty( "accessKey" ), System.getProperty("secretKey"));
+        AmazonS3 s3client = new AmazonS3Client(myCredentials);
+        S3Object sd = null;
+        try {
+            sd = s3client.getObject( new GetObjectRequest( System.getProperty( "bucketName" ),
+                s3Export.getFilename()) );
+        }catch(Exception e) {
+            assert(false);
+        }
+        assertNotNull( sd );
+
     }
 
-    @Test //For this test please input your s3 credentials into payload builder.
+    //@Ignore("I'd have to add some overhead such that I loop through all files or keep track of them all ")
+    @Test
     public void testIntegration100EntitiesOnOneOrg() throws Exception {
 
         S3Export s3Export = new S3ExportImpl();
@@ -1567,6 +1592,18 @@ public class ManagementServiceIT {
         when( jobExecution.getJobData() ).thenReturn( jobData );
 
         exportService.doExport( jobExecution );
+
+        AWSCredentials myCredentials = new BasicAWSCredentials(
+                System.getProperty( "accessKey" ), System.getProperty("secretKey"));
+        AmazonS3 s3client = new AmazonS3Client(myCredentials);
+        S3Object sd = null;
+        try {
+            sd = s3client.getObject( new GetObjectRequest( System.getProperty( "bucketName" ),
+                    s3Export.getFilename()) );
+        }catch(Exception e) {
+            assert(false);
+        }
+        assertNotNull( sd );
     }
 
     /*Creates fake payload for testing purposes.*/

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b6124c/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ExportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ExportImpl.java b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ExportImpl.java
index 8ef1c19..a1814fd 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ExportImpl.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ExportImpl.java
@@ -42,7 +42,6 @@ public class MockS3ExportImpl implements S3Export {
         int read = 0;
         byte[] bytes = new byte[1024];
         OutputStream outputStream = null;
-        //FileInputStream fis = new PrintWriter( inputStream );
 
         try {
             outputStream = new FileOutputStream( new File( getFilename() ) );