You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/07 18:40:58 UTC
[3/5] incubator-usergrid git commit: First pass at refactoring mark +
sweep
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
deleted file mode 100644
index e5c2896..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Preconditions;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
- */
-public class LogEntryIterator implements Iterator<MvccLogEntry> {
-
-
- private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
- private final ApplicationScope scope;
- private final Id entityId;
- private final int pageSize;
-
-
- private Iterator<MvccLogEntry> elementItr;
- private UUID nextStart;
-
-
- /**
- * @param logEntrySerializationStrategy The serialization strategy to get the log entries
- * @param scope The scope of the entity
- * @param entityId The id of the entity
- * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version
- * < max
- * @param pageSize The fetch size to get when querying the serialization strategy
- */
- public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final ApplicationScope scope, final Id entityId, final UUID maxVersion,
- final int pageSize ) {
-
- Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
-
- this.logEntrySerializationStrategy = logEntrySerializationStrategy;
- this.scope = scope;
- this.entityId = entityId;
- this.nextStart = maxVersion;
- this.pageSize = pageSize;
- }
-
-
- @Override
- public boolean hasNext() {
- if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
- try {
- advance();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to query cassandra", e );
- }
- }
-
- return elementItr.hasNext();
- }
-
-
- @Override
- public MvccLogEntry next() {
- if ( !hasNext() ) {
- throw new NoSuchElementException( "No more elements exist" );
- }
-
- return elementItr.next();
- }
-
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException( "Remove is unsupported" );
- }
-
-
- /**
- * Advance our iterator
- */
- public void advance() throws ConnectionException {
-
- final int requestedSize = pageSize + 1;
-
- //loop through even entry that's < this one and remove it
- List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
-
- //we always remove the first version if it's equal since it's returned
- if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
- results.remove( 0 );
- }
-
-
- //we have results, set our next start
- if ( results.size() == pageSize ) {
- nextStart = results.get( results.size() - 1 ).getVersion();
- }
- //nothing left to do
- else {
- nextStart = null;
- }
-
- elementItr = results.iterator();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
new file mode 100644
index 0000000..072e0ea
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+
+import rx.Observable;
+import rx.Subscriber;
+
+
+/**
+ * OnSubscribe source intended to emit log entries from MIN to MAX. NOTE(review): no entries are emitted yet
+ */
+public class LogEntryObservable implements Observable.OnSubscribe<MvccLogEntry>{
+
+ private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
+
+
+ public LogEntryObservable( final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) {
+ this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
+ }
+
+
+ @Override
+ public void call( final Subscriber<? super MvccLogEntry> subscriber ) {
+ //NOTE(review): RxJava 1.x subscribe() appears to invoke onStart() itself; confirm this extra call is intended
+ subscriber.onStart();
+
+ while(!subscriber.isUnsubscribed()){
+ //TODO: placeholder. Nothing reads mvccLogEntrySerializationStrategy or calls onNext/onCompleted/onError
+ //yet, so as written this loop just spins until the subscriber unsubscribes
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
new file mode 100644
index 0000000..a8e15a7
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
@@ -0,0 +1,114 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Pages through an entity's log entries, starting just past the supplied version, pageSize entries per fetch
+ */
+public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> {
+
+
+ private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+ private final ApplicationScope scope;
+ private final Id entityId;
+ private final int pageSize;
+
+ //current page (null until the first fetch) and the start version of the next fetch (null once exhausted)
+ private Iterator<MvccLogEntry> elementItr;
+ private UUID nextStart;
+
+
+ /**
+ * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+ * @param scope The scope of the entity
+ * @param entityId The id of the entity
+ * @param maxVersion The version to start from. It is passed to load() as the start of the first page, and an
+ * entry with exactly this version is skipped rather than returned
+ * @param pageSize The fetch size to get when querying the serialization strategy. Must be > 0
+ */
+ public MinMaxLogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final ApplicationScope scope, final Id entityId, final UUID maxVersion,
+ final int pageSize ) {
+
+ Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
+
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.scope = scope;
+ this.entityId = entityId;
+ this.nextStart = maxVersion;
+ this.pageSize = pageSize;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ //fetch the first page, or the next one once the current page is drained and a start version remains
+ if ( elementItr == null || ( !elementItr.hasNext() && nextStart != null ) ) {
+ try {
+ advance();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to query cassandra", e );
+ }
+ }
+ return elementItr.hasNext();
+ }
+
+
+ @Override
+ public MvccLogEntry next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No more elements exist" );
+ }
+
+ return elementItr.next();
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is unsupported" );
+ }
+
+
+ /**
+ * Loads the next page of log entries starting at {@code nextStart} and resets {@code elementItr} to it.
+ * One extra entry is requested because the start version itself may be returned and is then dropped.
+ */
+ public void advance() throws ConnectionException {
+ final int requestedSize = pageSize + 1;
+
+ List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
+
+ //the start version was already handled (or is excluded), so drop it when it leads the page
+ if ( !results.isEmpty() && results.get( 0 ).getVersion().equals( nextStart ) ) {
+ results.remove( 0 );
+ }
+
+ //a full page means more entries may follow. Use >= rather than ==: if the start version is missing
+ //from the log nothing is dropped above and all pageSize + 1 entries remain; with == that case
+ //would clear nextStart and silently end iteration early
+ if ( results.size() >= pageSize ) {
+ nextStart = results.get( results.size() - 1 ).getVersion();
+ }
+ //short page, nothing left to fetch
+ else {
+ nextStart = null;
+ }
+
+ elementItr = results.iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
index 81c0248..fdef3c3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
@@ -111,6 +111,20 @@ public class MvccLogEntrySerializationProxyImpl implements MvccLogEntrySerializa
@Override
+ public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId,
+ final UUID minVersion, final int maxSize ) {
+ //read through migration.from while needsMigration() reports true, otherwise through migration.to
+ final MigrationRelationship<MvccLogEntrySerializationStrategy> migration = getMigrationRelationShip();
+
+ if ( migration.needsMigration() ) {
+ return migration.from.loadReversed( applicationScope, entityId, minVersion, maxSize );
+ }
+
+ return migration.to.loadReversed( applicationScope, entityId, minVersion, maxSize );
+ }
+
+
+ @Override
public MutationBatch delete( final ApplicationScope applicationScope, final Id entityId, final UUID version ) {
final MigrationRelationship<MvccLogEntrySerializationStrategy> migration = getMigrationRelationShip();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
index 76e9dba..73804e4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -31,11 +30,6 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.marshal.ReversedType;
-import org.apache.cassandra.db.marshal.UUIDType;
-
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.VersionSet;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
@@ -43,10 +37,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -126,12 +117,10 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
//didnt put the max in the error message, I don't want to take the string construction hit every time
Preconditions.checkArgument( entityIds.size() <= fig.getMaxLoadSize(),
- "requested size cannot be over configured maximum" );
+ "requested size cannot be over configured maximum" );
final Id applicationId = collectionScope.getApplication();
- final Id ownerId = applicationId;
-
final List<ScopedRowKey<K>> rowKeys = new ArrayList<>( entityIds.size() );
@@ -155,7 +144,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
}
catch ( ConnectionException e ) {
throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra",
- e );
+ e );
}
@@ -181,7 +170,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
final StageStatus stageStatus = column.getValue( SER );
final MvccLogEntry logEntry =
- new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state );
+ new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state );
versionResults.addEntry( logEntry );
@@ -208,13 +197,41 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
final ScopedRowKey<K> rowKey = createKey( applicationId, entityId );
+ columns =
+ keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey ).withColumnRange( version, null, false, maxSize )
+ .execute().getResult();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to load log entries", e );
+ }
+
+ return parseResults( columns, entityId );
+ }
+
+
+ @Override
+ public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId,
+ final UUID minVersion, final int maxSize ) {
+ ColumnList<UUID> columns;
+ try {
+
+ final Id applicationId = applicationScope.getApplication();
+
+ final ScopedRowKey<K> rowKey = createKey( applicationId, entityId );
+
+
columns = keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey )
- .withColumnRange( version, null, false, maxSize ).execute().getResult();
+ .withColumnRange( minVersion, null, true, maxSize ).execute().getResult();
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to load log entries", e );
}
+ return parseResults( columns, entityId );
+ }
+
+
+ private List<MvccLogEntry> parseResults( final ColumnList<UUID> columns, final Id entityId ) {
List<MvccLogEntry> results = new ArrayList<MvccLogEntry>( columns.size() );
@@ -245,8 +262,6 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
}
-
-
/**
* Simple callback to perform puts and deletes with a common row setup code
*/
@@ -281,12 +296,14 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
return batch;
}
+
protected abstract MultiTennantColumnFamily<ScopedRowKey<K>, UUID> getColumnFamily();
- protected abstract ScopedRowKey<K> createKey(final Id applicationId, final Id entityId);
+ protected abstract ScopedRowKey<K> createKey( final Id applicationId, final Id entityId );
+
+ protected abstract Id getEntityIdFromKey( final ScopedRowKey<K> key );
- protected abstract Id getEntityIdFromKey(final ScopedRowKey<K> key);
/**
* Internal stage shard
@@ -319,7 +336,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
*/
private static class StatusCache {
private Map<Integer, MvccLogEntry.State> values =
- new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length );
+ new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length );
private StatusCache() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index 3168817..e825bbc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -31,16 +31,12 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyV2Impl;
import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyV2Impl;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
@@ -76,7 +72,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
private final Keyspace keyspace;
private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
- private final EntityVersionTaskFactory entityVersionCleanupFactory;
private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
@@ -85,13 +80,11 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
@Inject
public MvccEntityDataMigrationImpl( final Keyspace keyspace,
final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
- final EntityVersionTaskFactory entityVersionCleanupFactory,
final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) {
this.keyspace = keyspace;
this.allVersions = allVersions;
- this.entityVersionCleanupFactory = entityVersionCleanupFactory;
this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3;
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
@@ -162,7 +155,8 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
atomicLong.addAndGet( entities.size() );
- List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList( entities.size() );
+ final List<Id> toSaveIds = new ArrayList<>( entities.size() );
+
for ( EntityToSaveMessage message : entities ) {
final MutationBatch entityRewrite = migration.to.write( message.scope, message.entity );
@@ -187,6 +181,9 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
final UUID version = message.entity.getVersion();
+
+ toSaveIds.add( entityId );
+
// re-write the unique
// values
// but this
@@ -215,7 +212,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
* Migrate the log entry to the new format
*/
for(final MvccLogEntry entry: logEntries){
-
final MutationBatch mb = mvccLogEntrySerializationStrategy.write( message.scope, entry );
totalBatch.mergeShallow( mb );
@@ -223,24 +219,18 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
- //schedule our cleanup task to clean up all the data
- final EntityVersionCleanupTask task = entityVersionCleanupFactory
- .getCleanupTask( message.scope, message.entity.getId(), version, false );
- entityVersionCleanupTasks.add( task );
}
executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
//now run our cleanup task
- for ( EntityVersionCleanupTask entityVersionCleanupTask : entityVersionCleanupTasks ) {
- try {
- entityVersionCleanupTask.call();
- }
- catch ( Exception e ) {
- LOGGER.error( "Unable to run cleanup task", e );
- }
+ for ( Id updatedId : toSaveIds ) {
+
+
+
+
}
} ).subscribeOn( Schedulers.io() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
deleted file mode 100644
index 8c26c5b..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ /dev/null
@@ -1,715 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.util.LogEntryMock;
-import org.apache.usergrid.persistence.collection.util.UniqueValueEntryMock;
-import org.apache.usergrid.persistence.collection.util.VersionGenerator;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Cleanup task tests
- */
-public class EntityVersionCleanupTaskTest {
-
- private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
-
-
- @AfterClass
- public static void shutdown() {
- taskExecutor.shutdown();
- }
-
-
- @Test( timeout = 10000 )
- public void noListenerOneVersion() throws Exception {
-
-
- final SerializationFig serializationFig = mock( SerializationFig.class );
-
- when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
- final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
-
- final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
- final Keyspace keyspace = mock( Keyspace.class );
-
- final MutationBatch entityBatch = mock( MutationBatch.class );
-
- when( keyspace.prepareMutationBatch() ).thenReturn(
- mock( MutationBatch.class ) ) // don't care what happens to this one
- .thenReturn( entityBatch );
-
- // intentionally no events
- final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
-
- final Id applicationId = new SimpleId( "application" );
-
- final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
- final Id entityId = new SimpleId( "user" );
-
- final List<UUID> versions = VersionGenerator.generateVersions( 2 );
-
- // mock up a single log entry for our first test
- final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
- //get the version we're keeping, it's first in our list
- final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
- //mock up unique version output
- final UniqueValueEntryMock uniqueValueEntryMock =
- UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
- EntityVersionCleanupTask cleanupTask =
- new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
- version, false );
-
- final MutationBatch newBatch = mock( MutationBatch.class );
-
-
- // set up returning a mutator
- when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
- //return a new batch when it's called
- when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
- cleanupTask.call();
-
-
- //get the second field, this should be deleted
- final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
-
- final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
-
-
- //verify delete was invoked
- verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
- //verify the delete was invoked
- verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
- // verify it was run
- verify( entityBatch ).execute();
- }
-
-
- /**
- * Tests the cleanup task on the first version created
- */
- @Test( timeout = 10000 )
- public void noListenerNoVersions() throws Exception {
-
-
- final SerializationFig serializationFig = mock( SerializationFig.class );
-
- when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
- final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
-
- final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
- final Keyspace keyspace = mock( Keyspace.class );
-
- final MutationBatch entityBatch = mock( MutationBatch.class );
-
- when( keyspace.prepareMutationBatch() ).thenReturn(
- mock( MutationBatch.class ) ) // don't care what happens to this one
- .thenReturn( entityBatch );
-
- // intentionally no events
- final Set<EntityVersionDeleted> listeners = new HashSet<>();
-
- final Id applicationId = new SimpleId( "application" );
-
- final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
- final Id entityId = new SimpleId( "user" );
-
-
- final List<UUID> versions = VersionGenerator.generateVersions( 1 );
-
- // mock up a single log entry, with no other entries
- final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
- //get the version we're keeping, it's first in our list
- final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
- //mock up unique version output
- final UniqueValueEntryMock uniqueValueEntryMock =
- UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
- EntityVersionCleanupTask cleanupTask =
- new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, version, false );
-
- final MutationBatch newBatch = mock( MutationBatch.class );
-
-
- // set up returning a mutator
- when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
- //return a new batch when it's called
- when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
- cleanupTask.call();
-
-
- //verify delete was never invoked
- verify( uvss, never() ).delete( any( ApplicationScope.class ), any( UniqueValue.class ) );
-
- //verify the delete was never invoked
- verify( less, never() ).delete( any( ApplicationScope.class ), any( Id.class ), any( UUID.class ) );
- }
-
-
- @Test( timeout = 10000 )
- public void singleListenerSingleVersion() throws Exception {
-
-
- //create a latch for the event listener, and add it to the list of events
- final int sizeToReturn = 1;
-
- final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-
- final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
-
- final Set<EntityVersionDeleted> listeners = new HashSet<>();
-
- listeners.add( eventListener );
-
-
- final SerializationFig serializationFig = mock( SerializationFig.class );
-
- when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
- final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
- final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
- final Keyspace keyspace = mock( Keyspace.class );
-
- final MutationBatch entityBatch = mock( MutationBatch.class );
-
- when( keyspace.prepareMutationBatch() ).thenReturn(
- mock( MutationBatch.class ) ) // don't care what happens to this one
- .thenReturn( entityBatch );
-
-
- final Id applicationId = new SimpleId( "application" );
-
- final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
- final Id entityId = new SimpleId( "user" );
-
-
- final List<UUID> versions = VersionGenerator.generateVersions( 2 );
-
-
- // mock up a single log entry for our first test
- final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
- //get the version we're keeping, it's first in our list
- final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
- //mock up unique version output
- final UniqueValueEntryMock uniqueValueEntryMock =
- UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
- EntityVersionCleanupTask cleanupTask =
- new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
- version, false );
-
- final MutationBatch newBatch = mock( MutationBatch.class );
-
-
- // set up returning a mutator
- when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
- //return a new batch when it's called
- when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
- cleanupTask.call();
-
-
- //get the second field, this should be deleted
- final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
-
- final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
-
-
- //verify delete was invoked
- verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
- //verify the delete was invoked
- verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
- // verify it was run
- verify( entityBatch ).execute();
-
-
- //the latch was executed
- latch.await();
- }
-
-
- @Test//(timeout=10000)
- public void multipleListenerMultipleVersions() throws Exception {
-
- final SerializationFig serializationFig = mock( SerializationFig.class );
-
- when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
-
- //create a latch for the event listener, and add it to the list of events
- final int sizeToReturn = 10;
-
- final CountDownLatch latch = new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * 3 );
-
- final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
- final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
- final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch );
-
- final Set<EntityVersionDeleted> listeners = new HashSet<>();
-
- listeners.add( listener1 );
- listeners.add( listener2 );
- listeners.add( listener3 );
-
-
-
-
- final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
- final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
- final Keyspace keyspace = mock( Keyspace.class );
-
- final MutationBatch entityBatch = mock( MutationBatch.class );
-
- when( keyspace.prepareMutationBatch() ).thenReturn(
- mock( MutationBatch.class ) ) // don't care what happens to this one
- .thenReturn( entityBatch );
-
-
-
-
- final Id applicationId = new SimpleId( "application" );
-
- final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
- final Id entityId = new SimpleId( "user" );
-
- final List<UUID> versions = VersionGenerator.generateVersions( 2 );
-
-
- // mock up a single log entry for our first test
- final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
- //get the version we're keeping, it's first in our list
- final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
- //mock up unique version output
- final UniqueValueEntryMock uniqueValueEntryMock =
- UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
- EntityVersionCleanupTask cleanupTask =
- new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
- version, false );
-
- final MutationBatch newBatch = mock( MutationBatch.class );
-
-
- // set up returning a mutator
- when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
- //return a new batch when it's called
- when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
- cleanupTask.call();
-
-
- //get the second field, this should be deleted
- final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
-
- final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
-
-
- //verify delete was invoked
- verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
- //verify the delete was invoked
- verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
- // verify it was run
- verify( entityBatch ).execute();
-
-
- //the latch was executed
- latch.await();
-
- //we deleted the version
- //verify we deleted everything
- //verify delete was invoked
- verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
- //verify the delete was invoked
- verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
- // verify it was run
- verify( entityBatch ).execute();
-
- //the latch was executed
- latch.await();
- }
-
-
- /**
- * Tests what happens when our listeners are VERY slow
- */
-// @Ignore( "Test is a work in progress" )
- @Test( timeout = 10000 )
- public void multipleListenerMultipleVersionsNoThreadsToRun()
- throws ExecutionException, InterruptedException, ConnectionException {
-
-
- final SerializationFig serializationFig = mock( SerializationFig.class );
-
- when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
-
- //create a latch for the event listener, and add it to the list of events
- final int sizeToReturn = 10;
-
-
- final int listenerCount = 5;
-
- final CountDownLatch latch =
- new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * listenerCount );
- final Semaphore waitSemaphore = new Semaphore( 0 );
-
-
- final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
- final SlowListener listener2 = new SlowListener( latch, waitSemaphore );
- final SlowListener listener3 = new SlowListener( latch, waitSemaphore );
- final SlowListener listener4 = new SlowListener( latch, waitSemaphore );
- final SlowListener listener5 = new SlowListener( latch, waitSemaphore );
-
- final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
-
- listeners.add( listener1 );
- listeners.add( listener2 );
- listeners.add( listener3 );
- listeners.add( listener4 );
- listeners.add( listener5 );
-
-
- final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
- final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
- final Keyspace keyspace = mock( Keyspace.class );
-
- final MutationBatch entityBatch = mock( MutationBatch.class );
-
- when( keyspace.prepareMutationBatch() ).thenReturn(
- mock( MutationBatch.class ) ) // don't care what happens to this one
- .thenReturn( entityBatch );
-
-
- final Id applicationId = new SimpleId( "application" );
-
- final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
- final Id entityId = new SimpleId( "user" );
-
-
- final List<UUID> versions = VersionGenerator.generateVersions( 2 );
-
- // mock up a single log entry for our first test
- final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
- //get the version we're keeping, it's first in our list
- final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
-
- //mock up unique version output
- final UniqueValueEntryMock uniqueValueEntryMock =
- UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
- EntityVersionCleanupTask cleanupTask =
- new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
- version, false);
-
- final MutationBatch newBatch = mock( MutationBatch.class );
-
-
- // set up returning a mutator
- when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
- //return a new batch when it's called
- when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
- //start the task
- ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
-
- /**
- * While we're not done, release latches every 200 ms
- */
- while ( !future.isDone() ) {
- Thread.sleep( 200 );
- waitSemaphore.release( listenerCount );
- }
-
- //wait for the task
- future.get();
-
-
- //get the second field, this should be deleted
- final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
-
- final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
-
-
- //verify delete was invoked
- verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
- //verify the delete was invoked
- verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
- // verify it was run
- verify( entityBatch ).execute();
-
-
- //the latch was executed
- latch.await();
-
- //we deleted the version
- //verify we deleted everything
- //verify delete was invoked
- verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
- //verify the delete was invoked
- verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
- // verify it was run
- verify( entityBatch ).execute();
-
- //the latch was executed
- latch.await();
-
-
- //the latch was executed
- latch.await();
- }
-
-
- /**
- * Tests that our task will run in the caller if there's no threads, ensures that the task runs
- */
- @Test( timeout = 10000 )
- public void singleListenerSingleVersionRejected()
- throws ExecutionException, InterruptedException, ConnectionException {
-
-
-
- //create a latch for the event listener, and add it to the list of events
- final int sizeToReturn = 1;
-
- final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-
- final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
-
- final Set<EntityVersionDeleted> listeners = new HashSet<>();
-
- listeners.add( eventListener );
-
-
- final SerializationFig serializationFig = mock( SerializationFig.class );
-
- when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
- final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
- final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
- final Keyspace keyspace = mock( Keyspace.class );
-
- final MutationBatch entityBatch = mock( MutationBatch.class );
-
- when( keyspace.prepareMutationBatch() ).thenReturn(
- mock( MutationBatch.class ) ) // don't care what happens to this one
- .thenReturn( entityBatch );
-
-
- final Id applicationId = new SimpleId( "application" );
-
- final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
- final Id entityId = new SimpleId( "user" );
-
-
- final List<UUID> versions = VersionGenerator.generateVersions( 2 );
-
-
- // mock up a single log entry for our first test
- final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
- //get the version we're keeping, it's first in our list
- final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
- //mock up unique version output
- final UniqueValueEntryMock uniqueValueEntryMock =
- UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
- EntityVersionCleanupTask cleanupTask =
- new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
- version, false );
-
- final MutationBatch newBatch = mock( MutationBatch.class );
-
-
- // set up returning a mutator
- when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
- //return a new batch when it's called
- when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
- cleanupTask.rejected();
-
-
- //get the second field, this should be deleted
- final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
-
- final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
-
-
- //verify delete was invoked
- verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
- //verify the delete was invoked
- verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
- // verify it was run
- verify( entityBatch ).execute();
-
-
- //the latch was executed
- latch.await();
- }
-
-
- private static class EntityVersionDeletedTest implements EntityVersionDeleted {
- final CountDownLatch invocationLatch;
-
-
- private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) {
- this.invocationLatch = invocationLatch;
- }
-
-
- @Override
- public void versionDeleted( final ApplicationScope scope, final Id entityId,
- final List<MvccLogEntry> entityVersion ) {
- invocationLatch.countDown();
- }
- }
-
-
- private static class SlowListener extends EntityVersionDeletedTest {
- final Semaphore blockLatch;
-
-
- private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) {
- super( invocationLatch );
- this.blockLatch = blockLatch;
- }
-
-
- @Override
- public void versionDeleted( final ApplicationScope scope, final Id entityId,
- final List<MvccLogEntry> entityVersion ) {
-
- //wait for unblock to happen before counting down invocation latches
- try {
- blockLatch.acquire();
- }
- catch ( InterruptedException e ) {
- throw new RuntimeException( e );
- }
- super.versionDeleted( scope, entityId, entityVersion );
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
deleted file mode 100644
index e993fad..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Created task tests.
- */
-public class EntityVersionCreatedTaskTest {
-
- private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
-
- @AfterClass
- public static void shutdown() {
- taskExecutor.shutdown();
- }
-
-
- @Test(timeout=10000)
- public void noListener()
- throws ExecutionException, InterruptedException, ConnectionException {
-
- // create a latch for the event listener, and add it to the list of events
-
- final Set<EntityVersionCreated> listeners = mock( Set.class );
-
- when ( listeners.size()).thenReturn( 0 );
-
- final Id applicationId = new SimpleId( "application" );
-
- final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
-
- final Id entityId = new SimpleId( "user" );
- final Entity entity = new Entity( entityId );
-
- // start the task
-
- EntityVersionCreatedTask entityVersionCreatedTask =
- new EntityVersionCreatedTask( appScope, listeners, entity);
-
- try {
- entityVersionCreatedTask.call();
- }catch(Exception e){
- Assert.fail(e.getMessage());
- }
-
-
- // wait for the task
- // future.get();
-
- //mocked listener makes sure that the task is called
- verify( listeners ).size();
-
- }
- @Test(timeout=10000)
- public void oneListener()
- throws ExecutionException, InterruptedException, ConnectionException {
-
- // create a latch for the event listener, and add it to the list of events
-
- final int sizeToReturn = 1;
-
- final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-
- final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch);
-
- final Set<EntityVersionCreated> listeners = mock( Set.class );
- final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
-
- when ( listeners.size()).thenReturn( 1 );
- when ( listeners.iterator()).thenReturn( helper );
- when ( helper.next() ).thenReturn( eventListener );
-
- final Id applicationId = new SimpleId( "application" );
-
- final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
-
- final Id entityId = new SimpleId( "user" );
- final Entity entity = new Entity( entityId );
-
- // start the task
-
- EntityVersionCreatedTask entityVersionCreatedTask =
- new EntityVersionCreatedTask( appScope, listeners, entity);
-
- try {
- entityVersionCreatedTask.call();
- }catch(Exception e){
-
- Assert.fail(e.getMessage());
- }
- //mocked listener makes sure that the task is called
- verify( listeners ).size();
- verify( listeners ).iterator();
- verify( helper ).next();
-
- }
-
- @Test(timeout=10000)
- public void multipleListener()
- throws ExecutionException, InterruptedException, ConnectionException {
-
- final int sizeToReturn = 3;
-
- final Set<EntityVersionCreated> listeners = mock( Set.class );
- final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
-
- when ( listeners.size()).thenReturn( 3 );
- when ( listeners.iterator()).thenReturn( helper );
-
- final Id applicationId = new SimpleId( "application" );
-
- final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
-
- final Id entityId = new SimpleId( "user" );
- final Entity entity = new Entity( entityId );
-
- // start the task
-
- EntityVersionCreatedTask entityVersionCreatedTask =
- new EntityVersionCreatedTask( appScope, listeners, entity);
-
- final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-
- final EntityVersionCreatedTest listener1 = new EntityVersionCreatedTest(latch);
- final EntityVersionCreatedTest listener2 = new EntityVersionCreatedTest(latch);
- final EntityVersionCreatedTest listener3 = new EntityVersionCreatedTest(latch);
-
- when ( helper.next() ).thenReturn( listener1,listener2,listener3);
-
- try {
- entityVersionCreatedTask.call();
- }catch(Exception e){
- ;
- }
- //ListenableFuture<Void> future = taskExecutor.submit( entityVersionCreatedTask );
-
- //wait for the task
- //intentionally fails due to difficulty mocking observable
-
- //mocked listener makes sure that the task is called
- verify( listeners ).size();
- //verifies that the observable made listener iterate.
- verify( listeners ).iterator();
- }
-
- @Test(timeout=10000)
- public void oneListenerRejected()
- throws ExecutionException, InterruptedException, ConnectionException {
-
- // create a latch for the event listener, and add it to the list of events
-
- final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 );
-
- final int sizeToReturn = 1;
-
- final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-
- final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch);
-
- final Set<EntityVersionCreated> listeners = mock( Set.class );
- final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
-
- when ( listeners.size()).thenReturn( 1 );
- when ( listeners.iterator()).thenReturn( helper );
- when ( helper.next() ).thenReturn( eventListener );
-
- final Id applicationId = new SimpleId( "application" );
-
- final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
-
- final Id entityId = new SimpleId( "user" );
- final Entity entity = new Entity( entityId );
-
- // start the task
-
- EntityVersionCreatedTask entityVersionCreatedTask =
- new EntityVersionCreatedTask( appScope, listeners, entity);
-
- entityVersionCreatedTask.rejected();
-
- //mocked listener makes sure that the task is called
- verify( listeners ).size();
- verify( listeners ).iterator();
- verify( helper ).next();
-
- }
-
- private static class EntityVersionCreatedTest implements EntityVersionCreated {
- final CountDownLatch invocationLatch;
-
- private EntityVersionCreatedTest( final CountDownLatch invocationLatch) {
- this.invocationLatch = invocationLatch;
- }
-
- @Override
- public void versionCreated( final ApplicationScope scope, final Entity entity ) {
- invocationLatch.countDown();
- }
- }
-}