You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2013/11/27 23:24:44 UTC
[3/3] git commit: Initial import of 2.0 core persistence code.
Initial import of 2.0 core persistence code.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ac634a1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ac634a1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ac634a1d
Branch: refs/heads/two-dot-o
Commit: ac634a1d05e3f8eedc99eb06f5d7b97c3b437d23
Parents: a30bf50
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Nov 27 15:08:46 2013 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Nov 27 15:08:46 2013 -0700
----------------------------------------------------------------------
stack/corepersistence/.gitignore | 7 +
stack/corepersistence/README.md | 73 ++++
stack/corepersistence/collection/pom.xml | 113 ++++++
.../collection/CollecitonManagerImpl.java | 15 +
.../collection/CollectionContext.java | 25 ++
.../collection/CollectionContextImpl.java | 82 +++++
.../collection/CollectionManager.java | 41 +++
.../collection/CollectionManagerFactory.java | 18 +
.../CollectionManagerFactoryImpl.java | 15 +
.../collection/CollectionManagerImpl.java | 45 +++
.../astynax/AstynaxKeyspaceProvider.java | 99 ++++++
.../collection/guice/CollectionModule.java | 64 ++++
.../collection/guice/PropertyUtils.java | 60 ++++
.../migration/CollectionColumnFamily.java | 50 +++
.../collection/migration/Migration.java | 16 +
.../migration/MigrationException.java | 18 +
.../collection/migration/MigrationManager.java | 15 +
.../migration/MigrationManagerImpl.java | 198 +++++++++++
.../collection/mvcc/entity/MvccEntity.java | 38 ++
.../collection/mvcc/entity/MvccEntityImpl.java | 95 +++++
.../collection/mvcc/entity/MvccLogEntry.java | 38 ++
.../mvcc/entity/MvccLogEntryImpl.java | 90 +++++
.../collection/mvcc/entity/Stage.java | 67 ++++
.../mvcc/event/PostProcessListener.java | 24 ++
.../collection/mvcc/stage/Commit.java | 18 +
.../collection/mvcc/stage/MvccStrategy.java | 42 +++
.../collection/mvcc/stage/Start.java | 24 ++
.../collection/mvcc/stage/Write.java | 25 ++
.../collection/mvcc/stage/WriteStage.java | 20 ++
.../collection/mvcc/verify/AtomicUpdate.java | 26 ++
.../mvcc/verify/OptimisticUpdate.java | 22 ++
.../collection/mvcc/verify/UniqueUpdate.java | 27 ++
.../MvccEntitySerializationStrategy.java | 77 +++++
.../MvccEntitySerializationStrategyImpl.java | 252 ++++++++++++++
.../MvccLogEntrySerializationStrategy.java | 59 ++++
.../MvccLogEntrySerializationStrategyImpl.java | 219 ++++++++++++
.../CollectionManagerFactoryTest.java | 8 +
.../collection/guice/TestCollectionModule.java | 103 ++++++
.../collection/mvcc/entity/StageTest.java | 91 +++++
...MvccEntitySerializationStrategyImplTest.java | 343 +++++++++++++++++++
...ccLogEntrySerializationStrategyImplTest.java | 150 ++++++++
.../serialization/SerializationComparison.java | 182 ++++++++++
.../src/test/resources/cassandra.properties | 5 +
stack/corepersistence/index/pom.xml | 22 ++
.../usergrid/persistence/index/Query.java | 6 +
.../usergrid/persistence/index/QueryEngine.java | 22 ++
.../persistence/index/QueryEngineFactory.java | 21 ++
.../usergrid/persistence/index/Results.java | 16 +
.../persistence/index/stage/Complete.java | 21 ++
.../usergrid/persistence/index/stage/Start.java | 16 +
.../usergrid/persistence/index/stage/Write.java | 17 +
stack/corepersistence/model/pom.xml | 38 ++
.../persistence/model/entity/Entity.java | 149 ++++++++
.../persistence/model/field/AbstractField.java | 85 +++++
.../persistence/model/field/ArrayField.java | 27 ++
.../persistence/model/field/BooleanField.java | 19 +
.../model/field/ByteBufferField.java | 29 ++
.../persistence/model/field/DoubleField.java | 20 ++
.../model/field/EntityObjectField.java | 20 ++
.../usergrid/persistence/model/field/Field.java | 28 ++
.../persistence/model/field/IntegerField.java | 20 ++
.../persistence/model/field/ListField.java | 28 ++
.../persistence/model/field/LocationField.java | 20 ++
.../persistence/model/field/LongField.java | 23 ++
.../persistence/model/field/SetField.java | 28 ++
.../persistence/model/field/StringField.java | 17 +
.../persistence/model/field/UUIDField.java | 20 ++
.../persistence/model/util/UUIDGenerator.java | 106 ++++++
.../persistence/model/value/EntityObject.java | 40 +++
.../persistence/model/value/Location.java | 32 ++
.../persistence/model/field/EntityTest.java | 117 +++++++
stack/corepersistence/perftest/Readme.md | 52 +++
stack/corepersistence/perftest/pom.xml | 150 ++++++++
.../apache/usergrid/perftest/NoopPerftest.java | 54 +++
.../usergrid/perftest/NoopPerftestModule.java | 34 ++
.../org/apache/usergrid/perftest/Perftest.java | 11 +
.../usergrid/perftest/PerftestModule.java | 47 +++
.../usergrid/perftest/PerftestRunner.java | 197 +++++++++++
.../perftest/PerftestServletConfig.java | 50 +++
.../usergrid/perftest/TestModuleLoader.java | 95 +++++
.../apache/usergrid/perftest/logging/Log.java | 19 +
.../perftest/logging/Slf4jMembersInjector.java | 46 +++
.../perftest/logging/Slf4jTypeListener.java | 44 +++
.../perftest/rest/CallStatsSnapshot.java | 91 +++++
.../perftest/rest/PerftestResetResource.java | 64 ++++
.../perftest/rest/PerftestStartResource.java | 64 ++++
.../perftest/rest/PerftestStatsResource.java | 53 +++
.../perftest/rest/PerftestStatusResource.java | 62 ++++
.../perftest/rest/PerftestStopResource.java | 59 ++++
.../usergrid/perfteststats/CallStats.java | 90 +++++
.../src/main/resources/config.properties | 22 ++
.../src/main/resources/log4j.properties | 20 ++
.../perftest/src/main/webapp/WEB-INF/web.xml | 27 ++
stack/corepersistence/pom.xml | 30 ++
stack/corepersistence/testutils/pom.xml | 43 +++
.../persistence/test/AvailablePortFinder.java | 187 ++++++++++
.../persistence/test/CassandraRule.java | 86 +++++
.../src/main/resources/log4j-server.properties | 35 ++
.../testutils/src/main/resources/log4j.xml | 16 +
99 files changed, 5754 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/.gitignore
----------------------------------------------------------------------
diff --git a/stack/corepersistence/.gitignore b/stack/corepersistence/.gitignore
new file mode 100644
index 0000000..7e23415
--- /dev/null
+++ b/stack/corepersistence/.gitignore
@@ -0,0 +1,7 @@
+.idea
+atlassian-ide-plugin.xml
+target
+**/target
+*.iml
+*.swp
+*.log
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/README.md
----------------------------------------------------------------------
diff --git a/stack/corepersistence/README.md b/stack/corepersistence/README.md
new file mode 100644
index 0000000..5f9a44b
--- /dev/null
+++ b/stack/corepersistence/README.md
@@ -0,0 +1,73 @@
+Core Persistence
+===============
+
+A Framework to provide basic component services for persistence frameworks
+
+
+Data Templates
+==============
+
+Below are the basic data templates this system should support
+
+
+Collections
+-----------
+
+A collection storage and indexing framework. Properties should be secondary indexed, and should be able to be queried efficiently.
+
+
+*MVCC Semantics*
+
+Transaction/Checkpoint logging on indexing.
+Consistent data view. Can potentially be for long running jobs.
+Optimistic Locking (maybe)
+Atomic updates (maybe)
+
+*Operation Chaining* (maybe)
+
+Possible ability to define an operation context where a set of all writes must either succeed or fail as a group
+(can probably be done with MVCC)
+
+
+
+
+Graphs
+-----------
+
+A system for creating relationships between collection entities. The directed edges can be named (a type) and
+an index query can be executed on those edges.
+
+
+
+Maps
+-----------
+
+A map that can store hierarchical keys. Shorter keys are better. This should allow for range "scanning". For example:
+
+key1: => org1/app1/env1/version1
+
+key2: => org1/app1/env2/version1
+
+Operations:
+
+ Put by key
+ Get by key
+ Iterate by scan
+ Delete by key
+
+
+Get me all keys present in org1/app1.
+
+Start => org1/app1
+
+End => org1/app1 inclusive
+
+Cache
+-----------
+
+A write through distributed cache backed by the cassandra map implementation for persistence
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
new file mode 100644
index 0000000..47875bc
--- /dev/null
+++ b/stack/corepersistence/collection/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>persistence</artifactId>
+ <groupId>org.apache.usergrid</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <description>The module for handling all collection I/O</description>
+ <properties>
+ <guice.version>3.0</guice.version>
+ </properties>
+
+ <artifactId>collection</artifactId>
+
+ <dependencies>
+
+ <!-- Depends on the basic models -->
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>model</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <!-- include our cassandra client -->
+ <dependency>
+ <groupId>com.netflix.astyanax</groupId>
+ <artifactId>astyanax-core</artifactId>
+ <version>${astynax.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.netflix.astyanax</groupId>
+ <artifactId>astyanax-thrift</artifactId>
+ <version>${astynax.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.netflix.astyanax</groupId>
+ <artifactId>astyanax-cassandra</artifactId>
+ <version>${astynax.version}</version>
+ </dependency>
+
+ <!-- Serialization libraries -->
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-smile</artifactId>
+ <version>1.9.13</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>2.22</version>
+ </dependency>
+
+ <!-- helper serializers for kryo -->
+ <dependency>
+ <groupId>de.javakaffee</groupId>
+ <artifactId>kryo-serializers</artifactId>
+ <version>0.26</version>
+ </dependency>
+
+ <!-- Google Guice -->
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ <version>${guice.version}</version>
+ </dependency>
+
+ <!-- guice helper removed for now, investigate using this later, it's getting in the way right now while learning guice -->
+ <!--<dependency>
+ <groupId>com.netflix.governator</groupId>
+ <artifactId>governator</artifactId>
+ <version>1.2.3</version>
+ </dependency> -->
+
+ <dependency>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-multibindings</artifactId>
+ <version>${guice.version}</version>
+ </dependency>
+
+ <!-- Google Guice Integration Test Injectors -->
+
+ <dependency>
+ <groupId>com.google.guiceberry</groupId>
+ <artifactId>guiceberry</artifactId>
+ <version>3.3.1</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>testutils</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollecitonManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollecitonManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollecitonManagerImpl.java
new file mode 100644
index 0000000..5a2f708
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollecitonManagerImpl.java
@@ -0,0 +1,15 @@
+package org.apache.usergrid.persistence.collection;
+
+
+/** Holder for a {@link CollectionContext}; it implements no interface and exposes no operations.
+ * @author tnine
+ */
+public class CollecitonManagerImpl { // NOTE(review): name misspells "Collection"; CollectionManagerImpl is the class the factory builds -- likely a stray duplicate, confirm
+
+ private final CollectionContext context; // assigned by the constructor; never read in this class
+
+
+ public CollecitonManagerImpl( final CollectionContext context ) {
+ this.context = context;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionContext.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionContext.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionContext.java
new file mode 100644
index 0000000..dd30d3a
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionContext.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection;
+
+
+import java.util.UUID;
+
+
+/**
+ * The context a collection manager is created for. Typically this is something like an application or an
+ * organization: whatever "owns" the collection.
+ */
+public interface CollectionContext
+{
+
+ /** @return The id of the application that will contain this collection */
+ public UUID getApplication();
+
+ /**
+ * @return A UUID that is unique to this context. Any UUID is accepted (a time UUID is preferred). Usually an
+ * application id, but it can also be the id of an entity that is the parent of another collection
+ */
+ public UUID getOwner();
+
+ /** @return The name of the collection. Use the singular form; never pluralize it. */
+ public String getName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionContextImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionContextImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionContextImpl.java
new file mode 100644
index 0000000..371bde9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionContextImpl.java
@@ -0,0 +1,82 @@
+package org.apache.usergrid.persistence.collection;
+
+
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Simple immutable implementation of the collection context. All three constructor arguments are required.
+ * @author tnine
+ */
+public class CollectionContextImpl implements CollectionContext {
+
+ private final UUID applicationId;
+ private final UUID ownerId;
+ private final String name;
+
+ /** Create a context; each argument is checked with Preconditions.checkNotNull before it is stored. */
+ public CollectionContextImpl( final UUID applicationId, final UUID ownerId, final String name ) {
+ Preconditions.checkNotNull( applicationId , "applicationId is required");
+ Preconditions.checkNotNull( ownerId , "ownerId is required");
+ Preconditions.checkNotNull( name , "name is required");
+
+
+ this.applicationId = applicationId;
+ this.ownerId = ownerId;
+ this.name = name;
+ }
+
+
+ @Override
+ public UUID getApplication() {
+ return applicationId;
+ }
+
+
+ @Override
+ public UUID getOwner() {
+ return ownerId;
+ }
+
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /** Two contexts are equal when they have the same class and equal application id, name and owner id. */
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() ) {
+ return false;
+ }
+
+ final CollectionContextImpl that = ( CollectionContextImpl ) o;
+
+ if ( !applicationId.equals( that.applicationId ) ) {
+ return false;
+ }
+ if ( !name.equals( that.name ) ) {
+ return false;
+ }
+ if ( !ownerId.equals( that.ownerId ) ) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /** Hash built from the same three fields that {@link #equals(Object)} compares. */
+ @Override
+ public int hashCode() {
+ int result = applicationId.hashCode();
+ result = 31 * result + ownerId.hashCode();
+ result = 31 * result + name.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManager.java
new file mode 100644
index 0000000..570a1e4
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManager.java
@@ -0,0 +1,41 @@
+package org.apache.usergrid.persistence.collection;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ * Create, update, delete and load operations on the entities of a collection.
+ *
+ * @author tnine
+ *
+ */
+public interface CollectionManager
+{
+
+ /**
+ * Create the entity in the collection. Only use this for entities you are sure are new.
+ *
+ * @param entity The entity to create
+ */
+ public void create( Entity entity );
+
+ /**
+ * Update the entity with the given fields.
+ *
+ * @param entity The entity properties to update
+ */
+ public void update( Entity entity );
+
+ /** Delete the entity with the given entity id and remove its indexes */
+ public void delete( UUID entityId );
+
+ /**
+ * Load the entity with the given entity id
+ * @param entityId The id of the entity to load
+ * @return The loaded entity. NOTE(review): the result for an unknown id is not specified here; confirm with the implementation
+ */
+ public Entity load(UUID entityId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactory.java
new file mode 100644
index 0000000..7791a0b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactory.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.collection;
+
+
+/** A basic factory that creates a collection manager for the given context */
+public interface CollectionManagerFactory
+{
+
+ /**
+ * Create a new CollectionManager for the given context. The CollectionManager can safely be used on the current
+ * thread and will cache responses. The returned instance should not be shared among threads; it is not
+ * guaranteed to be thread safe.
+ *
+ * @param context The context to use when creating the collection manager
+ *
+ * @return The collection manager to perform operations within the provided context
+ */
+ public CollectionManager createCollectionManager( CollectionContext context );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryImpl.java
new file mode 100644
index 0000000..a6631e6
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerFactoryImpl.java
@@ -0,0 +1,15 @@
+package org.apache.usergrid.persistence.collection;
+
+
+/**
+ * Basic implementation: returns a new {@link CollectionManagerImpl} for the context on every call
+ * @author tnine
+ */
+public class CollectionManagerFactoryImpl implements CollectionManagerFactory {
+
+
+ @Override
+ public CollectionManager createCollectionManager( final CollectionContext context ) {
+ return new CollectionManagerImpl( context );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
new file mode 100644
index 0000000..15ef3ff
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionManagerImpl.java
@@ -0,0 +1,45 @@
+package org.apache.usergrid.persistence.collection;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ * Simple implementation. NOTE(review): every operation below is still an unimplemented stub.
+ * @author tnine
+ */
+public class CollectionManagerImpl implements CollectionManager {
+
+ private final CollectionContext context; // assigned by the constructor; none of the stub methods read it yet
+
+
+ public CollectionManagerImpl( final CollectionContext context ) {
+ this.context = context;
+ }
+
+
+ @Override
+ public void create( final Entity entity ) {
+ //TODO not implemented: currently does nothing
+ }
+
+
+ @Override
+ public void update( final Entity entity ) {
+ //TODO not implemented: currently does nothing
+ }
+
+
+ @Override
+ public void delete( final UUID entityId ) {
+ //TODO not implemented: currently does nothing
+ }
+
+
+ @Override
+ public Entity load( final UUID entityId ) {
+ return null; //TODO not implemented: always returns null
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astynax/AstynaxKeyspaceProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astynax/AstynaxKeyspaceProvider.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astynax/AstynaxKeyspaceProvider.java
new file mode 100644
index 0000000..150f221
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astynax/AstynaxKeyspaceProvider.java
@@ -0,0 +1,99 @@
+package org.apache.usergrid.persistence.collection.astynax;
+
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.name.Named;
+import com.netflix.astyanax.AstyanaxConfiguration;
+import com.netflix.astyanax.AstyanaxContext;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
+import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
+import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
+import com.netflix.astyanax.connectionpool.impl.Slf4jConnectionPoolMonitorImpl;
+import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
+import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+
+
+/** Guice provider that builds a Cassandra {@link Keyspace} through Astyanax from the injected settings.
+ * TODO. Provide the ability to do a service hook for realtime tuning without the need of a JVM restart.
+ * This could be done with governator and service discovery
+ * @author tnine
+ */
+public class AstynaxKeyspaceProvider implements Provider<Keyspace> {
+
+ /**
+ * Names of the properties that are injected into the constructor through @Named
+ */
+ public static final String CASSANDRA_HOSTS = "cassandra.hosts";
+ public static final String CASSANDRA_PORT = "cassandra.port";
+ public static final String CASSANDRA_CONNECTIONS = "cassandra.connections";
+ public static final String CASSANDRA_CLUSTER_NAME = "cassandra.cluster_name";
+ public static final String CASSANDRA_VERSION = "cassandra.version";
+ public static final String COLLECTIONS_KEYSPACE_NAME = "collections.keyspace";
+
+ protected final String cassandraHosts;
+ protected final int cassandraPort;
+ protected final int cassandraConnections;
+ protected final String clusterName;
+ protected final String keyspaceName;
+ protected final String cassandraVersion;
+
+ /** Store the injected settings; no Astyanax object is created until {@link #get()} is called. */
+ @Inject
+ public AstynaxKeyspaceProvider( @Named( CASSANDRA_HOSTS ) final String cassandraHosts,
+ @Named( CASSANDRA_PORT ) final int cassandraPort,
+ @Named( CASSANDRA_CONNECTIONS ) final int cassandraConnections,
+ @Named( CASSANDRA_CLUSTER_NAME ) final String clusterName,
+ @Named( CASSANDRA_VERSION ) final String cassandraVersion,
+ @Named( COLLECTIONS_KEYSPACE_NAME ) final String keyspaceName ) {
+ this.cassandraHosts = cassandraHosts;
+ this.cassandraPort = cassandraPort;
+ this.cassandraConnections = cassandraConnections;
+ this.clusterName = clusterName;
+ this.keyspaceName = keyspaceName;
+ this.cassandraVersion = cassandraVersion;
+ }
+
+ /** Build and start an Astyanax context and return its client. NOTE(review): a new context is started on every call and never shut down here -- presumably meant to be singleton-scoped; confirm. */
+ @Override
+ public Keyspace get() {
+ AstyanaxConfiguration config = new AstyanaxConfigurationImpl().setDiscoveryType( NodeDiscoveryType.TOKEN_AWARE )
+ .setTargetCassandraVersion( cassandraVersion );
+
+ ConnectionPoolConfiguration connectionPoolConfiguration =
+ new ConnectionPoolConfigurationImpl( "UsergridConnectionPool" ).setPort( cassandraPort )
+ .setMaxConnsPerHost(
+ cassandraConnections )
+ .setSeeds( cassandraHosts ); // the configured hosts are passed as the pool's seeds
+
+ AstyanaxContext<Keyspace> context =
+ new AstyanaxContext.Builder().forCluster( clusterName ).forKeyspace( keyspaceName )
+ /**
+ *TODO tnine Filter this by adding a host supplier. We will get token discovery from cassandra
+ * but only connect
+ * to nodes that have been specified. Good for real time updates of the cass system without adding
+ * load to them during runtime
+ */.withAstyanaxConfiguration( config )
+ .withConnectionPoolConfiguration( connectionPoolConfiguration )
+ .withConnectionPoolMonitor( new Slf4jConnectionPoolMonitorImpl() )
+ .buildKeyspace( ThriftFamilyFactory.getInstance() );
+
+ context.start();
+
+
+ return context.getClient();
+ }
+
+
+ /**
+ * Get the runtime options that can be overridden. TODO: Make this an interface and somehow hook it into Guice automagically
+ * @return A new array with the names of the six properties this provider is configured with
+ */
+ public static String[] getRuntimeOptions() {
+ return new String[] {
+ CASSANDRA_HOSTS, CASSANDRA_PORT, CASSANDRA_CONNECTIONS, CASSANDRA_CLUSTER_NAME, CASSANDRA_VERSION,
+ COLLECTIONS_KEYSPACE_NAME
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
new file mode 100644
index 0000000..c83fe4f
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -0,0 +1,64 @@
+package org.apache.usergrid.persistence.collection.guice;
+
+
+import org.apache.usergrid.persistence.collection.astynax.AstynaxKeyspaceProvider;
+import org.apache.usergrid.persistence.collection.migration.Migration;
+import org.apache.usergrid.persistence.collection.migration.MigrationManager;
+import org.apache.usergrid.persistence.collection.migration.MigrationManagerImpl;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategyImpl;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.multibindings.Multibinder;
+import com.google.inject.name.Names;
+import com.netflix.astyanax.Keyspace;
+
+
+/**
+ * Simple module for wiring our collection api: cassandra settings, keyspace, migrations and serialization strategies
+ *
+ * @author tnine
+ */
+public class CollectionModule extends AbstractModule {
+
+ /**
+ * The classpath location of the properties file that holds the default cassandra settings
+ */
+ private static final String CASS_PROPS = "cassandra.properties";
+
+
+ @Override
+ protected void configure() {
+ //Load the defaults from the classpath, then let any runtime option set as a JVM system property override
+ //them. Everything is bound in ONE call: binding both sources separately registers the same @Named key twice
+ //as soon as an override is set, and Guice rejects conflicting bindings when the injector is created
+ final java.util.Properties cassandraProps = PropertyUtils.loadFromClassPath( CASS_PROPS );
+ cassandraProps.putAll( PropertyUtils.loadSystemProperties( AstynaxKeyspaceProvider.getRuntimeOptions() ) );
+
+ Names.bindProperties( binder(), cassandraProps );
+
+ //bind our keyspace to the AstynaxKeyspaceProvider. The provider starts a new Astyanax context on every
+ //get(), so it is scoped as a singleton: all consumers share one keyspace and one connection pool
+ bind( Keyspace.class ).toProvider( AstynaxKeyspaceProvider.class ).in( com.google.inject.Singleton.class );
+
+ //bind our migration manager
+ bind(MigrationManager.class).to( MigrationManagerImpl.class );
+
+
+ //bind the serialization strategies
+
+ bind( MvccEntitySerializationStrategy.class ).to( MvccEntitySerializationStrategyImpl.class );
+
+
+ bind( MvccLogEntrySerializationStrategy.class ).to( MvccLogEntrySerializationStrategyImpl.class );
+
+
+ //do multibindings for migrations: every strategy that owns column families is registered as a Migration
+ Multibinder<Migration> migrationBinder = Multibinder.newSetBinder( binder(), Migration.class );
+
+ migrationBinder.addBinding().to( MvccEntitySerializationStrategyImpl.class );
+ migrationBinder.addBinding().to( MvccLogEntrySerializationStrategyImpl.class );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/PropertyUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/PropertyUtils.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/PropertyUtils.java
new file mode 100644
index 0000000..a427135
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/PropertyUtils.java
@@ -0,0 +1,60 @@
+package org.apache.usergrid.persistence.collection.guice;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+
+/**
+ * Simple Utility class to get properties
+ *
+ * @author tnine
+ */
+public class PropertyUtils {
+
+
+ /**
+ * Load the properties file from the classpath. Throws IOException if they cannot be loaded
+ */
+ public static Properties loadFromClassPath( String propsFile ) {
+ InputStream in = PropertyUtils.class.getClassLoader().getResourceAsStream( propsFile );
+
+ if ( in == null ) {
+ throw new RuntimeException( new IOException(
+ String.format( "Could not find properties file on the classpath at location %s", propsFile ) ) );
+ }
+
+ Properties props = new Properties();
+
+ try {
+ props.load( in );
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException( e );
+ }
+
+ return props;
+ }
+
+
+ /**
+ * Load each of the defined properties into a system property and return them. If a system property is not found,
+ * it will be ignored
+ */
+ public static Properties loadSystemProperties( String... properties ) {
+
+ Properties props = new Properties();
+
+ for ( String propName : properties ) {
+ String propValue = System.getProperty( propName );
+
+ if ( propValue != null ) {
+ props.put( propName, propValue );
+ }
+ }
+
+
+ return props;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/CollectionColumnFamily.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/CollectionColumnFamily.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/CollectionColumnFamily.java
new file mode 100644
index 0000000..a8036bd
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/CollectionColumnFamily.java
@@ -0,0 +1,50 @@
+package org.apache.usergrid.persistence.collection.migration;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.netflix.astyanax.model.ColumnFamily;
+
+
+/**
+ * Bean wrapper for column family information
+ *
+ * @author tnine
+ */
+public class CollectionColumnFamily {
+
+    public static final String COMPARATOR_TYPE = "comparator_type";
+    public static final String REVERSED = "reversed";
+    public static final String READ_REPAIR_CHANCE = "read_repair_chance";
+
+    private final ColumnFamily columnFamily;
+    private final String comparator;
+    private final boolean reversed;
+
+    /**
+     * Bundle a column family with the comparator type and reversed flag that getOptions() reports.
+     */
+    public CollectionColumnFamily( final ColumnFamily columnFamily, final String comparator, final boolean reversed ) {
+        this.reversed = reversed;
+        this.comparator = comparator;
+        this.columnFamily = columnFamily;
+    }
+
+    /**
+     * Build a new mutable option map on every call: comparator type, reversed flag and a fixed read repair chance.
+     */
+    public Map<String, Object> getOptions() {
+        final Map<String, Object> result = new HashMap<String, Object>();
+        result.put( COMPARATOR_TYPE, comparator );
+        result.put( REVERSED, reversed );
+        // the read repair chance is not configurable: it is always 10%
+        result.put( READ_REPAIR_CHANCE, 0.1d );
+        return result;
+    }
+
+    /** @return the column family passed to the constructor */
+    public ColumnFamily getColumnFamily() {
+        return columnFamily;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/Migration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/Migration.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/Migration.java
new file mode 100644
index 0000000..f446031
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/Migration.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.collection.migration;
+
+
+import java.util.Collection;
+
+
+/**
+ * @author tnine
+ */
+public interface Migration {
+
+ /**
+ * Get the column families required for this implementation. MigrationManagerImpl creates each one that does not exist yet, and logs a warning and skips the implementation when this returns null.
+ */
+ public Collection<CollectionColumnFamily> getColumnFamilies();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationException.java
new file mode 100644
index 0000000..5eed4fd
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationException.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.collection.migration;
+
+
+/**
+ * Checked exception thrown when a migration cannot be performed; declared by MigrationManager.migrate()
+ * @author tnine
+ */
+public class MigrationException extends Exception {
+
+ public MigrationException( final String message ) {
+ super( message );
+ }
+
+
+ public MigrationException( final String message, final Throwable cause ) {
+ super( message, cause );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationManager.java
new file mode 100644
index 0000000..d3d7038
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationManager.java
@@ -0,0 +1,15 @@
+package org.apache.usergrid.persistence.collection.migration;
+
+
+/**
+ * A manager that will perform any migrations necessary. Setup code should invoke the implementation of this interface.
+ *
+ * @author tnine
+ */
+public interface MigrationManager {
+
+ /**
+ * Perform any migration necessary in the application. Keyspaces and column families are created only if they do not already exist; a MigrationException is thrown when the migration cannot be performed.
+ */
+ public void migrate() throws MigrationException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationManagerImpl.java
new file mode 100644
index 0000000..a57dcef
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/migration/MigrationManagerImpl.java
@@ -0,0 +1,198 @@
+package org.apache.usergrid.persistence.collection.migration;
+
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.BadRequestException;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
+import com.netflix.astyanax.ddl.KeyspaceDefinition;
+
+
+/**
+ * Implementation of the migration manager that creates the keyspace and the column families declared by every
+ * registered Migration when they do not exist yet.
+ *
+ * @author tnine
+ */
+@Singleton
+public class MigrationManagerImpl implements MigrationManager {
+
+    private static final Logger logger = LoggerFactory.getLogger( MigrationManagerImpl.class );
+
+    public static final String STRATEGY_CLASS = "collections.keyspace.strategy.class";
+    public static final String STRATEGY_OPTIONS = "collections.keyspace.strategy.options";
+    public static final String REPLICATION_FACTOR = "collections.keyspace.replicationfactor";
+
+    /** Prefix, including the separating dot, of every property that holds a keyspace strategy option */
+    private static final String STRATEGY_OPTIONS_PREFIX = STRATEGY_OPTIONS + ".";
+
+    private final String strategyClass;
+    private final String replicationFactor;
+
+    private final Set<Migration> migrations;
+    private final Keyspace keyspace;
+    private final Properties props;
+
+    /**
+     * @param keyspace the keyspace to describe and, when missing, create
+     * @param migrations the components declaring the column families they need
+     * @param props properties scanned for keys that start with STRATEGY_OPTIONS followed by a dot
+     * @param strategyClass value sent as the keyspace "strategy_class" option
+     * @param replicationFactor value sent as the "replication_factor" strategy option
+     */
+    @Inject
+    public MigrationManagerImpl( final Keyspace keyspace, final Set<Migration> migrations, final Properties props,
+                                 @Named( STRATEGY_CLASS ) final String strategyClass,
+                                 @Named( REPLICATION_FACTOR ) final String replicationFactor ) {
+        this.keyspace = keyspace;
+        this.migrations = migrations;
+        this.props = props;
+        this.strategyClass = strategyClass;
+        this.replicationFactor = replicationFactor;
+    }
+
+    /**
+     * Create the keyspace if needed, then every missing column family declared by the registered migrations.
+     *
+     * @throws MigrationException wrapping any failure raised while migrating
+     */
+    @Override
+    public void migrate() throws MigrationException {
+        try {
+            testAndCreateKeyspace();
+            for ( Migration migration : migrations ) {
+                final Collection<CollectionColumnFamily> columnFamilies = migration.getColumnFamilies();
+
+                if ( columnFamilies == null ) {
+                    logger.warn( "Class {} implements {} but returns null column families for migration. Either implement this method or remove the interface from the class", migration.getClass(), Migration.class );
+                    continue;
+                }
+
+                for ( CollectionColumnFamily cf : columnFamilies ) {
+                    testAndCreateColumnFamilyDef( cf );
+                }
+            }
+        }
+        catch ( Throwable t ) {
+            logger.error( "Unable to perform migration", t );
+            throw new MigrationException( "Unable to perform migration", t );
+        }
+    }
+
+    /**
+     * Check if the column family exists. If it doesn't, create it and wait until one schema version is reported.
+     */
+    private void testAndCreateColumnFamilyDef( CollectionColumnFamily columnFamily ) throws ConnectionException {
+        final KeyspaceDefinition keyspaceDefinition = keyspace.describeKeyspace();
+
+        final ColumnFamilyDefinition existing =
+                keyspaceDefinition.getColumnFamily( columnFamily.getColumnFamily().getName() );
+
+        if ( existing != null ) {
+            return;
+        }
+
+        keyspace.createColumnFamily( columnFamily.getColumnFamily(), columnFamily.getOptions() );
+
+        waitForMigration();
+    }
+
+    /**
+     * Check if the keyspace exists. If it doesn't, create it and wait until one schema version is reported.
+     */
+    private void testAndCreateKeyspace() throws ConnectionException {
+        KeyspaceDefinition keyspaceDefinition = null;
+
+        try {
+            keyspaceDefinition = keyspace.describeKeyspace();
+        }
+        catch ( BadRequestException badRequestException ) {
+            //only a "keyspace does not exist" failure means we should create it; a null message cannot be that
+            final String message = badRequestException.getMessage();
+
+            boolean missingKeyspace =
+                    message != null && message.contains( "why:Keyspace" ) && message.contains( "does not exist" );
+
+            if ( !missingKeyspace ) {
+                throw badRequestException;
+            }
+        }
+
+        if ( keyspaceDefinition != null ) {
+            return;
+        }
+
+        ImmutableMap.Builder<String, Object> strategyOptions =
+                ImmutableMap.<String, Object>builder().put( "replication_factor", replicationFactor );
+
+        strategyOptions.putAll( getKeySpaceProps() );
+
+        ImmutableMap<String, Object> options =
+                ImmutableMap.<String, Object>builder().put( "strategy_class", strategyClass )
+                            .put( "strategy_options", strategyOptions.build() ).build();
+
+        keyspace.createKeyspace( options );
+
+        waitForMigration();
+    }
+
+    /**
+     * Collect the strategy options from the properties: every key of the form STRATEGY_OPTIONS + "." + name is
+     * returned under name. Keys that lack the dot or have nothing after it are ignored.
+     */
+    private Map<String, String> getKeySpaceProps() {
+        Map<String, String> keyspaceProps = new HashMap<String, String>();
+
+        for ( Map.Entry<Object, Object> entry : props.entrySet() ) {
+            final String key = entry.getKey().toString();
+
+            //requiring the dot and a name keeps substring in bounds when the key is exactly STRATEGY_OPTIONS
+            if ( !key.startsWith( STRATEGY_OPTIONS_PREFIX ) || key.length() == STRATEGY_OPTIONS_PREFIX.length() ) {
+                continue;
+            }
+
+            keyspaceProps.put( key.substring( STRATEGY_OPTIONS_PREFIX.length() ), entry.getValue().toString() );
+        }
+
+        return keyspaceProps;
+    }
+
+    /**
+     * Poll every 100 ms until describeSchemaVersions reports exactly one schema version.
+     *
+     * @throws IllegalStateException if the thread is interrupted while waiting (migrate() wraps it in a MigrationException)
+     */
+    private void waitForMigration() throws ConnectionException {
+        while ( true ) {
+            final Map<String, List<String>> versions = keyspace.describeSchemaVersions();
+
+            if ( versions != null && versions.size() == 1 ) {
+                return;
+            }
+
+            //sleep and try it again
+            try {
+                Thread.sleep( 100 );
+            }
+            catch ( InterruptedException e ) {
+                //restore the interrupt flag and stop waiting rather than silently ignoring the interruption
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException( "Interrupted while waiting for schema versions to converge", e );
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
new file mode 100644
index 0000000..3e544ad
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
@@ -0,0 +1,38 @@
+package org.apache.usergrid.persistence.collection.mvcc.entity;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * An in flight update of an entity: exposes the collection context, entity id, version and optional entity data so that context information can be passed between states
+ */
+public interface MvccEntity {
+
+
+ /**
+ * Get the entity for this context.
+ * @return This will return absent if no data is present. Otherwise the entity will be contained within the optional
+ */
+ Optional<Entity> getEntity();
+
+ /**
+ * Return the version of this entity that we are attempting to write in the current context
+ */
+ UUID getVersion();
+
+ /**
+ * Get the UUID of the entity
+ */
+ UUID getUuid();
+
+ /**
+ * Get the collection context this entity belongs in
+ */
+ CollectionContext getContext();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntityImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntityImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntityImpl.java
new file mode 100644
index 0000000..5c01cb5
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntityImpl.java
@@ -0,0 +1,95 @@
+package org.apache.usergrid.persistence.collection.mvcc.entity;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * @author tnine
+ */
+public class MvccEntityImpl implements MvccEntity {
+
+    private final CollectionContext context;
+    private final UUID entityId;
+    private final UUID version;
+    private final Optional<Entity> entity;
+
+
+    /**
+     * Create an immutable MVCC entity. Every argument is required; a null argument is rejected by
+     * Preconditions.checkNotNull with a message naming it. Pass an absent Optional when there is no entity data.
+     *
+     * @param context the collection context the entity belongs to
+     * @param entityId the id of the entity
+     * @param version the version of the entity
+     * @param entity the entity data, possibly absent
+     */
+    public MvccEntityImpl( final CollectionContext context, final UUID entityId, final UUID version,
+                           final Optional<Entity> entity ) {
+        this.context = Preconditions.checkNotNull( context, "context is required" );
+        this.entityId = Preconditions.checkNotNull( entityId, "entity id is required" );
+        this.version = Preconditions.checkNotNull( version, "version id is required" );
+        this.entity = Preconditions.checkNotNull( entity, "entity is required" );
+    }
+
+
+    @Override
+    public Optional<Entity> getEntity() {
+        return entity;
+    }
+
+
+    @Override
+    public UUID getVersion() {
+        return version;
+    }
+
+
+    @Override
+    public UUID getUuid() {
+        return entityId;
+    }
+
+
+    @Override
+    public CollectionContext getContext() {
+        return context;
+    }
+
+
+    /**
+     * Two instances are equal when they are of the same class and have equal context, entity id and version.
+     * The entity data itself is not compared.
+     */
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final MvccEntityImpl other = ( MvccEntityImpl ) o;
+
+        return context.equals( other.context )
+                && getUuid().equals( other.getUuid() )
+                && getVersion().equals( other.getVersion() );
+    }
+
+
+    /**
+     * Hash of context, entity id and version, matching the values compared by equals.
+     */
+    @Override
+    public int hashCode() {
+        final int contextAndId = 31 * context.hashCode() + getUuid().hashCode();
+        return 31 * contextAndId + getVersion().hashCode();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
new file mode 100644
index 0000000..40ff498
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
@@ -0,0 +1,38 @@
+package org.apache.usergrid.persistence.collection.mvcc.entity;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+
+
+/**
+ * A log entry that exposes the stage, entity id, version and collection context of an in flight update
+ */
+public interface MvccLogEntry {
+
+
+ /**
+ * Get the stage for the current version
+ */
+ Stage getStage();
+
+ /**
+ * Get the entity to add info to the log
+ * @return the id of the entity this log entry refers to
+ */
+ UUID getEntityId();
+
+ /**
+ * Get the version of the entity
+ * @return the version of the entity this log entry refers to
+ */
+ UUID getVersion();
+
+ /**
+ * Get the context of the entity
+ * @return the collection context of the entity this log entry refers to
+ */
+ CollectionContext getContext();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntryImpl.java
new file mode 100644
index 0000000..f0f803b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntryImpl.java
@@ -0,0 +1,90 @@
+package org.apache.usergrid.persistence.collection.mvcc.entity;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+
+
+/**
+ * The simple implementation of a log entry
+ * @author tnine
+ */
+public class MvccLogEntryImpl implements MvccLogEntry {
+
+    private final CollectionContext context;
+    private final UUID entityId;
+    private final UUID version;
+    private final Stage stage;
+
+
+    /**
+     * Create a log entry from the given values. The arguments are stored as passed, without validation,
+     * so equals and hashCode may fail with a NullPointerException if a null was supplied.
+     */
+    public MvccLogEntryImpl( final CollectionContext context, final UUID entityId, final UUID version,
+                             final Stage stage ) {
+        this.stage = stage;
+        this.version = version;
+        this.entityId = entityId;
+        this.context = context;
+    }
+
+
+    @Override
+    public Stage getStage() {
+        return stage;
+    }
+
+
+    @Override
+    public UUID getEntityId() {
+        return entityId;
+    }
+
+
+    @Override
+    public UUID getVersion() {
+        return version;
+    }
+
+
+    @Override
+    public CollectionContext getContext() {
+        return context;
+    }
+
+
+    /**
+     * Two log entries are equal when they are of the same class and agree on context, entity id, stage and
+     * version.
+     */
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final MvccLogEntryImpl other = ( MvccLogEntryImpl ) o;
+
+        return context.equals( other.context )
+                && entityId.equals( other.entityId )
+                && stage == other.stage
+                && version.equals( other.version );
+    }
+
+
+    /**
+     * Hash of context, entity id, version and stage, combined in that order with a multiplier of 31.
+     */
+    @Override
+    public int hashCode() {
+        int hash = context.hashCode();
+        hash = hash * 31 + entityId.hashCode();
+        hash = hash * 31 + version.hashCode();
+        return hash * 31 + stage.hashCode();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/Stage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/Stage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/Stage.java
new file mode 100644
index 0000000..96ca3a6
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/Stage.java
@@ -0,0 +1,67 @@
+package org.apache.usergrid.persistence.collection.mvcc.entity;
+
+
+/**
+ * The different stages that can exist in the commit log
+ */
+public enum Stage {
+
+    /*
+     * Byte ids declared for the stages below (decimal, then binary):
+     *
+     * ACTIVE      =>  0 (0000)
+     * ROLLBACK    =>  1 (0001)
+     * COMMITTED   =>  2 (0010)
+     * POSTPROCESS =>  6 (0110)
+     * COMPLETE    => 14 (1110)
+     */
+
+    /**
+     * The entity has started writing but is not yet committed
+     */
+    ACTIVE( true, ( byte ) 0 ),
+
+    /**
+     * NOTE(review): presumably the stage of a write that is being rolled back - confirm the intended meaning
+     */
+    ROLLBACK( true, ( byte ) 1 ),
+
+    /**
+     * We have applied enough writes to be able to recover via writeahead logging. The system can recover from a
+     * crash without data loss at this point
+     */
+    COMMITTED( false, ( byte ) 2 ),
+
+    /**
+     * The entity is going through post processing
+     */
+    POSTPROCESS( false, ( byte ) 6 ),
+
+    /**
+     * The entity has completed all post processing
+     */
+    COMPLETE( false, ( byte ) 14 );
+
+
+    private final boolean transientStage;
+    private final byte id;
+
+    Stage( final boolean transientStage, final byte id ) {
+        this.id = id;
+        this.transientStage = transientStage;
+    }
+
+
+    /**
+     * Returns true if this stage is transient and should not be retained in the datastore permanently.
+     * ACTIVE and ROLLBACK are the stages declared transient; they can be used to signal "in flight" updates.
+     */
+    public boolean isTransient() {
+        return transientStage;
+    }
+
+    /** @return the byte id declared for this stage */
+    public byte getId() {
+        return id;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
new file mode 100644
index 0000000..47ad997
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
@@ -0,0 +1,24 @@
+package org.apache.usergrid.persistence.collection.mvcc.event;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+
+
+/**
+ * Listener that performs post processing on an MvccEntity. NOTE(review): the type parameter T is not referenced by doPostProcessing - confirm whether it is needed
+ * @author: tnine
+ *
+ */
+public interface PostProcessListener<T extends MvccEntity>
+{
+
+
+ /**
+ * Perform post processing on the given entity. NOTE(review): this was documented as "The entity was rejected by the MVCC system and will be removed", which does not match the method name - confirm the intended semantics
+ *
+ * @param mvccEntity The mvcc entity to perform post processing on
+ * @return the MvccEntity to use during this stage
+ */
+ public MvccEntity doPostProcessing(MvccEntity mvccEntity );
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
new file mode 100644
index 0000000..64d3c5d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Commit.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+
+
+/**
+ * This phase should invoke any finalization, and mark the entity as committed in the data store before returning. The current implementation is a placeholder: performStage returns the entity it was given unchanged.
+ */
+public class Commit implements WriteStage {
+
+
+
+ @Override
+ public MvccEntity performStage( final MvccEntity entity ) {
+ return entity;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/MvccStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/MvccStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/MvccStrategy.java
new file mode 100644
index 0000000..c11f420
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/MvccStrategy.java
@@ -0,0 +1,42 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ * Interface to define mvcc operations
+ *
+ * TODO: Not sure we need this any more
+ */
+public interface MvccStrategy {
+
+ /**
+ * Start progress through the write stages for this entity (NOTE(review): the name is presumably a typo of "beginWrite" - confirm before renaming)
+ *
+ * @param context The context this entity belongs in
+ * @param entityId The entity id to assign to this entity
+ * @param entity The entity values to write
+ */
+ public WriteStage beingWrite( CollectionContext context, UUID entityId, Entity entity );
+
+
+ /**
+ * Get the current stage of the entity in the context at the given version. Should be used for write verification
+ * on resume
+ *
+ * @param context The context this entity belongs in
+ * @param entityId The entity Id to search for in the context
+ * @param version The version of the entityId to review
+ */
+ public WriteStage getCurrentState( CollectionContext context, UUID entityId, UUID version );
+
+
+ /**
+ * Get the write stage of the entity in the context with a version <= the given maxVersion and a stage of Committed
+ */
+ public WriteStage getCurrentStateOfEntity( CollectionContext context, UUID entityId, UUID maxVersion );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
new file mode 100644
index 0000000..d7f85ae
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Start.java
@@ -0,0 +1,24 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+
+
+/**
+ * This is the first stage and should be invoked immediately when a write is started. It should persist the start of a
+ * new write in the data store for a checkpoint and recovery
+ */
+public class Start implements WriteStage {
+
+    /**
+     * Create a new stage with the current context
+     */
+    protected Start() {
+    }
+
+    /** Placeholder until the start of a write is persisted: passes the entity on unchanged, as Commit and Write do. */
+    @Override
+    public MvccEntity performStage( final MvccEntity entity ) {
+        return entity;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
new file mode 100644
index 0000000..b67ca31
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Write.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+
+
+/**
+ * This phase should execute the serialization to the data store. The current implementation is a placeholder: performStage returns the entity it was given unchanged.
+ */
+public class Write implements WriteStage {
+
+ /**
+ * Create a new stage; the constructor takes no arguments and does nothing
+ */
+ protected Write( ){
+ }
+
+
+ @Override
+ public MvccEntity performStage( final MvccEntity entity) {
+
+
+ return entity;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
new file mode 100644
index 0000000..cda707f
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/WriteStage.java
@@ -0,0 +1,20 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+
+
+/**
+ * A single stage in our write flow. Start, Write and Commit implement this interface.
+ */
+public interface WriteStage {
+
+ /**
+ * Run this stage. This will return the MvccEntity that should be returned or passed to the next stage
+ * @param entity The entity to use in this stage
+ *
+ * @return The MvccEntity to use for the next stage
+ *
+ */
+ public MvccEntity performStage( MvccEntity entity);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/AtomicUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/AtomicUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/AtomicUpdate.java
new file mode 100644
index 0000000..d844f3b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/AtomicUpdate.java
@@ -0,0 +1,26 @@
+package org.apache.usergrid.persistence.collection.mvcc.verify;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+
+
+/**
+ * Interface to test if we can perform atomic operations
+ * <p/>
+ * Note: this will probably require a new WriteStage that comes after start, which is rollback
+ */
+public interface AtomicUpdate
+{
+
+ /** Signal that we are starting an update for the given entity */
+ public void startUpdate( MvccEntity context );
+
+ /**
+ * Try the commit for the given entity.
+ *
+ * @return true if we can proceed, false if we cannot
+ */
+ public boolean tryCommit( MvccEntity context );
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/OptimisticUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/OptimisticUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/OptimisticUpdate.java
new file mode 100644
index 0000000..cef476f
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/OptimisticUpdate.java
@@ -0,0 +1,22 @@
+package org.apache.usergrid.persistence.collection.mvcc.verify;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+
+
+/** Interface to define how optimistic updates should be performed */
+public interface OptimisticUpdate
+{
+
+ /**
+ * Verify the entity we're trying to write in our current context has the correct most current version
+ *
+ * @param context The mvcc context
+ * @param optimisticVersion The optimistic version the caller provided as the most up to date
+ *
+ * @return True if the optimisticVersion is the most current version with a stage >= Committed, false otherwise
+ */
+ public boolean verifyCurrent( MvccEntity context, UUID optimisticVersion );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/UniqueUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/UniqueUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/UniqueUpdate.java
new file mode 100644
index 0000000..2df987f
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/verify/UniqueUpdate.java
@@ -0,0 +1,27 @@
+package org.apache.usergrid.persistence.collection.mvcc.verify;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.model.field.Field;
+
+
+/** Interface to define how unique updates should be performed */
+public interface UniqueUpdate
+{
+
+ /**
+ * Verify the value of the given field is unique for the entity we're trying to write in our current context
+ *
+ * @param context The mvcc context
+ * @param uniqueField The field to check for uniqueness
+ *
+ * @return True if the value in the uniqueField is unique in the collection context
+ */
+ public boolean verifyUnique( MvccEntity context, Field<?> uniqueField );
+
+ /**
+ * During the commit phase, ensure the uniqueField value of this entity is committed as a unique value. This may
+ * release locks or overwrite expiring timeout values since we are at the final commit phase
+ */
+ public void commitUnique( MvccEntity entity, Field<?> uniqueField );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
new file mode 100644
index 0000000..0d3d112
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
@@ -0,0 +1,77 @@
+package org.apache.usergrid.persistence.collection.serialization;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/** The interface that allows us to serialize an entity to disk */
+public interface MvccEntitySerializationStrategy
+{
+
+ /**
+ * Serialize the entity to the data store with the given collection context
+ *
+ * @param entity The entity to persist
+ * @return The MutationBatch operations for this update
+ */
+ public MutationBatch write( MvccEntity entity );
+
+
+ /**
+ * Load and return the entity with the given id and a version that is <= the version provided
+ * NOTE(review): the @return text below describes an exact (==) version match instead; confirm which is intended
+ * @param context The context to load the entity from
+ * @param entityId The entity id to load
+ * @param version The version to load. This will return the version <= the given version
+ *
+ * @return The deserialized version of the entity. Null if no version == to version exists.
+ * If the entity version has been cleared, the MvccEntity will be returned, but the optional entity
+ * will not be set
+ */
+ public MvccEntity load( CollectionContext context, UUID entityId, UUID version ) throws ConnectionException;
+
+ /**
+ * Load a list, from highest to lowest of the entity with versions <= version up to maxSize elements
+ *
+ * @param context The context to load the entity from
+ * @param entityId The entity id to load
+ * @param version The max version to seek from. I.E a stored version <= this argument
+ * @param maxSize The maximum size to return. If you receive this size, there may be more versions to load.
+ *
+ * @return A list of entities up to max size ordered from max(UUID)=> min(UUID). The return value should be null safe
+ * and return an empty list when there are no matches
+ */
+ public List<MvccEntity> load( CollectionContext context, UUID entityId, UUID version, int maxSize );
+
+
+ /**
+ * Clear this version from the persistence store, but keep the version to mark that it has been cleared
+ * This can be used in a mark+sweep system. The entity with the given version will exist in the context,
+ * but no data will be stored
+ *
+ * @param context The context that contains the entity
+ * @param entityId The entity id to clear
+ * @param version The version to clear
+ * @return The MutationBatch operations for this clear
+ */
+ public MutationBatch clear(CollectionContext context, UUID entityId, UUID version);
+
+
+ /**
+ * Delete the entity from the context with the given entityId and version
+ *
+ * @param context The context that contains the entity
+ * @param entityId The entity id to delete
+ * @param version The version to delete
+ */
+ public MutationBatch delete( CollectionContext context, UUID entityId, UUID version );
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategyImpl.java
new file mode 100644
index 0000000..4b8840d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategyImpl.java
@@ -0,0 +1,252 @@
+package org.apache.usergrid.persistence.collection.serialization;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.marshal.UUIDType;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.migration.CollectionColumnFamily;
+import org.apache.usergrid.persistence.collection.migration.Migration;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntityImpl;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.ColumnListMutation;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.model.ColumnList;
+import com.netflix.astyanax.model.ColumnSlice;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.ObjectSerializer;
+import com.netflix.astyanax.serializers.UUIDSerializer;
+
+
+/**
+ * @author tnine
+ */
+@Singleton
+public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializationStrategy, Migration {
+
+
+ private static final EntitySerializer SER = new EntitySerializer();
+
+
+ private static final ColumnFamily<UUID, UUID> CF_ENTITY_DATA =
+ new ColumnFamily<UUID, UUID>( "Entity_Version_Data", UUIDSerializer.get(), UUIDSerializer.get() );
+
+
+ protected final Keyspace keyspace;
+
+
+ @Inject
+ public MvccEntitySerializationStrategyImpl( final Keyspace keyspace ) {
+ this.keyspace = keyspace;
+ }
+
+
+ @Override
+ public MutationBatch write( final MvccEntity entity ) {
+ Preconditions.checkNotNull( entity, "entity is required" );
+
+ final UUID colName = entity.getVersion();
+ final UUID entityId = entity.getUuid();
+
+ final Optional<Entity> colValue = entity.getEntity();
+
+ return doWrite( entityId, new RowOp() {
+ @Override
+ public void doOp( final ColumnListMutation<UUID> colMutation ) {
+ colMutation.putColumn( colName, SER.toByteBuffer( colValue ) );
+ }
+ } );
+ }
+
+
+ @Override
+ public MvccEntity load( final CollectionContext context, final UUID entityId, final UUID version )
+ throws ConnectionException {
+ Preconditions.checkNotNull( context, "context is required" );
+ Preconditions.checkNotNull( entityId, "entity id is required" );
+ Preconditions.checkNotNull( version, "version is required" );
+
+
+ final UUID start = null;
+ final UUID end = null;
+
+ Column<UUID> column;
+
+ try {
+ column = keyspace.prepareQuery( CF_ENTITY_DATA ).getKey( entityId )
+ .getColumn( version ).execute().getResult();
+ }
+
+ catch ( NotFoundException e ) {
+ //swallow, there's just no column
+ return null;
+ }
+
+ if ( column == null ) {
+ return null;
+ }
+
+
+ return new MvccEntityImpl( context, entityId, version, column.getValue(SER) );
+
+ }
+
+
+ @Override
+ public List<MvccEntity> load( final CollectionContext context, final UUID entityId, final UUID version,
+ final int maxSize ) {
+
+ Preconditions.checkNotNull( context, "context is required" );
+ Preconditions.checkNotNull( entityId, "entity id is required" );
+ Preconditions.checkNotNull( version, "version is required" );
+ Preconditions.checkArgument( maxSize > 0, "max Size must be greater than 0" );
+
+
+ ColumnList<UUID> columns;
+
+ try {
+
+
+ columns = keyspace.prepareQuery( CF_ENTITY_DATA ).getKey( entityId )
+ .withColumnRange( version, null, false, maxSize ).execute().getResult();
+ }
+
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to load column data", e );
+ }
+
+ if ( columns == null ) {
+ return Collections.EMPTY_LIST;
+ }
+
+ List<MvccEntity> results = new ArrayList<MvccEntity>( columns.size() );
+
+ for ( Column<UUID> col : columns ) {
+ results.add( new MvccEntityImpl( context, entityId, col.getName(), col.getValue( SER ) ) );
+ }
+
+ return results;
+ }
+
+
+ @Override
+ public MutationBatch clear( final CollectionContext context, final UUID entityId, final UUID version ) {
+ Preconditions.checkNotNull( context, "context is required" );
+ Preconditions.checkNotNull( entityId, "entity id is required" );
+ Preconditions.checkNotNull( version, "version is required" );
+
+ final Optional<Entity> value = Optional.absent();
+
+ return doWrite( entityId, new RowOp() {
+ @Override
+ public void doOp( final ColumnListMutation<UUID> colMutation ) {
+ colMutation.putColumn( version, SER.toByteBuffer( value ) );
+ }
+ } );
+ }
+
+
+ @Override
+ public MutationBatch delete( final CollectionContext context, final UUID entityId, final UUID version ) {
+
+ return doWrite( entityId, new RowOp() {
+ @Override
+ public void doOp( final ColumnListMutation<UUID> colMutation ) {
+ colMutation.deleteColumn( version );
+ }
+ } );
+ }
+
+
+ @Override
+ public Collection<CollectionColumnFamily> getColumnFamilies() {
+
+ //create the CF entity data. We want it reversed b/c we want the most recent version at the top of the
+ //row for fast seeks
+ CollectionColumnFamily cf = new CollectionColumnFamily( CF_ENTITY_DATA,
+ ReversedType.class.getName() + "(" + UUIDType.class.getName() + ")", true );
+
+
+ return Collections.singleton( cf );
+ }
+
+
+ /**
+ * Do the write on the correct row for the entity id with the operation
+ */
+ private MutationBatch doWrite( UUID entityId, RowOp op ) {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ op.doOp( batch.withRow( CF_ENTITY_DATA, entityId ) );
+
+ return batch;
+ }
+
+
+ /**
+ * Simple callback to perform puts and deletes with a common row setup code
+ */
+ private static interface RowOp {
+
+ /**
+ * The operation to perform on the row
+ */
+ void doOp( ColumnListMutation<UUID> colMutation );
+ }
+
+
+ /**
+ * TODO: Serializer for the entity. This just uses object serialization, change this to use SMILE before production!
+ * We want to retain the Optional wrapper. It helps us easily mark something as cleaned without removing the data
+ */
+ private static class EntitySerializer extends AbstractSerializer<Optional<Entity>> {
+
+ private static final ObjectSerializer SER = ObjectSerializer.get();
+
+ //the marker for when we're passed a "null" value
+ private static final byte[] EMPTY = new byte[] { 0x0 };
+
+
+ @Override
+ public ByteBuffer toByteBuffer( final Optional<Entity> obj ) {
+
+ //mark this version as empty
+ if ( !obj.isPresent() ) {
+ return ByteBuffer.wrap( EMPTY );
+ }
+
+ return SER.toByteBuffer( obj.get() );
+ }
+
+
+ @Override
+ public Optional<Entity> fromByteBuffer( final ByteBuffer byteBuffer ) {
+
+ final ByteBuffer check = byteBuffer.duplicate();
+
+ if ( check.remaining() == 1 && check.get() == EMPTY[0] ) {
+ return Optional.absent();
+ }
+
+ return Optional.of( ( Entity ) SER.fromByteBuffer( byteBuffer ) );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac634a1d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
new file mode 100644
index 0000000..a249522
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
@@ -0,0 +1,59 @@
+package org.apache.usergrid.persistence.collection.serialization;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/** The interface that allows us to serialize a log entry to disk */
+public interface MvccLogEntrySerializationStrategy
+{
+
+ /**
+ * Serialize the log entry to the data store
+ *
+ * @param entry the entry to write
+ * @return The mutation batch with the mutation operations for this write.
+ */
+ public MutationBatch write( MvccLogEntry entry );
+
+ /**
+ * Load and return the stage with the given id and a version that is <= the version provided
+ *
+ *
+ * @param context The context to load the log entry from
+ * @param entityId The entity id to load
+ * @param version The version to load. This will return the version <= the given version
+ *
+ * @return The deserialized log entry. Null if no version <= the current version exists, or the entity does not exist
+ */
+ public MvccLogEntry load( final CollectionContext context, final UUID entityId, final UUID version )
+ throws ConnectionException;
+
+ /**
+ * Load a list, from highest to lowest of the stage with versions <= version up to maxSize elements
+ *
+ * @param context The context to load the log entries from
+ * @param entityId The entity id to load
+ * @param version The max version to seek from
+ * @param maxSize The maximum size to return. If you receive this size, there may be more versions to load.
+ *
+ * @return A list of log entries up to max size ordered from max(UUID)=> min(UUID)
+ */
+ public List<MvccLogEntry> load( CollectionContext context, UUID entityId, UUID version, int maxSize );
+
+ /**
+ * Delete the stage from the context with the given entityId and version
+ *
+ * @param context The context that contains the entity
+ * @param entityId The entity id to delete
+ * @param version The version to delete
+ */
+ public MutationBatch delete( CollectionContext context, UUID entityId, UUID version );
+}