You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2013/12/02 22:51:20 UTC

[2/3] git commit: Temp upgrade. Need to upgrade to a pipeline type architecture for stages

Temp upgrade.  Need to upgrade to a pipeline type architecture for stages


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f08f96c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f08f96c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f08f96c1

Branch: refs/heads/two-dot-o
Commit: f08f96c175b5143b04e17c59e46a0407b50a4071
Parents: 6a55ea3
Author: Todd Nine <to...@apache.org>
Authored: Mon Dec 2 14:51:04 2013 -0700
Committer: Todd Nine <to...@apache.org>
Committed: Mon Dec 2 14:51:04 2013 -0700

----------------------------------------------------------------------
 stack/corepersistence/collection/pom.xml        |  7 +++
 .../CollectionManagerFactoryImpl.java           |  3 +-
 .../collection/CollectionManagerImpl.java       | 53 +++++++++++++++++++-
 .../persistence/collection/TimeService.java     | 12 +++++
 .../astynax/AstynaxKeyspaceProvider.java        | 36 +++++++------
 .../collection/mvcc/stage/Commit.java           |  8 ++-
 .../collection/mvcc/stage/Start.java            | 26 ++++++++--
 .../collection/mvcc/stage/Write.java            |  8 ++-
 .../collection/mvcc/stage/WriteListener.java    | 10 ++++
 .../collection/mvcc/stage/WriteStage.java       | 10 +++-
 .../collection/mvcc/verify/AtomicUpdate.java    | 26 ----------
 .../MvccEntitySerializationStrategyImpl.java    |  2 +-
 .../collection/guice/TestCollectionModule.java  |  3 ++
 .../collection/mvcc/stage/StartTest.java        | 51 +++++++++++++++++++
 .../persistence/index/stage/Complete.java       |  6 ++-
 .../usergrid/persistence/index/stage/Start.java |  6 ++-
 .../usergrid/persistence/index/stage/Write.java |  6 ++-
 17 files changed, 216 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 47875bc..7e48ea4 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -43,6 +43,13 @@
       <version>${astynax.version}</version>
     </dependency>
 
+    <!-- bean utils for setting uuids etc -->
+    <dependency>
+        <groupId>commons-beanutils</groupId>
+        <artifactId>commons-beanutils-core</artifactId>
+        <version>1.8.3</version>
+    </dependency>
+
     <!-- Serialization libraries -->
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryImpl.java
index a6631e6..3ce0e7b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryImpl.java
@@ -10,6 +10,7 @@ public class CollectionManagerFactoryImpl implements CollectionManagerFactory {
 
     @Override
     public CollectionManager createCollectionManager( final CollectionContext context ) {
-        return new CollectionManagerImpl( context );
+//        return new CollectionManagerImpl( context );
+        return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
index 15ef3ff..679a8cb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
@@ -1,9 +1,23 @@
 package org.apache.usergrid.persistence.collection;
 
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.UUID;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Commit;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Start;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Write;
 import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
 
 
 /**
@@ -12,17 +26,52 @@ import org.apache.usergrid.persistence.model.entity.Entity;
  */
 public class CollectionManagerImpl implements CollectionManager {
 
+    private static final Logger logger = LoggerFactory.getLogger(CollectionManagerImpl.class);
+
     private final CollectionContext context;
+    private final TimeService timeService;
+    private final Start startStage;
+    private final Write writeStage;
+    private final Commit commitStage;
 
 
-    public CollectionManagerImpl( final CollectionContext context ) {
+    @Inject
+    public CollectionManagerImpl( final CollectionContext context, final TimeService timeService, final Start startStage, final Write writeStage,
+                                  final Commit commitStage ) {
         this.context = context;
+        this.timeService = timeService;
+        this.startStage = startStage;
+        this.writeStage = writeStage;
+        this.commitStage = commitStage;
     }
 
 
     @Override
     public void create( final Entity entity ) {
-        //To change body of implemented methods use File | Settings | File Templates.
+
+        final UUID entityId = UUIDGenerator.newTimeUUID();
+        final UUID version = entityId;
+        final long created = timeService.getTime();
+
+
+        try {
+            BeanUtils.setProperty(entity, "uuid", entityId);
+        }
+        catch ( Throwable t ) {
+            logger.error( "Unable to set uuid.  See nested exception", t );
+            throw new RuntimeException( "Unable to set uuid.  See nested exception", t );
+        }
+
+        entity.setVersion( version );
+        entity.setCreated( created );
+        entity.setUpdated( created );
+
+        MvccEntityImpl mvccEntity = new MvccEntityImpl(context, entityId, version, entity   );
+
+
+        MutationBatch mutation = startStage.performStage(  mvccEntity );
+        writeStage.performStage( mvccEntity );
+        commitStage.performStage( mvccEntity );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/TimeService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/TimeService.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/TimeService.java
new file mode 100644
index 0000000..eea1e36
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/TimeService.java
@@ -0,0 +1,12 @@
+package org.apache.usergrid.persistence.collection;
+
+
+/** @author tnine */
+public interface TimeService {
+
+    /**
+     * Get the current time in milliseconds since epoch
+     * @return
+     */
+    long getTime();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astynax/AstynaxKeyspaceProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astynax/AstynaxKeyspaceProvider.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astynax/AstynaxKeyspaceProvider.java
index 150f221..d03de09 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astynax/AstynaxKeyspaceProvider.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astynax/AstynaxKeyspaceProvider.java
@@ -16,40 +16,44 @@ import com.netflix.astyanax.thrift.ThriftFamilyFactory;
 
 
 /**
- * TODO.  Provide the ability to do a service hook for realtime tuning without the need of a JVM restart
- * This could be done with governator and service discovery
+ * TODO.  Provide the ability to do a service hook for realtime tuning without the need of a JVM restart This could be
+ * done with governator and service discovery
+ *
  * @author tnine
  */
 public class AstynaxKeyspaceProvider implements Provider<Keyspace> {
 
-    /**
-     * The cassandra URL property
-     */
+    /** The cassandra URL property */
     public static final String CASSANDRA_HOSTS = "cassandra.hosts";
     public static final String CASSANDRA_PORT = "cassandra.port";
     public static final String CASSANDRA_CONNECTIONS = "cassandra.connections";
     public static final String CASSANDRA_CLUSTER_NAME = "cassandra.cluster_name";
     public static final String CASSANDRA_VERSION = "cassandra.version";
+    public static final String CASSANDRA_TIMEOUT = "cassandra.timeout";
     public static final String COLLECTIONS_KEYSPACE_NAME = "collections.keyspace";
 
+
     protected final String cassandraHosts;
     protected final int cassandraPort;
     protected final int cassandraConnections;
+    protected final int cassandraTimeout;
     protected final String clusterName;
     protected final String keyspaceName;
     protected final String cassandraVersion;
 
 
     @Inject
-    public AstynaxKeyspaceProvider( @Named( CASSANDRA_HOSTS ) final String cassandraHosts,
-                                    @Named( CASSANDRA_PORT ) final int cassandraPort,
-                                    @Named( CASSANDRA_CONNECTIONS ) final int cassandraConnections,
-                                    @Named( CASSANDRA_CLUSTER_NAME ) final String clusterName,
-                                    @Named( CASSANDRA_VERSION ) final String cassandraVersion,
-                                    @Named( COLLECTIONS_KEYSPACE_NAME ) final String keyspaceName ) {
+    public AstynaxKeyspaceProvider( @Named(CASSANDRA_HOSTS) final String cassandraHosts,
+                                    @Named(CASSANDRA_PORT) final int cassandraPort,
+                                    @Named(CASSANDRA_CONNECTIONS) final int cassandraConnections,
+                                    @Named(CASSANDRA_CLUSTER_NAME) final String clusterName,
+                                    @Named(CASSANDRA_VERSION) final String cassandraVersion,
+                                    @Named(COLLECTIONS_KEYSPACE_NAME) final String keyspaceName,
+                                    @Named( CASSANDRA_TIMEOUT ) final int cassandraTimeout ) {
         this.cassandraHosts = cassandraHosts;
         this.cassandraPort = cassandraPort;
         this.cassandraConnections = cassandraConnections;
+        this.cassandraTimeout = cassandraTimeout;
         this.clusterName = clusterName;
         this.keyspaceName = keyspaceName;
         this.cassandraVersion = cassandraVersion;
@@ -65,14 +69,16 @@ public class AstynaxKeyspaceProvider implements Provider<Keyspace> {
                 new ConnectionPoolConfigurationImpl( "UsergridConnectionPool" ).setPort( cassandraPort )
                                                                                .setMaxConnsPerHost(
                                                                                        cassandraConnections )
-                                                                               .setSeeds( cassandraHosts );
+                                                                               .setSeeds( cassandraHosts )
+                                                                               .setSocketTimeout( cassandraTimeout );
 
         AstyanaxContext<Keyspace> context =
                 new AstyanaxContext.Builder().forCluster( clusterName ).forKeyspace( keyspaceName )
                         /**
                          *TODO tnine Filter this by adding a host supplier.  We will get token discovery from cassandra
                          * but only connect
-                         * to nodes that have been specified.  Good for real time updates of the cass system without adding
+                         * to nodes that have been specified.  Good for real time updates of the cass system without
+                         * adding
                          * load to them during runtime
                          */.withAstyanaxConfiguration( config )
                            .withConnectionPoolConfiguration( connectionPoolConfiguration )
@@ -87,8 +93,8 @@ public class AstynaxKeyspaceProvider implements Provider<Keyspace> {
 
 
     /**
-     * Get runtime options that can be overridden.  TODO: Make this an interface and somehow hook it into Guice auotmagically
-     * @return
+     * Get runtime options that can be overridden.  TODO: Make this an interface and somehow hook it into Guice
+     * auotmagically
      */
     public static String[] getRuntimeOptions() {
         return new String[] {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
index 64d3c5d..019c497 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
@@ -3,6 +3,10 @@ package org.apache.usergrid.persistence.collection.mvcc.stage;
 
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.OperationResult;
+
 
 /**
  * This phase should invoke any finalization, and mark the entity as committed in the data store before returning
@@ -12,7 +16,7 @@ public class Commit implements WriteStage {
 
 
     @Override
-    public MvccEntity performStage( final MvccEntity entity ) {
-        return entity;
+    public MutationBatch performStage( final MvccEntity entity ) {
+        return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
index d7f85ae..e66eb52 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
@@ -2,23 +2,43 @@ package org.apache.usergrid.persistence.collection.mvcc.stage;
 
 
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
 /**
  * This is the first stage and should be invoked immediately when a write is started.  It should persist the start of a
  * new write in the data store for a checkpoint and recovery
  */
+@Singleton
 public class Start implements WriteStage {
 
+    private final MvccLogEntrySerializationStrategy logStrategy;
     /**
      * Create a new stage with the current context
+     * @param logStrategy
      */
-    protected Start( ){
+    @Inject
+    protected Start( final MvccLogEntrySerializationStrategy logStrategy ){
+        this.logStrategy = logStrategy;
     }
 
 
     @Override
-    public MvccEntity performStage( final MvccEntity entity ) {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    public MutationBatch performStage( final MvccEntity entity )  {
+        final MvccLogEntry startEntry = new MvccLogEntryImpl(entity.getContext(), entity.getUuid(), entity.getVersion(),Stage.ACTIVE);
+
+        MutationBatch write = logStrategy.write( startEntry );
+
+        return write;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
index b67ca31..690416f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
@@ -3,6 +3,10 @@ package org.apache.usergrid.persistence.collection.mvcc.stage;
 
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.OperationResult;
+
 
 /**
  * This phase should execute the serialization to the data store.
@@ -17,9 +21,9 @@ public class Write implements WriteStage {
 
 
     @Override
-    public MvccEntity performStage( final MvccEntity entity) {
+    public MutationBatch performStage( final MvccEntity entity ) {
 
 
-        return entity;
+        return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java
new file mode 100644
index 0000000..4a78226
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteListener.java
@@ -0,0 +1,10 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.connectionpool.OperationResult;
+
+
+/** @author tnine */
+public interface WriteListener extends ListenableFuture<OperationResult<Void>> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
index cda707f..1960fdb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
@@ -3,6 +3,11 @@ package org.apache.usergrid.persistence.collection.mvcc.stage;
 
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
 
 /**
  * The possible stages in our write flow.
@@ -11,10 +16,11 @@ public interface WriteStage {
 
     /**
      * Run this stage.  This will return the MvccEntity that should be returned or passed to the next stage
+     *
      * @param entity The entity to use in this stage
      *
-     * @return The MvccEntity to use for the next sgage
+     * @return The asynchronous listener to signal success
      *
      */
-    public MvccEntity performStage( MvccEntity entity);
+    public MutationBatch performStage( MvccEntity entity ) throws ConnectionException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/AtomicUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/AtomicUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/AtomicUpdate.java
deleted file mode 100644
index d844f3b..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/AtomicUpdate.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.verify;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-
-/**
- * Interface to test if we can perform atomic operations
- * <p/>
- * Note This will probably require a new WriteStage that is after start, which is rollback
- */
-public interface AtomicUpdate
-{
-
-    /** Signal that we are starting update */
-    public void startUpdate( MvccEntity context );
-
-    /**
-     * Try the commit.
-     *
-     * @return true if we can proceed.  False if we cannot
-     */
-    public boolean tryCommit( MvccEntity context );
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategyImpl.java
index 6030866..afec2f8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategyImpl.java
@@ -173,7 +173,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
     private MutationBatch doWrite( UUID entityId, RowOp op ) {
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
-        op.doOp( batch.withRow( CF_ENTITY_DATA, entityId ) );
+            op.doOp( batch.withRow( CF_ENTITY_DATA, entityId ) );
 
         return batch;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
index 3ea6e62..1d56a5d 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
@@ -53,6 +53,9 @@ public class TestCollectionModule extends AbstractModule {
         configProperties.put( AstynaxKeyspaceProvider.CASSANDRA_HOSTS, "localhost" );
         configProperties.put( AstynaxKeyspaceProvider.CASSANDRA_PORT, "" + CassandraRule.THRIFT_PORT );
         configProperties.put( AstynaxKeyspaceProvider.CASSANDRA_CONNECTIONS, "10" );
+
+        //time out after 5 seconds
+        configProperties.put( AstynaxKeyspaceProvider.CASSANDRA_TIMEOUT, "5000" );
         configProperties.put( AstynaxKeyspaceProvider.CASSANDRA_CLUSTER_NAME, "Usergrid" );
         configProperties.put( AstynaxKeyspaceProvider.CASSANDRA_VERSION, "1.2" );
         configProperties.put( AstynaxKeyspaceProvider.COLLECTIONS_KEYSPACE_NAME, "Usergrid_Collections" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java
new file mode 100644
index 0000000..76d6326
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/StartTest.java
@@ -0,0 +1,51 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.CollectionContextImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import static org.mockito.Mockito.mock;
+
+
+/** @author tnine */
+public class StartTest {
+
+    @Test
+    public void testStartStage(){
+
+        final MvccLogEntrySerializationStrategy logStrategy = mock(MvccLogEntrySerializationStrategy.class);
+
+        Start start = new Start( logStrategy );
+
+        //set up the data
+
+        final UUID applicationId = UUIDGenerator.newTimeUUID();
+
+        final UUID ownerId = UUIDGenerator.newTimeUUID();
+
+        final UUID entityId = UUIDGenerator.newTimeUUID();
+
+        final UUID version = UUIDGenerator.newTimeUUID();
+
+        final String name = "tests";
+
+
+        CollectionContextImpl collection = new CollectionContextImpl( applicationId, ownerId, name );
+
+
+        Entity entity = new Entity();
+
+        MvccEntityImpl mvccEntity = new MvccEntityImpl(collection, entityId, version, entity  );
+
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
index da051b6..17903fe 100644
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
+++ b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Complete.java
@@ -4,6 +4,10 @@ package org.apache.usergrid.persistence.index.stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.OperationResult;
+
 
 /**
  *
@@ -14,7 +18,7 @@ public class Complete implements WriteStage
 {
 
     @Override
-    public MvccEntity performStage( final MvccEntity entity ) {
+    public MutationBatch performStage( final MvccEntity entity ) {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
index a89e2af..4270629 100644
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
+++ b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Start.java
@@ -4,13 +4,17 @@ package org.apache.usergrid.persistence.index.stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.OperationResult;
+
 
 /** This state should signal an index update has started */
 public class Start implements WriteStage
 {
 
     @Override
-    public MvccEntity performStage( final MvccEntity entity ) {
+    public MutationBatch performStage( final MvccEntity entity ) {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f08f96c1/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
index 8e75812..c5d857b 100644
--- a/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
+++ b/stack/corepersistence/index/src/main/java/org/apache/usergrid/persistence/index/stage/Write.java
@@ -4,13 +4,17 @@ package org.apache.usergrid.persistence.index.stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.stage.WriteStage;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.OperationResult;
+
 
 /** This state should perform an update of the index. */
 public class Write implements WriteStage
 {
 
     @Override
-    public MvccEntity performStage( final MvccEntity entity ) {
+    public MutationBatch performStage( final MvccEntity entity ) {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 }