You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/11 19:37:18 UTC
[04/14] incubator-usergrid git commit: First pass at refactoring mark
+ sweep
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
new file mode 100644
index 0000000..0034f03
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.codahale.metrics.Timer;
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+
+
+/**
+ * Runs on an entity that as just be mark committed, and removes all unique values <= this entity
+ */
+public class UniqueCleanup
+ implements Observable.Transformer<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> {
+
+
+ private static final Logger logger = LoggerFactory.getLogger( UniqueCleanup.class );
+ private final Timer uniqueCleanupTimer;
+
+
+ private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+ private final Keyspace keyspace;
+
+ private final SerializationFig serializationFig;
+
+
+ @Inject
+ public UniqueCleanup( final SerializationFig serializationFig,
+ final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+ final Keyspace keyspace, final MetricsFactory metricsFactory ) {
+
+ this.serializationFig = serializationFig;
+ this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+ this.keyspace = keyspace;
+ this.uniqueCleanupTimer = metricsFactory.getTimer( UniqueCleanup.class, "uniquecleanup" );
+ }
+
+
+ @Override
+ public Observable<CollectionIoEvent<MvccEntity>> call(
+ final Observable<CollectionIoEvent<MvccEntity>> collectionIoEventObservable ) {
+
+ final Observable<CollectionIoEvent<MvccEntity>> outputObservable =
+ collectionIoEventObservable.doOnNext( mvccEntityCollectionIoEvent -> {
+
+ final Id entityId = mvccEntityCollectionIoEvent.getEvent().getId();
+ final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection();
+ final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion();
+
+ //TODO Refactor this logic into a a class that can be invoked from anywhere
+ //iterate all unique values
+ final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup =
+ Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
+ @Override
+ protected Iterator<UniqueValue> getIterator() {
+ return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId );
+ }
+ } )
+
+ //skip versions > the specified version
+ .skipWhile( uniqueValue -> {
+
+ final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
+
+ return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
+ } )
+
+ //buffer our buffer size, then roll them all up in a single batch mutation
+ .buffer( serializationFig.getBufferSize() )
+
+ //roll them up
+ .doOnNext( uniqueValues -> {
+ final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+
+ for ( UniqueValue value : uniqueValues ) {
+ uniqueCleanupBatch
+ .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
+ }
+
+ try {
+ uniqueCleanupBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute batch mutation", e );
+ }
+ } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
+ } );
+
+ return ObservableTimer.time( outputObservable, uniqueCleanupTimer );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
new file mode 100644
index 0000000..424ec86
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+
+
+/**
+ * Compact all versions on the input observable by removing them from the log, and from the
+ * versions
+ */
+public class VersionCompact
+ implements Observable.Transformer<CollectionIoEvent<MvccLogEntry>, CollectionIoEvent<MvccLogEntry>> {
+
+ private static final Logger logger = LoggerFactory.getLogger( VersionCompact.class );
+
+ private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+ private final MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+ private final SerializationFig serializationFig;
+ private final Keyspace keyspace;
+ private final Timer compactTimer;
+
+
+ @Inject
+ public VersionCompact( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final SerializationFig serializationFig, final Keyspace keyspace,
+ final MetricsFactory metricsFactory,
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy ) {
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.serializationFig = serializationFig;
+ this.keyspace = keyspace;
+ this.mvccEntitySerializationStrategy = mvccEntitySerializationStrategy;
+ this.compactTimer = metricsFactory.getTimer( VersionCompact.class, "compact" );
+ }
+
+
+ @Override
+ public Observable<CollectionIoEvent<MvccLogEntry>> call(
+ final Observable<CollectionIoEvent<MvccLogEntry>> collectionIoEventObservable ) {
+
+
+ final Observable<CollectionIoEvent<MvccLogEntry>> entryBuffer =
+ collectionIoEventObservable.buffer( serializationFig.getBufferSize() ).flatMap(
+ buffer -> Observable.from( buffer ).collect( () -> keyspace.prepareMutationBatch(),
+ ( ( mutationBatch, mvccLogEntryCollectionIoEvent ) -> {
+
+ final ApplicationScope scope = mvccLogEntryCollectionIoEvent.getEntityCollection();
+ final MvccLogEntry mvccLogEntry = mvccLogEntryCollectionIoEvent.getEvent();
+ final Id entityId = mvccLogEntry.getEntityId();
+ final UUID version = mvccLogEntry.getVersion();
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug(
+ "Deleting log entry and version data for entity id {} and version {} in app scope {}",
+ new Object[] { entityId, version, scope } );
+ }
+
+
+ //delete from our log
+ mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) );
+
+ //merge our entity delete in
+ mutationBatch
+ .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) );
+
+
+
+ } ) )
+ //delete from the entities
+ .doOnNext( mutationBatch -> {
+ try {
+ mutationBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to perform batch mutation" );
+ }
+ } ).flatMap( batches -> Observable.from( buffer ) ) );
+
+
+ return ObservableTimer.time( entryBuffer, compactTimer );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
index 92669a7..5e249ae 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
@@ -71,6 +71,20 @@ public interface MvccLogEntrySerializationStrategy extends Migration, VersionedD
*/
List<MvccLogEntry> load( ApplicationScope applicationScope, Id entityId, UUID version, int maxSize );
+
+
+ /**
+ * Load a list, from lowest to highest of the stage with versions <= version up to maxSize elements
+ *
+ * @param applicationScope The applicationScope to load the entity from
+ * @param entityId The entity id to load
+ * @param minVersion The min version to seek from. Null is allowed
+ * @param maxSize The maximum size to return. If you receive this size, there may be more versions to load.
+ *
+ * @return A list of entities up to max size ordered from max(UUID)=> min(UUID)
+ */
+ List<MvccLogEntry> loadReversed( ApplicationScope applicationScope, Id entityId, UUID minVersion, int maxSize );
+
/**
* MarkCommit the stage from the applicationScope with the given entityId and version
*
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 381a24e..6591781 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -24,42 +24,18 @@ public interface SerializationFig extends GuicyFig {
@Default("5")
int getTimeout();
- /**
- * Number of history items to return for delete.
- *
- * @return Timeout in seconds.
- */
- @Key("collection.delete.history.size")
- @Default("100")
- int getHistorySize();
/**
* Number of items to buffer.
*
- * @return Timeout in seconds.
+ * @return Number of items to buffer in memory
*/
- @Key("collection.buffer.size")
- @Default("10")
+ @Key("buffer.size")
+ @Default("100")
int getBufferSize();
/**
- * The size of threads to have in the task pool
- */
- @Key( "collection.task.pool.threadsize" )
- @Default( "20" )
- int getTaskPoolThreadSize();
-
-
-
- /**
- * The size of threads to have in the task pool
- */
- @Key( "collection.task.pool.queuesize" )
- @Default( "20" )
- int getTaskPoolQueueSize();
-
- /**
* The maximum amount of entities we can load in a single request
* TODO, change this and move it into a common setting that both query and collection share
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
deleted file mode 100644
index e5c2896..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Preconditions;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
- */
-public class LogEntryIterator implements Iterator<MvccLogEntry> {
-
-
- private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
- private final ApplicationScope scope;
- private final Id entityId;
- private final int pageSize;
-
-
- private Iterator<MvccLogEntry> elementItr;
- private UUID nextStart;
-
-
- /**
- * @param logEntrySerializationStrategy The serialization strategy to get the log entries
- * @param scope The scope of the entity
- * @param entityId The id of the entity
- * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version
- * < max
- * @param pageSize The fetch size to get when querying the serialization strategy
- */
- public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final ApplicationScope scope, final Id entityId, final UUID maxVersion,
- final int pageSize ) {
-
- Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
-
- this.logEntrySerializationStrategy = logEntrySerializationStrategy;
- this.scope = scope;
- this.entityId = entityId;
- this.nextStart = maxVersion;
- this.pageSize = pageSize;
- }
-
-
- @Override
- public boolean hasNext() {
- if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
- try {
- advance();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to query cassandra", e );
- }
- }
-
- return elementItr.hasNext();
- }
-
-
- @Override
- public MvccLogEntry next() {
- if ( !hasNext() ) {
- throw new NoSuchElementException( "No more elements exist" );
- }
-
- return elementItr.next();
- }
-
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException( "Remove is unsupported" );
- }
-
-
- /**
- * Advance our iterator
- */
- public void advance() throws ConnectionException {
-
- final int requestedSize = pageSize + 1;
-
- //loop through even entry that's < this one and remove it
- List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
-
- //we always remove the first version if it's equal since it's returned
- if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
- results.remove( 0 );
- }
-
-
- //we have results, set our next start
- if ( results.size() == pageSize ) {
- nextStart = results.get( results.size() - 1 ).getVersion();
- }
- //nothing left to do
- else {
- nextStart = null;
- }
-
- elementItr = results.iterator();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
new file mode 100644
index 0000000..eae8c06
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
@@ -0,0 +1,121 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
+ */
+public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> {
+
+
+ private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+ private final ApplicationScope scope;
+ private final Id entityId;
+ private final int pageSize;
+
+
+ private Iterator<MvccLogEntry> elementItr;
+ private UUID nextStart;
+
+
+ /**
+ * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+ * @param scope The scope of the entity
+ * @param entityId The id of the entity
+ * @param pageSize The fetch size to get when querying the serialization strategy
+ */
+ public MinMaxLogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final ApplicationScope scope, final Id entityId, final int pageSize ) {
+
+ Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
+
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.scope = scope;
+ this.entityId = entityId;
+ this.pageSize = pageSize;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
+ try {
+ advance();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to query cassandra", e );
+ }
+ }
+
+ return elementItr.hasNext();
+ }
+
+
+ @Override
+ public MvccLogEntry next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No more elements exist" );
+ }
+
+ return elementItr.next();
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is unsupported" );
+ }
+
+
+ /**
+ * Advance our iterator
+ */
+ public void advance() throws ConnectionException {
+
+ final int requestedSize;
+
+ if ( nextStart != null ) {
+ requestedSize = pageSize + 1;
+ }
+ else {
+ requestedSize = pageSize;
+ }
+
+ //loop through even entry that's < this one and remove it
+ List<MvccLogEntry> results = logEntrySerializationStrategy.loadReversed( scope, entityId, nextStart, requestedSize );
+
+ //we always remove the first version if it's equal since it's returned
+ if ( nextStart != null && results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
+ results.remove( 0 );
+ }
+
+
+
+ //we have results, set our next start. If we miss our start version (due to deletion) and we request a +1, we want to ensure we set our next, hence the >=
+ if ( results.size() >= pageSize ) {
+ nextStart = results.get( results.size() - 1 ).getVersion();
+ }
+ //nothing left to do
+ else {
+ nextStart = null;
+ }
+
+
+
+
+ elementItr = results.iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
index 81c0248..fdef3c3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
@@ -111,6 +111,20 @@ public class MvccLogEntrySerializationProxyImpl implements MvccLogEntrySerializa
@Override
+ public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId,
+ final UUID minVersion, final int maxSize ) {
+
+ final MigrationRelationship<MvccLogEntrySerializationStrategy> migration = getMigrationRelationShip();
+
+ if ( migration.needsMigration() ) {
+ return migration.from.loadReversed( applicationScope, entityId, minVersion, maxSize );
+ }
+
+ return migration.to.loadReversed( applicationScope, entityId, minVersion, maxSize );
+ }
+
+
+ @Override
public MutationBatch delete( final ApplicationScope applicationScope, final Id entityId, final UUID version ) {
final MigrationRelationship<MvccLogEntrySerializationStrategy> migration = getMigrationRelationShip();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
index 76e9dba..0c0d961 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -31,11 +30,6 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.marshal.ReversedType;
-import org.apache.cassandra.db.marshal.UUIDType;
-
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.VersionSet;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
@@ -43,10 +37,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -98,19 +89,16 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
final UUID colName = entry.getVersion();
final StageStatus stageStatus = new StageStatus( stage, entry.getState() );
- return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), new RowOp() {
- @Override
- public void doOp( final ColumnListMutation<UUID> colMutation ) {
-
- //Write the stage with a timeout, it's set as transient
- if ( stage.isTransient() ) {
- colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() );
- return;
- }
+ return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), colMutation -> {
- //otherwise it's persistent, write it with no expiration
- colMutation.putColumn( colName, stageStatus, SER, null );
+ //Write the stage with a timeout, it's set as transient
+ if ( stage.isTransient() ) {
+ colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() );
+ return;
}
+
+ //otherwise it's persistent, write it with no expiration
+ colMutation.putColumn( colName, stageStatus, SER, null );
} );
}
@@ -126,12 +114,10 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
//didnt put the max in the error message, I don't want to take the string construction hit every time
Preconditions.checkArgument( entityIds.size() <= fig.getMaxLoadSize(),
- "requested size cannot be over configured maximum" );
+ "requested size cannot be over configured maximum" );
final Id applicationId = collectionScope.getApplication();
- final Id ownerId = applicationId;
-
final List<ScopedRowKey<K>> rowKeys = new ArrayList<>( entityIds.size() );
@@ -155,7 +141,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
}
catch ( ConnectionException e ) {
throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra",
- e );
+ e );
}
@@ -181,7 +167,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
final StageStatus stageStatus = column.getValue( SER );
final MvccLogEntry logEntry =
- new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state );
+ new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state );
versionResults.addEntry( logEntry );
@@ -208,13 +194,41 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
final ScopedRowKey<K> rowKey = createKey( applicationId, entityId );
+ columns =
+ keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey ).withColumnRange( version, null, false, maxSize )
+ .execute().getResult();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to load log entries", e );
+ }
+
+ return parseResults( columns, entityId );
+ }
+
+
+ @Override
+ public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId,
+ final UUID minVersion, final int maxSize ) {
+ ColumnList<UUID> columns;
+ try {
+
+ final Id applicationId = applicationScope.getApplication();
+
+ final ScopedRowKey<K> rowKey = createKey( applicationId, entityId );
+
+
columns = keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey )
- .withColumnRange( version, null, false, maxSize ).execute().getResult();
+ .withColumnRange( minVersion, null, true, maxSize ).execute().getResult();
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to load log entries", e );
}
+ return parseResults( columns, entityId );
+ }
+
+
+ private List<MvccLogEntry> parseResults( final ColumnList<UUID> columns, final Id entityId ) {
List<MvccLogEntry> results = new ArrayList<MvccLogEntry>( columns.size() );
@@ -236,17 +250,10 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
Preconditions.checkNotNull( entityId, "entityId is required" );
Preconditions.checkNotNull( version, "version context is required" );
- return doWrite( context, entityId, version, new RowOp() {
- @Override
- public void doOp( final ColumnListMutation<UUID> colMutation ) {
- colMutation.deleteColumn( version );
- }
- } );
+ return doWrite( context, entityId, version, colMutation -> colMutation.deleteColumn( version ) );
}
-
-
/**
* Simple callback to perform puts and deletes with a common row setup code
*/
@@ -281,12 +288,14 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
return batch;
}
+
protected abstract MultiTennantColumnFamily<ScopedRowKey<K>, UUID> getColumnFamily();
- protected abstract ScopedRowKey<K> createKey(final Id applicationId, final Id entityId);
+ protected abstract ScopedRowKey<K> createKey( final Id applicationId, final Id entityId );
+
+ protected abstract Id getEntityIdFromKey( final ScopedRowKey<K> key );
- protected abstract Id getEntityIdFromKey(final ScopedRowKey<K> key);
/**
* Internal stage shard
@@ -319,7 +328,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
*/
private static class StatusCache {
private Map<Integer, MvccLogEntry.State> values =
- new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length );
+ new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length );
private StatusCache() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index 3168817..e825bbc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -31,16 +31,12 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyV2Impl;
import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyV2Impl;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
@@ -76,7 +72,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
private final Keyspace keyspace;
private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
- private final EntityVersionTaskFactory entityVersionCleanupFactory;
private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
@@ -85,13 +80,11 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
@Inject
public MvccEntityDataMigrationImpl( final Keyspace keyspace,
final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
- final EntityVersionTaskFactory entityVersionCleanupFactory,
final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) {
this.keyspace = keyspace;
this.allVersions = allVersions;
- this.entityVersionCleanupFactory = entityVersionCleanupFactory;
this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3;
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
@@ -162,7 +155,8 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
atomicLong.addAndGet( entities.size() );
- List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList( entities.size() );
+ final List<Id> toSaveIds = new ArrayList<>( entities.size() );
+
for ( EntityToSaveMessage message : entities ) {
final MutationBatch entityRewrite = migration.to.write( message.scope, message.entity );
@@ -187,6 +181,9 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
final UUID version = message.entity.getVersion();
+
+ toSaveIds.add( entityId );
+
// re-write the unique
// values
// but this
@@ -215,7 +212,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
* Migrate the log entry to the new format
*/
for(final MvccLogEntry entry: logEntries){
-
final MutationBatch mb = mvccLogEntrySerializationStrategy.write( message.scope, entry );
totalBatch.mergeShallow( mb );
@@ -223,24 +219,18 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
- //schedule our cleanup task to clean up all the data
- final EntityVersionCleanupTask task = entityVersionCleanupFactory
- .getCleanupTask( message.scope, message.entity.getId(), version, false );
- entityVersionCleanupTasks.add( task );
}
executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
//now run our cleanup task
- for ( EntityVersionCleanupTask entityVersionCleanupTask : entityVersionCleanupTasks ) {
- try {
- entityVersionCleanupTask.call();
- }
- catch ( Exception e ) {
- LOGGER.error( "Unable to run cleanup task", e );
- }
+ for ( Id updatedId : toSaveIds ) {
+
+
+
+
}
} ).subscribeOn( Schedulers.io() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index baceeb4..36faf62 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -91,8 +91,7 @@ public class EntityCollectionManagerIT {
public void write() {
- ApplicationScope context =
- new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
@@ -113,8 +112,7 @@ public class EntityCollectionManagerIT {
public void writeWithUniqueValues() {
- ApplicationScope context =
- new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -146,8 +144,7 @@ public class EntityCollectionManagerIT {
public void writeAndLoad() {
- ApplicationScope context =
- new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
@@ -174,7 +171,7 @@ public class EntityCollectionManagerIT {
public void writeLoadDelete() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -185,30 +182,27 @@ public class EntityCollectionManagerIT {
assertNotNull( "Id was assigned", createReturned.getId() );
-
- UUID version = createReturned.getVersion();
-
Observable<Entity> loadObservable = manager.load( createReturned.getId() );
Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
assertEquals( "Same value", createReturned, loadReturned );
- manager.delete( createReturned.getId() ).toBlocking().last();
+ manager.mark( createReturned.getId() ).toBlocking().last();
loadObservable = manager.load( createReturned.getId() );
//load may return null, use last or default
loadReturned = loadObservable.toBlocking().lastOrDefault( null );
- assertNull("Entity was deleted", loadReturned);
+ assertNull( "Entity was deleted", loadReturned );
}
@Test
public void writeLoadUpdateLoad() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
newEntity.setField( new IntegerField( "counter", 1 ) );
@@ -217,36 +211,36 @@ public class EntityCollectionManagerIT {
Observable<Entity> observable = manager.write( newEntity );
- Entity createReturned = observable.toBlocking().lastOrDefault(null);
+ Entity createReturned = observable.toBlocking().lastOrDefault( null );
assertNotNull( "Id was assigned", createReturned.getId() );
- Observable<Entity> loadObservable = manager.load(createReturned.getId());
+ Observable<Entity> loadObservable = manager.load( createReturned.getId() );
Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
assertEquals( "Same value", createReturned, loadReturned );
- assertEquals("Field value correct", createReturned.getField("counter"), loadReturned.getField("counter"));
+ assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) );
//update the field to 2
createReturned.setField( new IntegerField( "counter", 2 ) );
//wait for the write to complete
- manager.write( createReturned ).toBlocking().lastOrDefault(null);
+ manager.write( createReturned ).toBlocking().lastOrDefault( null );
loadObservable = manager.load( createReturned.getId() );
- loadReturned = loadObservable.toBlocking().lastOrDefault(null);
+ loadReturned = loadObservable.toBlocking().lastOrDefault( null );
assertEquals( "Same value", createReturned, loadReturned );
- assertEquals("Field value correct", createReturned.getField("counter"), loadReturned.getField("counter"));
+ assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) );
}
@@ -254,30 +248,30 @@ public class EntityCollectionManagerIT {
public void writeAndLoadScopeClosure() {
- ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization"));
+ ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
- EntityCollectionManager manager = factory.createCollectionManager(collectionScope1);
+ EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
- Observable<Entity> observable = manager.write(newEntity);
+ Observable<Entity> observable = manager.write( newEntity );
Entity createReturned = observable.toBlocking().lastOrDefault( null );
- assertNotNull("Id was assigned", createReturned.getId());
- assertNotNull("Version was assigned", createReturned.getVersion());
+ assertNotNull( "Id was assigned", createReturned.getId() );
+ assertNotNull( "Version was assigned", createReturned.getVersion() );
Observable<Entity> loadObservable = manager.load( createReturned.getId() );
Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
- assertEquals("Same value", createReturned, loadReturned);
+ assertEquals( "Same value", createReturned, loadReturned );
- ApplicationScope collectionScope2 = new ApplicationScopeImpl(new SimpleId("organization"));
+ ApplicationScope collectionScope2 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
//now make sure we can't load it from another scope, using the same org
@@ -286,9 +280,7 @@ public class EntityCollectionManagerIT {
Entity loaded = manager2.load( createReturned.getId() ).toBlocking().lastOrDefault( null );
- assertNull("CollectionScope works correctly", loaded);
-
-
+ assertNull( "CollectionScope works correctly", loaded );
}
@@ -296,34 +288,32 @@ public class EntityCollectionManagerIT {
public void writeAndGetField() {
- ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization"));
+ ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
Field field = new StringField( "testField", "unique", true );
- newEntity.setField(field);
+ newEntity.setField( field );
EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
- Observable<Entity> observable = manager.write(newEntity);
+ Observable<Entity> observable = manager.write( newEntity );
Entity createReturned = observable.toBlocking().lastOrDefault( null );
assertNotNull( "Id was assigned", createReturned.getId() );
- assertNotNull("Version was assigned", createReturned.getVersion());
+ assertNotNull( "Version was assigned", createReturned.getVersion() );
Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
assertNotNull( id );
assertEquals( newEntity.getId(), id );
Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true );
- id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
+ id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
assertNull( id );
}
-
-
@Test
public void updateVersioning() {
@@ -331,10 +321,10 @@ public class EntityCollectionManagerIT {
Entity origEntity = new Entity( new SimpleId( "testUpdate" ) );
origEntity.setField( new StringField( "testField", "value" ) );
- ApplicationScope context = new ApplicationScopeImpl(new SimpleId("organization"));
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
EntityCollectionManager manager = factory.createCollectionManager( context );
- Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault(null);
+ Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault( null );
// note its version
UUID oldVersion = returned.getVersion();
@@ -345,7 +335,7 @@ public class EntityCollectionManagerIT {
// partial update entity but we don't have version number
Entity updateEntity = new Entity( origEntity.getId() );
updateEntity.setField( new StringField( "addedField", "other value" ) );
- manager.write( updateEntity ).toBlocking().lastOrDefault(null);
+ manager.write( updateEntity ).toBlocking().lastOrDefault( null );
// get entity now, it must have a new version
returned = manager.load( origEntity.getId() ).toBlocking().lastOrDefault( null );
@@ -354,14 +344,14 @@ public class EntityCollectionManagerIT {
assertNotNull( "A new version must be assigned", newVersion );
// new Version should be > old version
- assertTrue(UUIDComparator.staticCompare(newVersion, oldVersion) > 0);
+ assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 );
}
@Test
public void writeMultiget() {
- final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
final int multigetSize = serializationFig.getMaxLoadSize();
@@ -381,10 +371,10 @@ public class EntityCollectionManagerIT {
final EntitySet entitySet = manager.load( entityIds ).toBlocking().lastOrDefault( null );
- assertNotNull(entitySet);
+ assertNotNull( entitySet );
assertEquals( multigetSize, entitySet.size() );
- assertFalse(entitySet.isEmpty());
+ assertFalse( entitySet.isEmpty() );
/**
* Validate every element exists
@@ -405,7 +395,7 @@ public class EntityCollectionManagerIT {
@Test
public void writeMultigetRepair() {
- final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
final int multigetSize = serializationFig.getMaxLoadSize();
@@ -444,7 +434,7 @@ public class EntityCollectionManagerIT {
assertEquals( "Same entity returned", expected, returned.getEntity().get() );
- assertTrue((Boolean) returned.getEntity().get().getField("updated").getValue());
+ assertTrue( ( Boolean ) returned.getEntity().get().getField( "updated" ).getValue() );
}
}
@@ -452,7 +442,7 @@ public class EntityCollectionManagerIT {
@Test( expected = IllegalArgumentException.class )
public void readTooLarge() {
- final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
final int multigetSize = serializationFig.getMaxLoadSize() + 1;
@@ -474,7 +464,7 @@ public class EntityCollectionManagerIT {
@Test
public void testGetVersion() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -495,7 +485,7 @@ public class EntityCollectionManagerIT {
VersionSet results =
- manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last();
+ manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last();
final MvccLogEntry version1Log = results.getMaxVersion( created1.getId() );
@@ -515,7 +505,7 @@ public class EntityCollectionManagerIT {
@Test
public void testVersionLogWrite() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -529,10 +519,10 @@ public class EntityCollectionManagerIT {
final UUID v1Version = v1Created.getVersion();
- final VersionSet resultsV1 = manager.getLatestVersion(Arrays.asList(v1Created.getId())).toBlocking().last();
+ final VersionSet resultsV1 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
- final MvccLogEntry version1Log = resultsV1.getMaxVersion(v1Created.getId());
+ final MvccLogEntry version1Log = resultsV1.getMaxVersion( v1Created.getId() );
assertEquals( v1Created.getId(), version1Log.getEntityId() );
assertEquals( v1Version, version1Log.getVersion() );
assertEquals( MvccLogEntry.State.COMPLETE, version1Log.getState() );
@@ -543,7 +533,7 @@ public class EntityCollectionManagerIT {
final UUID v2Version = v2Created.getVersion();
- assertTrue("Newer version in v2", UUIDComparator.staticCompare(v2Version, v1Version) > 0);
+ assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 );
final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
@@ -560,7 +550,7 @@ public class EntityCollectionManagerIT {
@Test
public void testVersionLogUpdate() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -608,7 +598,7 @@ public class EntityCollectionManagerIT {
@Test
public void healthTest() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -632,7 +622,7 @@ public class EntityCollectionManagerIT {
final Entity entity = EntityHelper.generateEntity( setSize );
//now we have one massive, entity, save it and retrieve it.
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -652,20 +642,21 @@ public class EntityCollectionManagerIT {
SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", currentMaxSize + "" );
}
+
@Test
public void invalidNameRepair() throws ConnectionException {
//write an entity with a unique field
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
//if we add a second field we get a second entity that is the exact same. Is this expected?
- final IntegerField expectedInteger = new IntegerField( "count", 5, true );
- // final StringField expectedString = new StringField( "yes", "fred", true );
+ final IntegerField expectedInteger = new IntegerField( "count", 5, true );
+ // final StringField expectedString = new StringField( "yes", "fred", true );
newEntity.setField( expectedInteger );
- // newEntity.setField( expectedString );
+ // newEntity.setField( expectedString );
EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -677,23 +668,26 @@ public class EntityCollectionManagerIT {
assertNotNull( "Id was assigned", createReturned.getId() );
assertNotNull( "Version was assigned", createReturned.getVersion() );
- FieldSet
- fieldResults = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last();
+ FieldSet fieldResults =
+ manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) )
+ .toBlocking().last();
- assertEquals(1,fieldResults.size());
+ assertEquals( 1, fieldResults.size() );
//verify the entity is correct.
- assertEquals( "Same value", createReturned, fieldResults.getEntity( expectedInteger ).getEntity().get()); //loadReturned );
+ assertEquals( "Same value", createReturned,
+ fieldResults.getEntity( expectedInteger ).getEntity().get() ); //loadReturned );
//use the entity serializationStrategy to remove the entity data.
//do a mark as one test, and a delete as another
- entitySerializationStrategy.delete( context,createReturned.getId(),createReturned.getVersion() ).execute();
+ entitySerializationStrategy.delete( context, createReturned.getId(), createReturned.getVersion() ).execute();
//try to load via the unique field, should have triggered repair
- final FieldSet
- results = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last();
+ final FieldSet results =
+ manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) )
+ .toBlocking().last();
//verify no entity returned
@@ -701,37 +695,104 @@ public class EntityCollectionManagerIT {
//user the unique serialization to verify it's been deleted from cassandra
- UniqueValueSet uniqueValues = uniqueValueSerializationStrategy.load( context, newEntity.getId().getType(), createReturned.getFields() );
+ UniqueValueSet uniqueValues =
+ uniqueValueSerializationStrategy.load( context, newEntity.getId().getType(), createReturned.getFields() );
assertFalse( uniqueValues.iterator().hasNext() );
-
}
@Test
public void testGetIdField() throws Exception {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
EntityCollectionManager manager = factory.createCollectionManager( context );
// create an entity of type "item" with a unique_id field value = 1
Entity entity1 = new Entity( new SimpleId( "item" ) );
- entity1.setField( new StringField( "unique_id", "1", true ));
+ entity1.setField( new StringField( "unique_id", "1", true ) );
manager.write( entity1 ).toBlocking().last();
- final Observable<Id> idObs = manager.getIdField("item", new StringField("unique_id", "1"));
- Id id = idObs.toBlocking().lastOrDefault(null);
- assertEquals(entity1.getId(), id);
+ final Observable<Id> idObs = manager.getIdField( "item", new StringField( "unique_id", "1" ) );
+ Id id = idObs.toBlocking().lastOrDefault( null );
+ assertEquals( entity1.getId(), id );
// create an entity of type "deleted_item" with a unique_id field value = 1
Entity entity2 = new Entity( new SimpleId( "deleted_item" ) );
- entity2.setField( new StringField( "unique_id", "1", true ));
+ entity2.setField( new StringField( "unique_id", "1", true ) );
manager = factory.createCollectionManager( context );
manager.write( entity2 ).toBlocking().last();
- final Observable<Id> id2Obs = manager.getIdField("deleted_item", new StringField("unique_id", "1"));
- Id id2 = id2Obs.toBlocking().lastOrDefault(null);
- assertEquals(entity2.getId(), id2);
+ final Observable<Id> id2Obs = manager.getIdField( "deleted_item", new StringField( "unique_id", "1" ) );
+ Id id2 = id2Obs.toBlocking().lastOrDefault( null );
+ assertEquals( entity2.getId(), id2 );
+ }
+
+
+ @Test
+ public void writeGetVersionsDelete() {
+
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+ Entity entity = new Entity( new SimpleId( "test" ) );
+ entity.setField( new IntegerField( "counter", 0 ) );
+
+ EntityCollectionManager manager = factory.createCollectionManager( context );
+
+ Entity createReturned = manager.write( entity ).toBlocking().lastOrDefault( null );
+
+ assertNotNull( "Id was assigned", createReturned.getId() );
+
+ final int size = 200;
+
+ final Id entityId = createReturned.getId();
+
+ List<UUID> versions = new ArrayList<>( size );
+ versions.add( entity.getVersion() );
+
+ //write new versions
+ for ( int i = 1; i < size; i++ ) {
+ final Entity newEntity = new Entity( entityId );
+
+ final Entity returnedEntity = manager.write( newEntity ).toBlocking().last();
+
+ versions.add( returnedEntity.getVersion() );
+ }
+
+
+ //now get our values, and load the latest version
+
+ final Entity lastVersion = manager.load( entityId ).toBlocking().last();
+
+ //ensure the latest version is correct
+ assertEquals( versions.get( versions.size() - 1 ), lastVersion.getVersion() );
+
+
+ // now ensure all versions are correct
+ final List<MvccLogEntry> entries = manager.getVersions( entityId ).toList().toBlocking().last();
+
+
+ assertEquals( "Same size expected", versions.size(), entries.size() );
+
+ for ( int i = 0; i < versions.size(); i++ ) {
+ assertEquals( versions.get( i ), entries.get( i ).getVersion() );
+ }
+
+
+ //now get all the log versions, and delete them all we do it in 2+ batches to ensure we clean up as expected
+ manager.getVersions( entityId ).buffer( 100 ).flatMap( bufferList -> manager.delete( bufferList ) )
+ .toBlocking().last();
+
+
+ //now load them, there shouldn't be any versions
+ final List<MvccLogEntry> postDeleteEntries = manager.getVersions( entityId ).toList().toBlocking().last();
+
+ assertEquals( "All log entries should be removed", 0, postDeleteEntries.size() );
+
+ final Entity postDeleteLastVersion = manager.load( entityId ).toBlocking().lastOrDefault( null );
+
+ //ensure the latest version is correct
+ assertNull( "Last version was deleted", postDeleteLastVersion );
}
}