You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/07 18:40:57 UTC
[2/5] incubator-usergrid git commit: First pass at refactoring mark +
sweep
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java
new file mode 100644
index 0000000..c1f76f2
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.collection.util.UniqueValueEntryMock;
+import org.apache.usergrid.persistence.collection.util.VersionGenerator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class UniqueCleanupTest {
+
+//
+// private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
+//
+//
+// @AfterClass
+// public static void shutdown() {
+// taskExecutor.shutdown();
+// }
+//
+//
+// @Test( timeout = 10000 )
+// public void noListenerOneVersion() throws Exception {
+//
+//
+// final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+// when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+//
+// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+// final Keyspace keyspace = mock( Keyspace.class );
+//
+// final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+// when( keyspace.prepareMutationBatch() ).thenReturn(
+// mock( MutationBatch.class ) ) // don't care what happens to this one
+// .thenReturn( entityBatch );
+//
+// // intentionally no events
+// final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
+//
+// final Id applicationId = new SimpleId( "application" );
+//
+// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+// final Id entityId = new SimpleId( "user" );
+//
+// final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+//
+// // mock up a single log entry for our first test
+// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+// //get the version we're keeping, it's first in our list
+// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+// //mock up unique version output
+// final UniqueValueEntryMock uniqueValueEntryMock =
+// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+// EntityVersionCleanupTask cleanupTask =
+// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+// version, false );
+//
+// final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+// // set up returning a mutator
+// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+// //return a new batch when it's called
+// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+// cleanupTask.call();
+//
+//
+// //get the second field, this should be deleted
+// final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+//
+// final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+//
+//
+// //verify delete was invoked
+// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+// //verify the delete was invoked
+// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+// // verify it was run
+// verify( entityBatch ).execute();
+// }
+//
+//
+// /**
+// * Tests the cleanup task on the first version created
+// */
+// @Test( timeout = 10000 )
+// public void noListenerNoVersions() throws Exception {
+//
+//
+// final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+// when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+//
+// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+// final Keyspace keyspace = mock( Keyspace.class );
+//
+// final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+// when( keyspace.prepareMutationBatch() ).thenReturn(
+// mock( MutationBatch.class ) ) // don't care what happens to this one
+// .thenReturn( entityBatch );
+//
+// // intentionally no events
+// final Set<EntityVersionDeleted> listeners = new HashSet<>();
+//
+// final Id applicationId = new SimpleId( "application" );
+//
+// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+// final Id entityId = new SimpleId( "user" );
+//
+//
+// final List<UUID> versions = VersionGenerator.generateVersions( 1 );
+//
+// // mock up a single log entry, with no other entries
+// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+// //get the version we're keeping, it's first in our list
+// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+// //mock up unique version output
+// final UniqueValueEntryMock uniqueValueEntryMock =
+// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+// EntityVersionCleanupTask cleanupTask =
+// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, version, false );
+//
+// final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+// // set up returning a mutator
+// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+// //return a new batch when it's called
+// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+// cleanupTask.call();
+//
+//
+// //verify delete was never invoked
+// verify( uvss, never() ).delete( any( ApplicationScope.class ), any( UniqueValue.class ) );
+//
+// //verify the delete was never invoked
+// verify( less, never() ).delete( any( ApplicationScope.class ), any( Id.class ), any( UUID.class ) );
+// }
+//
+//
+// @Test( timeout = 10000 )
+// public void singleListenerSingleVersion() throws Exception {
+//
+//
+// //create a latch for the event listener, and add it to the list of events
+// final int sizeToReturn = 1;
+//
+// final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+//
+// final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
+//
+// final Set<EntityVersionDeleted> listeners = new HashSet<>();
+//
+// listeners.add( eventListener );
+//
+//
+// final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+// when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+// final Keyspace keyspace = mock( Keyspace.class );
+//
+// final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+// when( keyspace.prepareMutationBatch() ).thenReturn(
+// mock( MutationBatch.class ) ) // don't care what happens to this one
+// .thenReturn( entityBatch );
+//
+//
+// final Id applicationId = new SimpleId( "application" );
+//
+// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+// final Id entityId = new SimpleId( "user" );
+//
+//
+// final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+//
+//
+// // mock up a single log entry for our first test
+// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+// //get the version we're keeping, it's first in our list
+// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+// //mock up unique version output
+// final UniqueValueEntryMock uniqueValueEntryMock =
+// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+// EntityVersionCleanupTask cleanupTask =
+// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+// version, false );
+//
+// final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+// // set up returning a mutator
+// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+// //return a new batch when it's called
+// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+// cleanupTask.call();
+//
+//
+// //get the second field, this should be deleted
+// final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+//
+// final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+//
+//
+// //verify delete was invoked
+// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+// //verify the delete was invoked
+// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+// // verify it was run
+// verify( entityBatch ).execute();
+//
+//
+// //the latch was executed
+// latch.await();
+// }
+//
+//
+// @Test//(timeout=10000)
+// public void multipleListenerMultipleVersions() throws Exception {
+//
+// final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+// when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+//
+// //create a latch for the event listener, and add it to the list of events
+// final int sizeToReturn = 10;
+//
+// final CountDownLatch latch = new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * 3 );
+//
+// final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
+// final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
+// final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch );
+//
+// final Set<EntityVersionDeleted> listeners = new HashSet<>();
+//
+// listeners.add( listener1 );
+// listeners.add( listener2 );
+// listeners.add( listener3 );
+//
+//
+//
+//
+// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+// final Keyspace keyspace = mock( Keyspace.class );
+//
+// final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+// when( keyspace.prepareMutationBatch() ).thenReturn(
+// mock( MutationBatch.class ) ) // don't care what happens to this one
+// .thenReturn( entityBatch );
+//
+//
+//
+//
+// final Id applicationId = new SimpleId( "application" );
+//
+// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+// final Id entityId = new SimpleId( "user" );
+//
+// final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+//
+//
+// // mock up a single log entry for our first test
+// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+// //get the version we're keeping, it's first in our list
+// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+// //mock up unique version output
+// final UniqueValueEntryMock uniqueValueEntryMock =
+// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+// EntityVersionCleanupTask cleanupTask =
+// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+// version, false );
+//
+// final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+// // set up returning a mutator
+// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+// //return a new batch when it's called
+// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+// cleanupTask.call();
+//
+//
+// //get the second field, this should be deleted
+// final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+//
+// final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+//
+//
+// //verify delete was invoked
+// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+// //verify the delete was invoked
+// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+// // verify it was run
+// verify( entityBatch ).execute();
+//
+//
+// //the latch was executed
+// latch.await();
+//
+// //we deleted the version
+// //verify we deleted everything
+// //verify delete was invoked
+// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+// //verify the delete was invoked
+// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+// // verify it was run
+// verify( entityBatch ).execute();
+//
+// //the latch was executed
+// latch.await();
+// }
+//
+//
+// /**
+// * Tests what happens when our listeners are VERY slow
+// */
+//// @Ignore( "Test is a work in progress" )
+// @Test( timeout = 10000 )
+// public void multipleListenerMultipleVersionsNoThreadsToRun()
+// throws ExecutionException, InterruptedException, ConnectionException {
+//
+//
+// final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+// when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+//
+// //create a latch for the event listener, and add it to the list of events
+// final int sizeToReturn = 10;
+//
+//
+// final int listenerCount = 5;
+//
+// final CountDownLatch latch =
+// new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * listenerCount );
+// final Semaphore waitSemaphore = new Semaphore( 0 );
+//
+//
+// final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+// final SlowListener listener2 = new SlowListener( latch, waitSemaphore );
+// final SlowListener listener3 = new SlowListener( latch, waitSemaphore );
+// final SlowListener listener4 = new SlowListener( latch, waitSemaphore );
+// final SlowListener listener5 = new SlowListener( latch, waitSemaphore );
+//
+// final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
+//
+// listeners.add( listener1 );
+// listeners.add( listener2 );
+// listeners.add( listener3 );
+// listeners.add( listener4 );
+// listeners.add( listener5 );
+//
+//
+// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+// final Keyspace keyspace = mock( Keyspace.class );
+//
+// final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+// when( keyspace.prepareMutationBatch() ).thenReturn(
+// mock( MutationBatch.class ) ) // don't care what happens to this one
+// .thenReturn( entityBatch );
+//
+//
+// final Id applicationId = new SimpleId( "application" );
+//
+// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+// final Id entityId = new SimpleId( "user" );
+//
+//
+// final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+//
+// // mock up a single log entry for our first test
+// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+// //get the version we're keeping, it's first in our list
+// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+//
+// //mock up unique version output
+// final UniqueValueEntryMock uniqueValueEntryMock =
+// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+// EntityVersionCleanupTask cleanupTask =
+// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+// version, false);
+//
+// final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+// // set up returning a mutator
+// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+// //return a new batch when it's called
+// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+// //start the task
+// ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
+//
+// /**
+// * While we're not done, release latches every 200 ms
+// */
+// while ( !future.isDone() ) {
+// Thread.sleep( 200 );
+// waitSemaphore.release( listenerCount );
+// }
+//
+// //wait for the task
+// future.get();
+//
+//
+// //get the second field, this should be deleted
+// final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+//
+// final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+//
+//
+// //verify delete was invoked
+// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+// //verify the delete was invoked
+// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+// // verify it was run
+// verify( entityBatch ).execute();
+//
+//
+// //the latch was executed
+// latch.await();
+//
+// //we deleted the version
+// //verify we deleted everything
+// //verify delete was invoked
+// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+// //verify the delete was invoked
+// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+// // verify it was run
+// verify( entityBatch ).execute();
+//
+// //the latch was executed
+// latch.await();
+//
+//
+// //the latch was executed
+// latch.await();
+// }
+//
+//
+// /**
+// * Tests that our task will run in the caller if there's no threads, ensures that the task runs
+// */
+// @Test( timeout = 10000 )
+// public void singleListenerSingleVersionRejected()
+// throws ExecutionException, InterruptedException, ConnectionException {
+//
+//
+//
+// //create a latch for the event listener, and add it to the list of events
+// final int sizeToReturn = 1;
+//
+// final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+//
+// final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
+//
+// final Set<EntityVersionDeleted> listeners = new HashSet<>();
+//
+// listeners.add( eventListener );
+//
+//
+// final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+// when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+// final Keyspace keyspace = mock( Keyspace.class );
+//
+// final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+// when( keyspace.prepareMutationBatch() ).thenReturn(
+// mock( MutationBatch.class ) ) // don't care what happens to this one
+// .thenReturn( entityBatch );
+//
+//
+// final Id applicationId = new SimpleId( "application" );
+//
+// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+// final Id entityId = new SimpleId( "user" );
+//
+//
+// final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+//
+//
+// // mock up a single log entry for our first test
+// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+// //get the version we're keeping, it's first in our list
+// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+// //mock up unique version output
+// final UniqueValueEntryMock uniqueValueEntryMock =
+// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+// EntityVersionCleanupTask cleanupTask =
+// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+// version, false );
+//
+// final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+// // set up returning a mutator
+// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+// //return a new batch when it's called
+// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+// cleanupTask.rejected();
+//
+//
+// //get the second field, this should be deleted
+// final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+//
+// final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+//
+//
+// //verify delete was invoked
+// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+// //verify the delete was invoked
+// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+// // verify it was run
+// verify( entityBatch ).execute();
+//
+//
+// //the latch was executed
+// latch.await();
+// }
+//
+//
+// private static class EntityVersionDeletedTest implements EntityVersionDeleted {
+// final CountDownLatch invocationLatch;
+//
+//
+// private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) {
+// this.invocationLatch = invocationLatch;
+// }
+//
+//
+// @Override
+// public void versionDeleted( final ApplicationScope scope, final Id entityId,
+// final List<MvccLogEntry> entityVersion ) {
+// invocationLatch.countDown();
+// }
+// }
+//
+//
+// private static class SlowListener extends EntityVersionDeletedTest {
+// final Semaphore blockLatch;
+//
+//
+// private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) {
+// super( invocationLatch );
+// this.blockLatch = blockLatch;
+// }
+//
+//
+// @Override
+// public void versionDeleted( final ApplicationScope scope, final Id entityId,
+// final List<MvccLogEntry> entityVersion ) {
+//
+// //wait for unblock to happen before counting down invocation latches
+// try {
+// blockLatch.acquire();
+// }
+// catch ( InterruptedException e ) {
+// throw new RuntimeException( e );
+// }
+// super.versionDeleted( scope, entityId, entityVersion );
+// }
+// }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java
new file mode 100644
index 0000000..8ffba71
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class VersionCompactTest {
+
+// private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
+//
+// @AfterClass
+// public static void shutdown() {
+// taskExecutor.shutdown();
+// }
+//
+//
+// @Test(timeout=10000)
+// public void noListener()
+// throws ExecutionException, InterruptedException, ConnectionException {
+//
+// // create a latch for the event listener, and add it to the list of events
+//
+// final Set<EntityVersionCreated> listeners = mock( Set.class );
+//
+// when ( listeners.size()).thenReturn( 0 );
+//
+// final Id applicationId = new SimpleId( "application" );
+//
+// final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
+//
+// final Id entityId = new SimpleId( "user" );
+// final Entity entity = new Entity( entityId );
+//
+// // start the task
+//
+// EntityVersionCreatedTask entityVersionCreatedTask =
+// new EntityVersionCreatedTask( appScope, listeners, entity);
+//
+// try {
+// entityVersionCreatedTask.call();
+// }catch(Exception e){
+// Assert.fail( e.getMessage() );
+// }
+//
+//
+// // wait for the task
+// // future.get();
+//
+// //mocked listener makes sure that the task is called
+// verify( listeners ).size();
+//
+// }
+// @Test(timeout=10000)
+// public void oneListener()
+// throws ExecutionException, InterruptedException, ConnectionException {
+//
+// // create a latch for the event listener, and add it to the list of events
+//
+// final int sizeToReturn = 1;
+//
+// final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+//
+// final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch);
+//
+// final Set<EntityVersionCreated> listeners = mock( Set.class );
+// final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
+//
+// when ( listeners.size()).thenReturn( 1 );
+// when ( listeners.iterator()).thenReturn( helper );
+// when ( helper.next() ).thenReturn( eventListener );
+//
+// final Id applicationId = new SimpleId( "application" );
+//
+// final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
+//
+// final Id entityId = new SimpleId( "user" );
+// final Entity entity = new Entity( entityId );
+//
+// // start the task
+//
+// EntityVersionCreatedTask entityVersionCreatedTask =
+// new EntityVersionCreatedTask( appScope, listeners, entity);
+//
+// try {
+// entityVersionCreatedTask.call();
+// }catch(Exception e){
+//
+// Assert.fail(e.getMessage());
+// }
+// //mocked listener makes sure that the task is called
+// verify( listeners ).size();
+// verify( listeners ).iterator();
+// verify( helper ).next();
+//
+// }
+//
+// @Test(timeout=10000)
+// public void multipleListener()
+// throws ExecutionException, InterruptedException, ConnectionException {
+//
+// final int sizeToReturn = 3;
+//
+// final Set<EntityVersionCreated> listeners = mock( Set.class );
+// final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
+//
+// when ( listeners.size()).thenReturn( 3 );
+// when ( listeners.iterator()).thenReturn( helper );
+//
+// final Id applicationId = new SimpleId( "application" );
+//
+// final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
+//
+// final Id entityId = new SimpleId( "user" );
+// final Entity entity = new Entity( entityId );
+//
+// // start the task
+//
+// EntityVersionCreatedTask entityVersionCreatedTask =
+// new EntityVersionCreatedTask( appScope, listeners, entity);
+//
+// final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+//
+// final EntityVersionCreatedTest listener1 = new EntityVersionCreatedTest(latch);
+// final EntityVersionCreatedTest listener2 = new EntityVersionCreatedTest(latch);
+// final EntityVersionCreatedTest listener3 = new EntityVersionCreatedTest(latch);
+//
+// when ( helper.next() ).thenReturn( listener1,listener2,listener3);
+//
+// try {
+// entityVersionCreatedTask.call();
+// }catch(Exception e){
+// ;
+// }
+// //ListenableFuture<Void> future = taskExecutor.submit( entityVersionCreatedTask );
+//
+// //wait for the task
+// //intentionally fails due to difficulty mocking observable
+//
+// //mocked listener makes sure that the task is called
+// verify( listeners ).size();
+// //verifies that the observable made listener iterate.
+// verify( listeners ).iterator();
+// }
+//
+// @Test(timeout=10000)
+// public void oneListenerRejected()
+// throws ExecutionException, InterruptedException, ConnectionException {
+//
+// // create a latch for the event listener, and add it to the list of events
+//
+// final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 );
+//
+// final int sizeToReturn = 1;
+//
+// final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+//
+// final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch);
+//
+// final Set<EntityVersionCreated> listeners = mock( Set.class );
+// final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
+//
+// when ( listeners.size()).thenReturn( 1 );
+// when ( listeners.iterator()).thenReturn( helper );
+// when ( helper.next() ).thenReturn( eventListener );
+//
+// final Id applicationId = new SimpleId( "application" );
+//
+// final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
+//
+// final Id entityId = new SimpleId( "user" );
+// final Entity entity = new Entity( entityId );
+//
+// // start the task
+//
+// EntityVersionCreatedTask entityVersionCreatedTask =
+// new EntityVersionCreatedTask( appScope, listeners, entity);
+//
+// entityVersionCreatedTask.rejected();
+//
+// //mocked listener makes sure that the task is called
+// verify( listeners ).size();
+// verify( listeners ).iterator();
+// verify( helper ).next();
+//
+// }
+//
+// private static class EntityVersionCreatedTest implements EntityVersionCreated {
+// final CountDownLatch invocationLatch;
+//
+// private EntityVersionCreatedTest( final CountDownLatch invocationLatch) {
+// this.invocationLatch = invocationLatch;
+// }
+//
+// @Override
+// public void versionCreated( final ApplicationScope scope, final Entity entity ) {
+// invocationLatch.countDown();
+// }
+// }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
deleted file mode 100644
index 720e602..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.UUID;
-
-import org.junit.Test;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.util.LogEntryMock;
-import org.apache.usergrid.persistence.collection.util.VersionGenerator;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Tests iterator paging
- */
-public class LogEntryIteratorTest {
-
-
- @Test
- public void empty() throws ConnectionException {
-
- final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
- mock( MvccLogEntrySerializationStrategy.class );
-
- final ApplicationScope scope =
- new ApplicationScopeImpl( new SimpleId( "application" ));
-
- final Id entityId = new SimpleId( "entity" );
-
- final int pageSize = 100;
-
-
- //set the start version, it should be discarded
- UUID start = UUIDGenerator.newTimeUUID();
-
- when( logEntrySerializationStrategy.load( same( scope ), same( entityId ), same( start ), same( pageSize ) ) )
- .thenReturn( new ArrayList<MvccLogEntry>() );
-
-
- //now iterate we should get everything
- LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
-
-
- assertFalse( itr.hasNext() );
- }
-
-
- @Test
- public void partialLastPage() throws ConnectionException {
-
-
- final int pageSize = 10;
- final int totalPages = 3;
- final int lastPageSize = pageSize / 2;
-
- //have one half page
-
- pageElements( pageSize, totalPages, lastPageSize );
- }
-
-
- @Test
- public void emptyLastPage() throws ConnectionException {
-
-
- final int pageSize = 10;
- final int totalPages = 3;
- final int lastPageSize = 0;
-
- //have one half page
-
- pageElements( pageSize, totalPages, lastPageSize );
- }
-
-
- public void pageElements( final int pageSize, final int totalPages, final int lastPageSize )
- throws ConnectionException {
-
- final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
- mock( MvccLogEntrySerializationStrategy.class );
-
- final ApplicationScope scope =
- new ApplicationScopeImpl( new SimpleId( "application" ) );
-
- final Id entityId = new SimpleId( "entity" );
-
-
- //have one half page
- final int toGenerate = pageSize * totalPages + lastPageSize;
-
-
- final LogEntryMock mockResults =
- LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId, VersionGenerator.generateVersions( toGenerate ) );
-
- Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator();
-
- //this element should be skipped
- UUID start = expectedEntries.next().getVersion();
-
- //now iterate we should get everything
- LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
-
-
- while ( expectedEntries.hasNext() && itr.hasNext() ) {
- final MvccLogEntry expected = expectedEntries.next();
-
- final MvccLogEntry returned = itr.next();
-
- assertEquals( expected, returned );
- }
-
-
- assertFalse( itr.hasNext() );
- assertFalse( expectedEntries.hasNext() );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
new file mode 100644
index 0000000..c82e1bf
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
@@ -0,0 +1,134 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.collection.util.VersionGenerator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests iterator paging
+ */
+public class MinMaxLogEntryIteratorTest {
+
+
+ @Test
+ public void empty() throws ConnectionException {
+
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+ final ApplicationScope scope =
+ new ApplicationScopeImpl( new SimpleId( "application" ));
+
+ final Id entityId = new SimpleId( "entity" );
+
+ final int pageSize = 100;
+
+
+ //set the start version, it should be discarded
+ UUID start = UUIDGenerator.newTimeUUID();
+
+ when( logEntrySerializationStrategy.load( same( scope ), same( entityId ), same( start ), same( pageSize ) ) )
+ .thenReturn( new ArrayList<MvccLogEntry>() );
+
+
+ //now iterate we should get everything
+ MinMaxLogEntryIterator
+ itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+
+
+ assertFalse( itr.hasNext() );
+ }
+
+
+ @Test
+ public void partialLastPage() throws ConnectionException {
+
+
+ final int pageSize = 10;
+ final int totalPages = 3;
+ final int lastPageSize = pageSize / 2;
+
+ //have one half page
+
+ pageElements( pageSize, totalPages, lastPageSize );
+ }
+
+
+ @Test
+ public void emptyLastPage() throws ConnectionException {
+
+
+ final int pageSize = 10;
+ final int totalPages = 3;
+ final int lastPageSize = 0;
+
+ //have one half page
+
+ pageElements( pageSize, totalPages, lastPageSize );
+ }
+
+
+ public void pageElements( final int pageSize, final int totalPages, final int lastPageSize )
+ throws ConnectionException {
+
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+ final ApplicationScope scope =
+ new ApplicationScopeImpl( new SimpleId( "application" ) );
+
+ final Id entityId = new SimpleId( "entity" );
+
+
+ //have one half page
+ final int toGenerate = pageSize * totalPages + lastPageSize;
+
+
+ final LogEntryMock mockResults =
+ LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId, VersionGenerator.generateVersions( toGenerate ) );
+
+ Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator();
+
+ //this element should be skipped
+ UUID start = expectedEntries.next().getVersion();
+
+ //now iterate we should get everything
+ MinMaxLogEntryIterator
+ itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+
+
+ while ( expectedEntries.hasNext() && itr.hasNext() ) {
+ final MvccLogEntry expected = expectedEntries.next();
+
+ final MvccLogEntry returned = itr.next();
+
+ assertEquals( expected, returned );
+ }
+
+
+ assertFalse( itr.hasNext() );
+ assertFalse( expectedEntries.hasNext() );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
index 107b2a0..673903c 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.collection.serialization.impl;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -43,6 +44,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import com.fasterxml.uuid.UUIDComparator;
import com.google.inject.Inject;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -65,15 +67,15 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
private MvccLogEntrySerializationStrategy logEntryStrategy;
+
@Before
- public void wireLogEntryStrategy(){
+ public void wireLogEntryStrategy() {
logEntryStrategy = getLogEntryStrategy();
}
/**
* Get the log entry strategy from
- * @return
*/
protected abstract MvccLogEntrySerializationStrategy getLogEntryStrategy();
@@ -181,6 +183,85 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
}
+ @Test
+ public void getReversedEntries() throws ConnectionException {
+
+ final Id applicationId = new SimpleId( "application" );
+
+ ApplicationScope context = new ApplicationScopeImpl( applicationId );
+
+
+ final Id id = new SimpleId( "test" );
+
+ int count = 10;
+
+ final UUID[] versions = new UUID[count];
+ final Stage COMPLETE = Stage.COMPLETE;
+ final MvccLogEntry[] entries = new MvccLogEntry[count];
+
+
+ for ( int i = 0; i < count; i++ ) {
+ versions[i] = UUIDGenerator.newTimeUUID();
+
+ entries[i] = new MvccLogEntryImpl( id, versions[i], COMPLETE, MvccLogEntry.State.COMPLETE );
+ logEntryStrategy.write( context, entries[i] ).execute();
+
+ //Read it back
+
+ MvccLogEntry returned =
+ logEntryStrategy.load( context, Collections.singleton( id ), versions[i] ).getMaxVersion( id );
+
+ assertNotNull( "Returned value should not be null", returned );
+
+ assertEquals( "Returned should equal the saved", entries[i], returned );
+ }
+
+
+ final UUID[] assertVersions = Arrays.copyOf( versions, versions.length );
+
+ Arrays.sort( assertVersions, ( v1, v2 ) -> UUIDComparator.staticCompare( v1, v2 ) * -1 );
+
+ //now do a range scan from the end
+
+ final int half = count/2;
+
+ final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half);
+
+ assertEquals( half, results.size() );
+
+ for ( int i = 0; i < count / 2; i++ ) {
+ final MvccLogEntry saved = entries[i];
+ final MvccLogEntry returned = results.get( i );
+
+ assertEquals( "Entry was not equal to the saved value", saved, returned );
+ }
+
+
+ //now get the next batch
+ final List<MvccLogEntry> results2 =
+ logEntryStrategy.loadReversed( context, id, versions[half], count );
+
+ assertEquals( half, results2.size());
+
+ for ( int i = 0; i < half; i++ ) {
+ final MvccLogEntry saved = entries[half + i];
+ final MvccLogEntry returned = results2.get( i );
+
+ assertEquals( "Entry was not equal to the saved value", saved, returned );
+ }
+
+
+ //now delete them all and ensure we get no results back
+ for ( int i = 0; i < count; i++ ) {
+ logEntryStrategy.delete( context, id, versions[i] ).execute();
+ }
+
+ final List<MvccLogEntry> results3 = logEntryStrategy.loadReversed( context, id, null, versions.length );
+
+ assertEquals( "All log entries were deleted", 0, results3.size() );
+ }
+
+
@Test( expected = NullPointerException.class )
public void writeParamsNoContext() throws ConnectionException {
logEntryStrategy.write( null, mock( MvccLogEntry.class ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
new file mode 100644
index 0000000..c653458
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.executor;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A task executor that allows you to submit tasks
+ */
+public class TaskExecutorFactory {
+
+
+ /**
+ * Create a task executor
+ * @param schedulerName
+ * @param maxThreadCount
+ * @param maxQueueSize
+ * @return
+ */
+ public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount,
+ final int maxQueueSize ) {
+
+
+ final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );
+
+
+ final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
+
+
+ return threadPool;
+ }
+
+
+ /**
+ * Create a thread pool that will reject work if our audit tasks become overwhelmed
+ */
+ private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+ public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
+ super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ) );
+ }
+ }
+
+
+ /**
+ * Thread factory that will name and count threads for easier debugging
+ */
+ private static final class CountingThreadFactory implements ThreadFactory {
+
+ private final AtomicLong threadCounter = new AtomicLong();
+ private final String poolName;
+
+
+ private CountingThreadFactory( final String poolName ) {this.poolName = poolName;}
+
+
+ @Override
+ public Thread newThread( final Runnable r ) {
+ final long newValue = threadCounter.incrementAndGet();
+
+ final String threadName = poolName + "-" + newValue;
+
+ Thread t = new Thread( r, threadName );
+
+ //set it to be a daemon thread so it doesn't block shutdown
+ t.setDaemon( true );
+
+ return t;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
index d6cc5e8..f28e190 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
@@ -20,8 +20,6 @@
package org.apache.usergrid.persistence.core.rx;
-import org.apache.usergrid.persistence.core.task.Task;
-
import rx.Scheduler;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
deleted file mode 100644
index 9007167..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-
-/**
- * Implementation of the task executor with a unique name and size
- */
-public class NamedTaskExecutorImpl implements TaskExecutor {
-
- private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
-
- private final ListeningExecutorService executorService;
-
- private final String name;
- private final int poolSize;
-
-
- /**
- * @param name The name of this instance of the task executor
- * @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once.
- * @param queueLength The length of tasks to keep in the queue
- */
- public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
- Preconditions.checkNotNull( name );
- Preconditions.checkArgument( name.length() > 0, "name must have a length" );
- Preconditions.checkArgument( poolSize > -1, "poolSize must be > than -1" );
- Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
-
- this.name = name;
- this.poolSize = poolSize;
-
- // The user has chosen to disable asynchronous execution,
- // to create an executor service that will reject all requests
- if ( poolSize == 0 ) {
- executorService = MoreExecutors.listeningDecorator( new RejectingExecutorService());
- }
-
- //queue executions as normal
- else {
- final BlockingQueue<Runnable> queue = queueLength > 0
- ? new ArrayBlockingQueue<Runnable>(queueLength) : new SynchronousQueue<Runnable>();
-
- executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
- }
- }
-
-
- @Override
- public <V> ListenableFuture<V> submit( final Task<V> task ) {
-
- final ListenableFuture<V> future;
-
- try {
- future = executorService.submit( task );
-
- // Log our success or failures for debugging purposes
- Futures.addCallback( future, new TaskFutureCallBack<V>( task ) );
- }
- catch ( RejectedExecutionException ree ) {
- return Futures.immediateFuture( task.rejected());
- }
-
- return future;
- }
-
-
- @Override
- public void shutdown() {
- this.executorService.shutdownNow();
- }
-
-
- /**
- * Callback for when the task succeeds or fails.
- */
- private static final class TaskFutureCallBack<V> implements FutureCallback<V> {
-
- private final Task<V> task;
-
-
- private TaskFutureCallBack( Task<V> task ) {
- this.task = task;
- }
-
-
- @Override
- public void onSuccess( @Nullable final V result ) {
- LOG.debug( "Successfully completed task ", task );
- }
-
-
- @Override
- public void onFailure( final Throwable t ) {
- LOG.error( "Unable to execute task. Exception is ", t );
-
- task.exceptionThrown( t );
- }
- }
-
-
- /**
- * Create a thread pool that will reject work if our audit tasks become overwhelmed
- */
- private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
- public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
-
- super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
- new RejectedHandler() );
- }
- }
-
-
- /**
- * Thread factory that will name and count threads for easier debugging
- */
- private final class CountingThreadFactory implements ThreadFactory {
-
- private final AtomicLong threadCounter = new AtomicLong();
-
-
- @Override
- public Thread newThread( final Runnable r ) {
- final long newValue = threadCounter.incrementAndGet();
-
- Thread t = new Thread( r, name + "-" + newValue );
-
- t.setDaemon( true );
-
- return t;
- }
- }
-
-
- /**
- * The handler that will handle rejected executions and signal the interface
- */
- private final class RejectedHandler implements RejectedExecutionHandler {
-
-
- @Override
- public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- LOG.warn( "{} task queue full, rejecting task {}", name, r );
-
- throw new RejectedExecutionException( "Unable to run task, queue full" );
- }
-
- }
-
-
- /**
- * Executor implementation that simply rejects all incoming tasks
- */
- private static final class RejectingExecutorService implements ExecutorService{
-
- @Override
- public void shutdown() {
-
- }
-
-
- @Override
- public List<Runnable> shutdownNow() {
- return Collections.EMPTY_LIST;
- }
-
-
- @Override
- public boolean isShutdown() {
- return false;
- }
-
-
- @Override
- public boolean isTerminated() {
- return false;
- }
-
-
- @Override
- public boolean awaitTermination( final long timeout, final TimeUnit unit )
- throws InterruptedException {
- return false;
- }
-
-
- @Override
- public <T> Future<T> submit( final Callable<T> task ) {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public <T> Future<T> submit( final Runnable task, final T result ) {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public Future<?> submit( final Runnable task ) {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks )
- throws InterruptedException {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks,
- final long timeout, final TimeUnit unit ) throws InterruptedException {
-
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public <T> T invokeAny( final Collection<? extends Callable<T>> tasks )
- throws InterruptedException, ExecutionException {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public <T> T invokeAny( final Collection<? extends Callable<T>> tasks, final long timeout,
- final TimeUnit unit ) throws InterruptedException, ExecutionException, TimeoutException {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public void execute( final Runnable command ) {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
deleted file mode 100644
index 5582161..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import java.util.concurrent.Callable;
-
-
-/**
- * The task to execute
- */
-public interface Task<V> extends Callable<V> {
-
-
- /**
- * Invoked when this task throws an uncaught exception.
- * @param throwable
- */
- void exceptionThrown(final Throwable throwable);
-
- /**
- * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
- * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
- * task in the scheduling thread. Note that this has performance implications to the user. If you can drop the
- * request and process later (lazy repair for instance ) do so.
- *
- */
- V rejected();
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
deleted file mode 100644
index 5728d2e..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-
-/**
- * An interface for execution of tasks
- */
-public interface TaskExecutor {
-
- /**
- * Submit the task asynchronously
- * @param task
- */
- public <V> ListenableFuture<V> submit( Task<V> task );
-
- /**
- * Stop the task executor without waiting for scheduled threads to run
- */
- public void shutdown();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
deleted file mode 100644
index 4f95918..0000000
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-
-/**
- * Tests for the namedtask execution impl
- */
-public class NamedTaskExecutorImplTest {
-
-
- @Test
- public void jobSuccess() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
- final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
- final CountDownLatch runLatch = new CountDownLatch( 1 );
-
- final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
- executor.submit( task );
-
-
- runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- assertEquals( 0l, exceptionLatch.getCount() );
-
- assertEquals( 0l, rejectedLatch.getCount() );
- }
-
-
- @Test
- public void exceptionThrown() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
- final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
- final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
- final CountDownLatch runLatch = new CountDownLatch( 1 );
-
- final RuntimeException re = new RuntimeException( "throwing exception" );
-
- final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
- @Override
- public Void call() throws Exception {
- super.call();
- throw re;
- }
- };
-
- executor.submit( task );
-
-
- runLatch.await( 1000, TimeUnit.MILLISECONDS );
- exceptionLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- assertSame( re, task.exceptions.get( 0 ) );
-
-
- assertEquals( 0l, rejectedLatch.getCount() );
- }
-
-
- @Test
- public void noCapacity() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
- final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
- final CountDownLatch runLatch = new CountDownLatch( 1 );
-
-
- final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
- @Override
- public Void call() throws Exception {
- super.call();
-
- //park this thread so it takes up a task and the next is rejected
- final Object mutex = new Object();
-
- synchronized ( mutex ) {
- mutex.wait();
- }
-
- return null;
- }
- };
-
- executor.submit( task );
-
-
- runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- //now submit the second task
-
-
- final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
- final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
-
-
- final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
- executor.submit( testTask );
-
-
- secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- //if we get here we've been rejected, just double check we didn't run
-
- assertEquals( 1l, secondRunLatch.getCount() );
- assertEquals( 0l, secondExceptionLatch.getCount() );
- }
-
-
- @Test
- public void noCapacityWithQueue() throws InterruptedException {
-
- final int threadPoolSize = 1;
- final int queueSize = 10;
-
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
-
- final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
- final CountDownLatch runLatch = new CountDownLatch( 1 );
-
- int iterations = threadPoolSize + queueSize;
-
- for(int i = 0; i < iterations; i ++) {
-
- final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
- @Override
- public Void call() throws Exception {
- super.call();
-
- //park this thread so it takes up a task and the next is rejected
- final Object mutex = new Object();
-
- synchronized ( mutex ) {
- mutex.wait();
- }
-
- return null;
- }
- };
- executor.submit( task );
- }
-
-
-
- runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- //now submit the second task
-
-
- final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
- final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
-
-
- final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
- executor.submit( testTask );
-
-
- secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- //if we get here we've been rejected, just double check we didn't run
-
- assertEquals( 1l, secondRunLatch.getCount() );
- assertEquals( 0l, secondExceptionLatch.getCount() );
- }
-
-
- @Test
- public void rejectingTaskExecutor() throws InterruptedException {
-
- final int threadPoolSize = 0;
- final int queueSize = 0;
-
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
-
- final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch rejectedLatch = new CountDownLatch( 1 );
- final CountDownLatch runLatch = new CountDownLatch( 0 );
-
-
- //now submit the second task
-
-
-
- final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
- executor.submit( testTask );
-
-
- //should be immediately rejected
- rejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- //if we get here we've been rejected, just double check we didn't run
-
- assertEquals( 0l, exceptionLatch.getCount() );
- assertEquals( 0l, runLatch.getCount() );
- }
-
-
- private static abstract class TestTask<V> implements Task<V> {
-
- private final List<Throwable> exceptions;
- private final CountDownLatch exceptionLatch;
- private final CountDownLatch rejectedLatch;
- private final CountDownLatch runLatch;
-
-
- private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
- final CountDownLatch runLatch ) {
- this.exceptionLatch = exceptionLatch;
- this.rejectedLatch = rejectedLatch;
- this.runLatch = runLatch;
-
- this.exceptions = new ArrayList<>();
- }
-
-
-
- @Override
- public void exceptionThrown( final Throwable throwable ) {
- this.exceptions.add( throwable );
- exceptionLatch.countDown();
- }
-
-
- @Override
- public V rejected() {
- rejectedLatch.countDown();
- return null;
- }
-
-
- @Override
- public V call() throws Exception {
- runLatch.countDown();
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 4c38c13..987a36c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -64,23 +64,23 @@ public interface GraphManager extends CPManager {
/**
- * @param edge The edge to delete
+ * @param edge Mark the edge as deleted in the graph
*
*
* EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge. Only deletes the specific version
*/
- Observable<Edge> deleteEdge( Edge edge );
+ Observable<Edge> markEdge( Edge edge );
/**
*
- * Remove the node from the graph.
+ * Mark the node as removed from the graph.
*
* @param node The node to remove
* @param timestamp The timestamp to apply the delete operation. Any edges connected to this node with a timestmap
* <= the specified time will be removed from the graph
* @return
*/
- Observable<Id> deleteNode(Id node, long timestamp);
+ Observable<Id> markNode( Id node, long timestamp );
/**
* Get all versions of this edge where versions <= max version
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
index eb49711..d73f767 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
@@ -36,7 +36,7 @@ public interface GraphManagerFactory
*
* @param collectionScope The context to use when creating the graph manager
*/
- public GraphManager createEdgeManager( ApplicationScope collectionScope );
+ GraphManager createEdgeManager( ApplicationScope collectionScope );
void invalidate();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
index 114440f..38f62b5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
@@ -59,6 +59,12 @@ public interface SearchByEdge {
long getMaxTimestamp();
/**
+ * Return true if we should filter edges marked for deletion
+ * @return
+ */
+ boolean filterMarked();
+
+ /**
* The optional start parameter. All edges emitted with be > the specified start edge.
* This is useful for paging. Simply use the last value returned in the previous call in the start parameter
* @return
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
index 5e47ae0..f213b00 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
@@ -65,6 +65,12 @@ public interface SearchByEdgeType {
*/
Order getOrder();
+ /**
+ * Return true to filter marked edges from the results
+ * @return
+ */
+ boolean filterMarked();
+
/**
* Options for ordering. By default, we want to perform descending for common use cases and read speed. This is our our data
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 6cdaef0..4b628d1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -28,8 +28,6 @@ import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
import org.apache.usergrid.persistence.core.migration.data.MigrationRelationship;
import org.apache.usergrid.persistence.core.migration.data.VersionedMigrationSet;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener;
@@ -196,15 +194,6 @@ public abstract class GraphModule extends AbstractModule {
}
- @Inject
- @Singleton
- @Provides
- @GraphTaskExecutor
- public TaskExecutor graphTaskExecutor( final GraphFig graphFig ) {
- return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(),
- graphFig.getShardAuditWorkerQueueSize() );
- }
-
/**