Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/11 19:37:16 UTC

[02/14] incubator-usergrid git commit: First pass at refactoring mark + sweep

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
index 107b2a0..9ed254c 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -43,6 +45,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.fasterxml.uuid.UUIDComparator;
 import com.google.inject.Inject;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
@@ -65,15 +68,15 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
 
     private MvccLogEntrySerializationStrategy logEntryStrategy;
 
+
     @Before
-    public void wireLogEntryStrategy(){
+    public void wireLogEntryStrategy() {
         logEntryStrategy = getLogEntryStrategy();
     }
 
 
     /**
      * Get the log entry strategy from
-     * @return
      */
     protected abstract MvccLogEntrySerializationStrategy getLogEntryStrategy();
 
@@ -181,6 +184,131 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
     }
 
 
+    @Test
+    public void getReversedEntries() throws ConnectionException {
+
+        final Id applicationId = new SimpleId( "application" );
+
+        ApplicationScope context = new ApplicationScopeImpl( applicationId );
+
+
+        final Id id = new SimpleId( "test" );
+
+        int count = 10;
+
+        final UUID[] versions = new UUID[count];
+        final Stage COMPLETE = Stage.COMPLETE;
+        final MvccLogEntry[] entries = new MvccLogEntry[count];
+
+
+        for ( int i = 0; i < count; i++ ) {
+            versions[i] = UUIDGenerator.newTimeUUID();
+
+            entries[i] = new MvccLogEntryImpl( id, versions[i], COMPLETE, MvccLogEntry.State.COMPLETE );
+            logEntryStrategy.write( context, entries[i] ).execute();
+
+            //Read it back
+
+            MvccLogEntry returned =
+                logEntryStrategy.load( context, Collections.singleton( id ), versions[i] ).getMaxVersion( id );
+
+            assertNotNull( "Returned value should not be null", returned );
+
+            assertEquals( "Returned should equal the saved", entries[i], returned );
+        }
+
+
+        final UUID[] assertVersions = Arrays.copyOf( versions, versions.length );
+
+        Arrays.sort( assertVersions, ( v1, v2 ) -> UUIDComparator.staticCompare( v1, v2 ) * -1 );
+
+        //now do a range scan from the end
+
+        final int half = count / 2;
+
+        final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half );
+
+        assertEquals( half, results.size() );
+
+        for ( int i = 0; i < count / 2; i++ ) {
+            final MvccLogEntry saved = entries[i];
+            final MvccLogEntry returned = results.get( i );
+
+            assertEquals( "Entry was not equal to the saved value", saved, returned );
+        }
+
+
+        //now get the next batch
+        final List<MvccLogEntry> results2 = logEntryStrategy.loadReversed( context, id, versions[half], count );
+
+        assertEquals( half, results2.size() );
+
+        for ( int i = 0; i < half; i++ ) {
+            final MvccLogEntry saved = entries[half + i];
+            final MvccLogEntry returned = results2.get( i );
+
+            assertEquals( "Entry was not equal to the saved value", saved, returned );
+        }
+
+
+        //now delete them all and ensure we get no results back
+        for ( int i = 0; i < count; i++ ) {
+            logEntryStrategy.delete( context, id, versions[i] ).execute();
+        }
+
+        final List<MvccLogEntry> results3 = logEntryStrategy.loadReversed( context, id, null, versions.length );
+
+        assertEquals( "All log entries were deleted", 0, results3.size() );
+    }
+
+
+    @Test
+    public void createAndDeleteEntries() throws ConnectionException {
+
+        final Id applicationId = new SimpleId( "application" );
+
+        ApplicationScope context = new ApplicationScopeImpl( applicationId );
+
+
+        final Id id = new SimpleId( "test" );
+
+
+        final int size = 10;
+
+        final List<MvccLogEntry> savedEntries = new ArrayList<>( size );
+
+        for ( int i = 0; i < size; i++ ) {
+            final UUID version = UUIDGenerator.newTimeUUID();
+            MvccLogEntry saved = new MvccLogEntryImpl( id, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE );
+            logEntryStrategy.write( context, saved ).execute();
+
+            savedEntries.add( saved );
+        }
+
+        //now test we get them all back
+
+        final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, null, size );
+
+        assertEquals( size, results.size() );
+
+        //assert they're the same
+        for ( int i = 0; i < size; i++ ) {
+            assertEquals( savedEntries.get( i ), results.get( i ) );
+        }
+
+        //now delete them all
+
+        for ( final MvccLogEntry mvccLogEntry : savedEntries ) {
+            logEntryStrategy.delete( context, id, mvccLogEntry.getVersion() ).execute();
+        }
+
+        //now get them back, should be empty
+        final List<MvccLogEntry> emptyResults = logEntryStrategy.loadReversed( context, id, null, size );
+
+        assertEquals( 0, emptyResults.size() );
+    }
+
+
     @Test( expected = NullPointerException.class )
     public void writeParamsNoContext() throws ConnectionException {
         logEntryStrategy.write( null, mock( MvccLogEntry.class ) );

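The loadReversed/delete pair exercised above is the primitive the mark + sweep refactor builds on: a sweep pass can page through an entity's log entries newest-first and delete each page it has processed. A minimal sketch of that loop, reusing the context, id and logEntryStrategy fixtures from the tests (the page size and the loop itself are illustrative, not part of this commit):

    final int pageSize = 100; // illustrative page size
    List<MvccLogEntry> page = logEntryStrategy.loadReversed( context, id, null, pageSize );

    while ( !page.isEmpty() ) {
        for ( final MvccLogEntry entry : page ) {
            // remove each processed entry, exactly as the tests above do
            logEntryStrategy.delete( context, id, entry.getVersion() ).execute();
        }

        // everything in the page was deleted, so reading from the start again yields the next page
        page = logEntryStrategy.loadReversed( context, id, null, pageSize );
    }
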
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
index 93de9d4..cd6ad3d 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
@@ -20,15 +20,11 @@ package org.apache.usergrid.persistence.collection.util;/*
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.TreeMap;
 import java.util.UUID;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
@@ -50,7 +46,11 @@ import static org.mockito.Mockito.when;
 public class LogEntryMock {
 
 
-    private final TreeMap<UUID, MvccLogEntry> entries = new TreeMap<>(ReversedUUIDComparator.INSTANCE);
+    private final TreeMap<UUID, MvccLogEntry> reversedEntries =
+        new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) * -1 );
+
+    private final TreeMap<UUID, MvccLogEntry> entries =
+        new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) );
 
     private final Id entityId;
 
@@ -61,78 +61,92 @@ public class LogEntryMock {
      * @param entityId The entity Id to use
      * @param versions The versions to use
      */
-    private LogEntryMock(final Id entityId, final List<UUID> versions ) {
+    private LogEntryMock( final Id entityId, final List<UUID> versions ) {
 
         this.entityId = entityId;
 
-        for ( UUID version: versions) {
-            entries.put( version, new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE ) );
+        for ( UUID version : versions ) {
+            final MvccLogEntry mvccLogEntry =
+                new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE );
+            reversedEntries.put( version, mvccLogEntry );
+            entries.put( version, mvccLogEntry );
         }
     }
 
 
     /**
      * Init the mock with the given data structure
+     *
      * @param logEntrySerializationStrategy The strategy to moc
-     * @param scope
-     * @throws ConnectionException
      */
-    private void initMock(  final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope )
+    private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                           final ApplicationScope scope )
 
-            throws ConnectionException {
+        throws ConnectionException {
 
         //wire up the mocks
-        when(logEntrySerializationStrategy.load( same( scope ), same( entityId ), any(UUID.class), any(Integer.class)  )).thenAnswer( new Answer<List<MvccLogEntry>>() {
-
+        when( logEntrySerializationStrategy
+            .load( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer(
 
-            @Override
-            public List<MvccLogEntry> answer( final InvocationOnMock invocation ) throws Throwable {
+            invocation -> {
                 final UUID startVersion = ( UUID ) invocation.getArguments()[2];
-                final int count = (Integer)invocation.getArguments()[3];
+                final int count = ( Integer ) invocation.getArguments()[3];
 
                 final List<MvccLogEntry> results = new ArrayList<>( count );
 
-                final Iterator<MvccLogEntry> itr = entries.tailMap( startVersion, true ).values().iterator();
+                final Iterator<MvccLogEntry> itr = reversedEntries.tailMap( startVersion, true ).values().iterator();
 
-                for(int i = 0; i < count && itr.hasNext(); i ++){
+                for ( int i = 0; i < count && itr.hasNext(); i++ ) {
                     results.add( itr.next() );
                 }
 
 
                 return results;
-            }
-        } );
-    }
+            } );
 
 
-    /**
-     * Get the entry at the specified index from high to low
-     * @param index
-     * @return
-     */
-    public MvccLogEntry getEntryAtIndex(final int index){
+        //mock in reverse
 
-        final Iterator<MvccLogEntry> itr = entries.values().iterator();
+        when( logEntrySerializationStrategy
+            .loadReversed( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer(
 
-        for(int i = 0; i < index; i ++){
-           itr.next();
-        }
+            invocation -> {
+                final UUID startVersion = ( UUID ) invocation.getArguments()[2];
+                final int count = ( Integer ) invocation.getArguments()[3];
+
+
+                final List<MvccLogEntry> results = new ArrayList<>( count );
+
+                final Iterator<MvccLogEntry> itr;
+
+                if ( startVersion == null ) {
+                    itr = entries.values().iterator();
+                }
+                else {
+                    itr = entries.tailMap( startVersion, true ).values().iterator();
+                }
+
+                for ( int i = 0; i < count && itr.hasNext(); i++ ) {
+                    results.add( itr.next() );
+                }
 
-        return itr.next();
+
+                return results;
+            } );
     }
 
 
     /**
-     *
      * @param logEntrySerializationStrategy The mock to use
      * @param scope The scope to use
      * @param entityId The entityId to use
      * @param versions The versions to mock
-     * @throws ConnectionException
      */
-    public static LogEntryMock createLogEntryMock(final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final  ApplicationScope scope,final Id entityId, final List<UUID> versions )
+    public static LogEntryMock createLogEntryMock(
+        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope,
+        final Id entityId, final List<UUID> versions )
 
-            throws ConnectionException {
+        throws ConnectionException {
 
         LogEntryMock mock = new LogEntryMock( entityId, versions );
         mock.initMock( logEntrySerializationStrategy, scope );
@@ -141,19 +155,12 @@ public class LogEntryMock {
     }
 
 
-    public Collection<MvccLogEntry> getEntries() {
-        return entries.values();
+    public Collection<MvccLogEntry> getReversedEntries() {
+        return reversedEntries.values();
     }
 
 
-    private static final class ReversedUUIDComparator implements Comparator<UUID> {
-
-        public static final ReversedUUIDComparator INSTANCE = new ReversedUUIDComparator();
-
-
-        @Override
-        public int compare( final UUID o1, final UUID o2 ) {
-            return UUIDComparator.staticCompare( o1, o2 ) * -1;
-        }
+    public Collection<MvccLogEntry> getEntries() {
+        return entries.values();
     }
 }

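For callers of the reworked mock, a minimal wiring sketch; the strategy is a plain Mockito mock, scope and entityId are assumed to be set up inside a test method that throws ConnectionException (as in the existing stage tests), and the count of five versions is arbitrary:

    final MvccLogEntrySerializationStrategy strategy = mock( MvccLogEntrySerializationStrategy.class );

    final List<UUID> versions = new ArrayList<>();
    for ( int i = 0; i < 5; i++ ) {
        versions.add( UUIDGenerator.newTimeUUID() );
    }

    final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( strategy, scope, entityId, versions );

    // ascending version order from getEntries(), newest-first from getReversedEntries()
    final Collection<MvccLogEntry> ascending = logEntryMock.getEntries();
    final Collection<MvccLogEntry> newestFirst = logEntryMock.getReversedEntries();
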
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/resources/log4j.properties b/stack/corepersistence/collection/src/test/resources/log4j.properties
index acf5c39..7b55cf8 100644
--- a/stack/corepersistence/collection/src/test/resources/log4j.properties
+++ b/stack/corepersistence/collection/src/test/resources/log4j.properties
@@ -33,4 +33,5 @@ log4j.logger.cassandra.db=ERROR
 
 #log4j.logger.org.apache.usergrid=DEBUG
 #log4j.logger.org.apache.usergrid.persistence.collection=TRACE
+log4j.logger.org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact=TRACE
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
new file mode 100644
index 0000000..c653458
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.executor;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A factory for creating task executors that tasks can be submitted to
+ */
+public class TaskExecutorFactory {
+
+
+    /**
+     * Create a task executor
+     * @param schedulerName
+     * @param maxThreadCount
+     * @param maxQueueSize
+     * @return
+     */
+    public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount,
+                                                         final int maxQueueSize ) {
+
+
+        final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );
+
+
+        final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
+
+
+        return threadPool;
+    }
+
+
+    /**
+     * Create a thread pool that will reject work if our audit tasks become overwhelmed
+     */
+    private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
+            super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ) );
+        }
+    }
+
+
+    /**
+     * Thread factory that will name and count threads for easier debugging
+     */
+    private static final class CountingThreadFactory implements ThreadFactory {
+
+        private final AtomicLong threadCounter = new AtomicLong();
+        private final String poolName;
+
+
+        private CountingThreadFactory( final String poolName ) {this.poolName = poolName;}
+
+
+        @Override
+        public Thread newThread( final Runnable r ) {
+            final long newValue = threadCounter.incrementAndGet();
+
+            final String threadName = poolName + "-" + newValue;
+
+            Thread t = new Thread( r, threadName );
+
+            //set it to be a daemon thread so it doesn't block shutdown
+            t.setDaemon( true );
+
+            return t;
+        }
+    }
+}

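As a usage sketch for the new factory (the scheduler name, thread count and queue size below are illustrative values, not defaults from the commit):

    // bounded pool: 10 daemon threads named graphTaskExecutor-1, -2, ... and a 1000-slot queue
    final ThreadPoolExecutor executor = TaskExecutorFactory.createTaskExecutor( "graphTaskExecutor", 10, 1000 );

    executor.submit( () -> {
        // hypothetical unit of work; once all threads are busy and the queue is full,
        // further submissions fail with RejectedExecutionException (the default abort policy)
    } );
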
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
index d6cc5e8..f28e190 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
@@ -20,8 +20,6 @@
 package org.apache.usergrid.persistence.core.rx;
 
 
-import org.apache.usergrid.persistence.core.task.Task;
-
 import rx.Scheduler;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
deleted file mode 100644
index 9007167..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-
-/**
- * Implementation of the task executor with a unique name and size
- */
-public class NamedTaskExecutorImpl implements TaskExecutor {
-
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
-
-    private final ListeningExecutorService executorService;
-
-    private final String name;
-    private final int poolSize;
-
-
-    /**
-     * @param name The name of this instance of the task executor
-     * @param poolSize The size of the pool.  This is the number of concurrent tasks that can execute at once.
-     * @param queueLength The length of tasks to keep in the queue
-     */
-    public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
-        Preconditions.checkNotNull( name );
-        Preconditions.checkArgument( name.length() > 0, "name must have a length" );
-        Preconditions.checkArgument( poolSize > -1, "poolSize must be > than -1" );
-        Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
-
-        this.name = name;
-        this.poolSize = poolSize;
-
-        // The user has chosen to disable asynchronous execution, 
-        // to create an executor service that will reject all requests
-        if ( poolSize == 0 ) {
-            executorService = MoreExecutors.listeningDecorator( new RejectingExecutorService());
-        }
-
-        //queue executions as normal
-        else {
-            final BlockingQueue<Runnable> queue = queueLength > 0 
-                ? new ArrayBlockingQueue<Runnable>(queueLength) : new SynchronousQueue<Runnable>();
-
-            executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
-        }
-    }
-
-
-    @Override
-    public <V> ListenableFuture<V> submit( final Task<V> task ) {
-
-        final ListenableFuture<V> future;
-
-        try {
-            future = executorService.submit( task );
-
-            // Log our success or failures for debugging purposes
-            Futures.addCallback( future, new TaskFutureCallBack<V>( task ) );
-        }
-        catch ( RejectedExecutionException ree ) {
-            return Futures.immediateFuture( task.rejected());
-        }
-
-        return future;
-    }
-
-
-    @Override
-    public void shutdown() {
-        this.executorService.shutdownNow();
-    }
-
-
-    /**
-     * Callback for when the task succeeds or fails.
-     */
-    private static final class TaskFutureCallBack<V> implements FutureCallback<V> {
-
-        private final Task<V> task;
-
-
-        private TaskFutureCallBack( Task<V> task ) {
-            this.task = task;
-        }
-
-
-        @Override
-        public void onSuccess( @Nullable final V result ) {
-            LOG.debug( "Successfully completed task ", task );
-        }
-
-
-        @Override
-        public void onFailure( final Throwable t ) {
-            LOG.error( "Unable to execute task.  Exception is ", t );
-
-            task.exceptionThrown( t );
-        }
-    }
-
-
-    /**
-     * Create a thread pool that will reject work if our audit tasks become overwhelmed
-     */
-    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
-        public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
-
-            super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
-                    new RejectedHandler() );
-        }
-    }
-
-
-    /**
-     * Thread factory that will name and count threads for easier debugging
-     */
-    private final class CountingThreadFactory implements ThreadFactory {
-
-        private final AtomicLong threadCounter = new AtomicLong();
-
-
-        @Override
-        public Thread newThread( final Runnable r ) {
-            final long newValue = threadCounter.incrementAndGet();
-
-            Thread t = new Thread( r, name + "-" + newValue );
-
-            t.setDaemon( true );
-
-            return t;
-        }
-    }
-
-
-    /**
-     * The handler that will handle rejected executions and signal the interface
-     */
-    private final class RejectedHandler implements RejectedExecutionHandler {
-
-
-        @Override
-        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-            LOG.warn( "{} task queue full, rejecting task {}", name, r );
-
-            throw new RejectedExecutionException( "Unable to run task, queue full" );
-        }
-
-    }
-
-
-    /**
-     * Executor implementation that simply rejects all incoming tasks
-     */
-    private static final class RejectingExecutorService implements ExecutorService{
-
-        @Override
-        public void shutdown() {
-
-        }
-
-
-        @Override
-        public List<Runnable> shutdownNow() {
-            return Collections.EMPTY_LIST;
-        }
-
-
-        @Override
-        public boolean isShutdown() {
-            return false;
-        }
-
-
-        @Override
-        public boolean isTerminated() {
-            return false;
-        }
-
-
-        @Override
-        public boolean awaitTermination( final long timeout, final TimeUnit unit ) 
-            throws InterruptedException {
-            return false;
-        }
-
-
-        @Override
-        public <T> Future<T> submit( final Callable<T> task ) {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public <T> Future<T> submit( final Runnable task, final T result ) {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public Future<?> submit( final Runnable task ) {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks )
-                throws InterruptedException {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks, 
-                final long timeout, final TimeUnit unit ) throws InterruptedException {
-
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public <T> T invokeAny( final Collection<? extends Callable<T>> tasks )
-            throws InterruptedException, ExecutionException {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public <T> T invokeAny( final Collection<? extends Callable<T>> tasks, final long timeout, 
-            final TimeUnit unit ) throws InterruptedException, ExecutionException, TimeoutException {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public void execute( final Runnable command ) {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
deleted file mode 100644
index 5582161..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import java.util.concurrent.Callable;
-
-
-/**
- * The task to execute
- */
-public interface Task<V> extends Callable<V> {
-
-
-    /**
-     * Invoked when this task throws an uncaught exception.
-     * @param throwable
-     */
-    void exceptionThrown(final Throwable throwable);
-
-    /**
-     * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
-     * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
-     * task in the scheduling thread.  Note that this has performance implications to the user.  If you can drop the
-     * request and process later (lazy repair for instance ) do so.
-     *
-     */
-    V rejected();
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
deleted file mode 100644
index 5728d2e..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-
-/**
- * An interface for execution of tasks
- */
-public interface  TaskExecutor {
-
-    /**
-     * Submit the task asynchronously
-     * @param task
-     */
-    public <V> ListenableFuture<V> submit( Task<V> task );
-
-    /**
-     * Stop the task executor without waiting for scheduled threads to run
-     */
-    public void shutdown();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
deleted file mode 100644
index 4f95918..0000000
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-
-/**
- * Tests for the namedtask execution impl
- */
-public class NamedTaskExecutorImplTest {
-
-
-    @Test
-    public void jobSuccess() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-        final CountDownLatch runLatch = new CountDownLatch( 1 );
-
-        final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
-        executor.submit( task );
-
-
-        runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        assertEquals( 0l, exceptionLatch.getCount() );
-
-        assertEquals( 0l, rejectedLatch.getCount() );
-    }
-
-
-    @Test
-    public void exceptionThrown() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-        final CountDownLatch runLatch = new CountDownLatch( 1 );
-
-        final RuntimeException re = new RuntimeException( "throwing exception" );
-
-        final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
-            @Override
-            public Void call() throws Exception {
-                super.call();
-                throw re;
-            }
-        };
-
-        executor.submit( task );
-
-
-        runLatch.await( 1000, TimeUnit.MILLISECONDS );
-        exceptionLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        assertSame( re, task.exceptions.get( 0 ) );
-
-
-        assertEquals( 0l, rejectedLatch.getCount() );
-    }
-
-
-    @Test
-    public void noCapacity() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-        final CountDownLatch runLatch = new CountDownLatch( 1 );
-
-
-        final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
-            @Override
-            public Void call() throws Exception {
-                super.call();
-
-                //park this thread so it takes up a task and the next is rejected
-                final Object mutex = new Object();
-
-                synchronized ( mutex ) {
-                    mutex.wait();
-                }
-
-                return null;
-            }
-        };
-
-        executor.submit( task );
-
-
-        runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //now submit the second task
-
-
-        final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
-        final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
-
-
-        final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
-        executor.submit( testTask );
-
-
-        secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //if we get here we've been rejected, just double check we didn't run
-
-        assertEquals( 1l, secondRunLatch.getCount() );
-        assertEquals( 0l, secondExceptionLatch.getCount() );
-    }
-
-
-    @Test
-    public void noCapacityWithQueue() throws InterruptedException {
-
-        final int threadPoolSize = 1;
-        final int queueSize = 10;
-
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-        final CountDownLatch runLatch = new CountDownLatch( 1 );
-
-        int iterations = threadPoolSize + queueSize;
-
-        for(int i = 0; i < iterations; i ++) {
-
-            final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
-                @Override
-                public Void call() throws Exception {
-                    super.call();
-
-                    //park this thread so it takes up a task and the next is rejected
-                    final Object mutex = new Object();
-
-                    synchronized ( mutex ) {
-                        mutex.wait();
-                    }
-
-                    return null;
-                }
-            };
-            executor.submit( task );
-        }
-
-
-
-        runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //now submit the second task
-
-
-        final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
-        final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
-
-
-        final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
-        executor.submit( testTask );
-
-
-        secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //if we get here we've been rejected, just double check we didn't run
-
-        assertEquals( 1l, secondRunLatch.getCount() );
-        assertEquals( 0l, secondExceptionLatch.getCount() );
-    }
-
-
-    @Test
-    public void rejectingTaskExecutor() throws InterruptedException {
-
-        final int threadPoolSize = 0;
-        final int queueSize = 0;
-
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 1 );
-        final CountDownLatch runLatch = new CountDownLatch( 0 );
-
-
-        //now submit the second task
-
-
-
-        final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
-        executor.submit( testTask );
-
-
-        //should be immediately rejected
-        rejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //if we get here we've been rejected, just double check we didn't run
-
-        assertEquals( 0l, exceptionLatch.getCount() );
-        assertEquals( 0l, runLatch.getCount() );
-    }
-
-
-    private static abstract class TestTask<V> implements Task<V> {
-
-        private final List<Throwable> exceptions;
-        private final CountDownLatch exceptionLatch;
-        private final CountDownLatch rejectedLatch;
-        private final CountDownLatch runLatch;
-
-
-        private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
-                          final CountDownLatch runLatch ) {
-            this.exceptionLatch = exceptionLatch;
-            this.rejectedLatch = rejectedLatch;
-            this.runLatch = runLatch;
-
-            this.exceptions = new ArrayList<>();
-        }
-
-
-
-        @Override
-        public void exceptionThrown( final Throwable throwable ) {
-            this.exceptions.add( throwable );
-            exceptionLatch.countDown();
-        }
-
-
-        @Override
-        public V rejected() {
-            rejectedLatch.countDown();
-            return null;
-        }
-
-
-        @Override
-        public V call() throws Exception {
-            runLatch.countDown();
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 894e74a..2a153e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -31,46 +31,46 @@ import org.safehaus.guicyfig.Key;
 public interface GraphFig extends GuicyFig {
 
 
-    public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
+    String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
 
-    public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
+    String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
 
     /**
      * The size of the shards.  This is approximate, and should be set lower than what you would like your max to be
      */
-    public static final String SHARD_SIZE = "usergrid.graph.shard.size";
+    String SHARD_SIZE = "usergrid.graph.shard.size";
 
 
     /**
      * Number of shards we can cache.
      */
-    public static final String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
+    String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
 
 
     /**
      * Get the cache timeout.  The local cache will exist for this amount of time max (in millis).
      */
-    public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
+    String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
 
     /**
      * Number of worker threads to refresh the cache
      */
-    public static final String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
+    String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
 
 
     /**
      * The size of the worker count for shard auditing
      */
-    public static final String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size";
+    String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size";
 
 
     /**
      * The size of the worker count for shard auditing
      */
-    public static final String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
+    String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
 
 
-    public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
+    String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
 
 
     /**
@@ -80,14 +80,14 @@ public interface GraphFig extends GuicyFig {
      * Note that you should also pad this for node clock drift.  A good value for this would be 2x the shard cache
      * timeout + 30 seconds, assuming you have NTP and allow a max drift of 30 seconds
      */
-    public static final String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta";
+    String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta";
 
 
-    public static final String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count";
+    String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count";
 
-    public static final String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval";
+    String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval";
 
-    public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
+    String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 4c38c13..987a36c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -64,23 +64,23 @@ public interface GraphManager extends CPManager {
 
 
     /**
-     * @param edge The edge to delete
+     * @param edge Mark the edge as deleted in the graph
      *
      *
      * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge. Only deletes the specific version
      */
-    Observable<Edge> deleteEdge( Edge edge );
+    Observable<Edge> markEdge( Edge edge );
 
     /**
      *
-     * Remove the node from the graph.
+     * Mark the node as removed from the graph.
      *
      * @param node The node to remove
      * @param timestamp The timestamp to apply the delete operation.  Any edges connected to this node with a timestmap
      * <= the specified time will be removed from the graph
      * @return
      */
-    Observable<Id> deleteNode(Id node, long timestamp);
+    Observable<Id> markNode( Id node, long timestamp );
 
     /**
      * Get all versions of this edge where versions <= max version

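To illustrate the renamed operations, a minimal sketch of the mark phase; graphManagerFactory, applicationScope, edge, nodeId and deleteTimestamp are assumed to already exist, and the no-argument subscribe() is only the simplest way to kick off the RxJava work:

    final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );

    // mark the edge and the node as deleted; the sweep/compaction stages remove them later
    gm.markEdge( edge ).subscribe();
    gm.markNode( nodeId, deleteTimestamp ).subscribe();
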
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
index eb49711..d73f767 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
@@ -36,7 +36,7 @@ public interface GraphManagerFactory
      *
      * @param collectionScope The context to use when creating the graph manager
      */
-    public GraphManager createEdgeManager( ApplicationScope collectionScope );
+    GraphManager createEdgeManager( ApplicationScope collectionScope );
 
     void invalidate();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
index 114440f..38f62b5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
@@ -59,6 +59,12 @@ public interface SearchByEdge {
     long getMaxTimestamp();
 
     /**
+     * Return true if we should filter edges marked for deletion
+     * @return
+     */
+    boolean filterMarked();
+
+    /**
      * The optional start parameter.  All edges emitted with be > the specified start edge.
      * This is useful for paging.  Simply use the last value returned in the previous call in the start parameter
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
index 5e47ae0..f213b00 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
@@ -65,6 +65,12 @@ public interface SearchByEdgeType {
      */
     Order getOrder();
 
+    /**
+     * Return true to filter marked edges from the results
+     * @return
+     */
+    boolean filterMarked();
+
 
     /**
      * Options for ordering.  By default, we want to perform descending for common use cases and read speed.  This is our our data

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 6cdaef0..4b628d1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -28,8 +28,6 @@ import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
 import org.apache.usergrid.persistence.core.migration.data.MigrationRelationship;
 import org.apache.usergrid.persistence.core.migration.data.VersionedMigrationSet;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener;
@@ -196,15 +194,6 @@ public abstract class GraphModule extends AbstractModule {
     }
 
 
-    @Inject
-    @Singleton
-    @Provides
-    @GraphTaskExecutor
-    public TaskExecutor graphTaskExecutor( final GraphFig graphFig ) {
-        return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(),
-                graphFig.getShardAuditWorkerQueueSize() );
-    }
-
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 9c0c62d..9a8a00f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -120,12 +120,9 @@ public class GraphManagerImpl implements GraphManager {
     @Inject
     public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
                              final EdgeSerialization storageEdgeSerialization,
-                             final NodeSerialization nodeSerialization,
-                             final GraphFig graphFig,
-                             final EdgeDeleteListener edgeDeleteListener,
-                             final NodeDeleteListener nodeDeleteListener,
-                             final ApplicationScope scope,
-                             MetricsFactory metricsFactory) {
+                             final NodeSerialization nodeSerialization, final GraphFig graphFig,
+                             final EdgeDeleteListener edgeDeleteListener, final NodeDeleteListener nodeDeleteListener,
+                             final ApplicationScope scope, MetricsFactory metricsFactory ) {
 
 
         ValidationUtils.validateApplicationScope( scope );
@@ -146,36 +143,34 @@ public class GraphManagerImpl implements GraphManager {
 
         this.edgeDeleteSubcriber = MetricSubscriber.INSTANCE;
         this.nodeDelete = MetricSubscriber.INSTANCE;
-        this.writeEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "write.edge.meter");
-        this.writeEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "write.edge.timer");
-        this.deleteEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.edge.meter");
-        this.deleteEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "delete.edge.timer");
-        this.deleteNodeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.node.meter");
-        this.deleteNodeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "delete.node.timer");
-        this.loadEdgesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.meter");
-        this.loadEdgesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.timer");
-        this.loadEdgesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.meter");
-        this.loadEdgesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.timer");
-        this.loadEdgesVersionsMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.versions.meter");
-        this.loadEdgesVersionsTimer = metricsFactory.getTimer(GraphManagerImpl.class,"load.versions.timer");
-        this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.type.meter");
-        this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.type.timer");
-        this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.type.meter");
-        this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.type.timer");
-
-        this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.from.timer");
-        this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.from.meter");
-
-        this.getIdTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.idtype.from.timer");
-        this.getIdTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.from.meter");
-
-        this.getEdgeTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.to.timer");
-        this.getEdgeTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.to.meter");
-
-        this.getIdTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "get.idtype.to.timer");
-        this.getIdTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.to.meter");
-
-
+        this.writeEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "write.edge.meter" );
+        this.writeEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "write.edge.timer" );
+        this.deleteEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "delete.edge.meter" );
+        this.deleteEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.edge.timer" );
+        this.deleteNodeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "delete.node.meter" );
+        this.deleteNodeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.node.timer" );
+        this.loadEdgesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.from.meter" );
+        this.loadEdgesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from.timer" );
+        this.loadEdgesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.to.meter" );
+        this.loadEdgesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to.timer" );
+        this.loadEdgesVersionsMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.versions.meter" );
+        this.loadEdgesVersionsTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.versions.timer" );
+        this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.from.type.meter" );
+        this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from.type.timer" );
+        this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.to.type.meter" );
+        this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to.type.timer" );
+
+        this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.from.timer" );
+        this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.edge.from.meter" );
+
+        this.getIdTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.from.timer" );
+        this.getIdTypesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.idtype.from.meter" );
+
+        this.getEdgeTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.to.timer" );
+        this.getEdgeTypesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.edge.to.meter" );
+
+        this.getIdTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.to.timer" );
+        this.getIdTypesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.idtype.to.meter" );
     }
 
 
@@ -209,41 +204,39 @@ public class GraphManagerImpl implements GraphManager {
 
                 return edge;
             }
-        } )
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( new Action1<Notification<? super Edge>>() {
+            @Override
+            public void call( Notification<? super Edge> notification ) {
+                meter.mark();
+            }
+        } ).doOnCompleted( new Action0() {
+            @Override
+            public void call() {
+                timer.stop();
+            }
+        } );
     }
 
 
     @Override
-    public Observable<Edge> deleteEdge( final Edge edge ) {
-        GraphValidation.validateEdge(edge);
+    public Observable<Edge> markEdge( final Edge edge ) {
+        GraphValidation.validateEdge( edge );
 
-        final MarkedEdge markedEdge = new SimpleMarkedEdge(edge, true);
+        final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true );
 
         final Timer.Context timer = deleteEdgeTimer.time();
         final Meter meter = deleteEdgeMeter;
-        return Observable.just(markedEdge).map(new Func1<MarkedEdge, Edge>() {
+        return Observable.just( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
             @Override
-            public Edge call(final MarkedEdge edge) {
+            public Edge call( final MarkedEdge edge ) {
 
                 final UUID timestamp = UUIDGenerator.newTimeUUID();
 
 
-                final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge(scope, edge, timestamp);
+                final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp );
 
 
-                LOG.debug("Marking edge {} as deleted to commit log", edge);
+                LOG.debug( "Marking edge {} as deleted to commit log", edge );
                 try {
                     edgeMutation.execute();
                 }
@@ -254,73 +247,50 @@ public class GraphManagerImpl implements GraphManager {
 
                 //HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
                 // timestamp )).subscribeOn( Schedulers.io() ).subscribe( edgeDeleteSubcriber );
-                edgeDeleteListener.receive(scope, markedEdge, timestamp).subscribeOn(Schedulers.io())
-                    .subscribe(edgeDeleteSubcriber);
+                edgeDeleteListener.receive( scope, markedEdge, timestamp ).subscribeOn( Schedulers.io() )
+                                  .subscribe( edgeDeleteSubcriber );
 
 
                 return edge;
             }
-        })
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
     @Override
-    public Observable<Id> deleteNode( final Id node, final long timestamp ) {
+    public Observable<Id> markNode( final Id node, final long timestamp ) {
         final Timer.Context timer = deleteNodeTimer.time();
         final Meter meter = deleteNodeMeter;
-        return Observable.just( node ).map( new Func1<Id, Id>() {
-            @Override
-            public Id call( final Id id ) {
+        return Observable.just( node ).map( id -> {
 
-                //mark the node as deleted
+            //mark the node as deleted
 
 
-                final UUID eventTimestamp = UUIDGenerator.newTimeUUID();
+            final UUID eventTimestamp = UUIDGenerator.newTimeUUID();
 
-                final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp );
+            final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp );
 
 
-                LOG.debug( "Marking node {} as deleted to node mark", node );
-                try {
-                    nodeMutation.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to execute mutation", e );
-                }
+            LOG.debug( "Marking node {} as deleted to node mark", node );
+            try {
+                nodeMutation.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to execute mutation", e );
+            }
 
 
-                //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp  )).subscribeOn(
-                // Schedulers.io() ).subscribe( nodeDelete );
-                nodeDeleteListener.receive( scope, id, eventTimestamp ).subscribeOn( Schedulers.io() )
-                                  .subscribe( nodeDelete );
+            //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp  )).subscribeOn(
+            // Schedulers.io() ).subscribe( nodeDelete );
+            nodeDeleteListener.receive( scope, id, eventTimestamp ).subscribeOn( Schedulers.io() )
+                              .subscribe( nodeDelete );
 
-                return id;
-            }
-        } )
-            .doOnEach(new Action1<Notification<? super Id>>() {
-                @Override
-                public void call(Notification<? super Id> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+            return id;
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -333,20 +303,11 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(searchByEdge.getMaxTimestamp()))
-                         .cast(Edge.class)
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).buffer( graphFig.getScanPageSize() )
+                         .flatMap( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(),  searchByEdge.filterMarked() ) )
+                         .cast( Edge.class ).doOnEach( notification -> {
+                meter.mark();
+            } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -359,20 +320,11 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgesFromSource( scope, search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
-                         .cast(Edge.class)
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).buffer( graphFig.getScanPageSize() )
+                         .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(),  search.filterMarked() ) ).cast( Edge.class )
+                         .doOnEach( notification -> {
+                             meter.mark();
+                         } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -385,20 +337,11 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgesToTarget( scope, search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
-                         .cast(Edge.class)
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).buffer( graphFig.getScanPageSize() )
+                         .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class )
+                         .doOnEach( notification -> {
+                             meter.mark();
+                         } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -411,21 +354,12 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
+        } ).buffer( graphFig.getScanPageSize() )
+                         .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(),  search.filterMarked() ) )
 
-                         .cast(Edge.class)
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+                         .cast( Edge.class ).doOnEach( notification -> {
+                meter.mark();
+            } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -438,20 +372,11 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
-                         .cast(Edge.class)
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).buffer( graphFig.getScanPageSize() )
+                         .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(),  search.filterMarked() ) ).cast( Edge.class )
+                         .doOnEach( notification -> {
+                             meter.mark();
+                         } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -464,19 +389,9 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
             }
-        } )
-            .doOnEach(new Action1<Notification<? super String>>() {
-                @Override
-                public void call(Notification<? super String> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -489,19 +404,9 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
             }
-        } )
-            .doOnEach(new Action1<Notification<? super String>>() {
-                @Override
-                public void call(Notification<? super String> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -514,19 +419,9 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
             }
-        } )
-            .doOnEach(new Action1<Notification<? super String>>() {
-                @Override
-                public void call(Notification<? super String> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -539,19 +434,9 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
             }
-        } )
-            .doOnEach(new Action1<Notification<? super String>>() {
-                @Override
-                public void call(Notification<? super String> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -561,10 +446,12 @@ public class GraphManagerImpl implements GraphManager {
     private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> {
 
         private final long maxVersion;
+        private final boolean filterMarked;
 
 
-        private EdgeBufferFilter( final long maxVersion ) {
+        private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) {
             this.maxVersion = maxVersion;
+            this.filterMarked = filterMarked;
         }
 
 
@@ -579,57 +466,36 @@ public class GraphManagerImpl implements GraphManager {
         public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
 
             final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
-            return Observable.from( markedEdges ).filter( new EdgeFilter( this.maxVersion, markedVersions ) );
-        }
-    }
-
-
-    /**
-     * Filter the returned values based on the max uuid and if it's been marked for deletion or not
-     */
-    private static class EdgeFilter implements Func1<MarkedEdge, Boolean> {
-
-        private final long maxTimestamp;
-
-        private final Map<Id, Long> markCache;
-
-
-        private EdgeFilter( final long maxTimestamp, Map<Id, Long> markCache ) {
-            this.maxTimestamp = maxTimestamp;
-            this.markCache = markCache;
-        }
-
-
-        @Override
-        public Boolean call( final MarkedEdge edge ) {
-
+            final long maxTimestamp = maxVersion;
 
-            final long edgeTimestamp = edge.getTimestamp();
+            return Observable.from( markedEdges ).filter( edge -> {
+                final long edgeTimestamp = edge.getTimestamp();
 
-            //our edge needs to not be deleted and have a version that's > max Version
-            if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) {
-                return false;
-            }
+                //keep the edge only if it is not deleted and its version is <= the max version
+                if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) {
+                    return false;
+                }
 
 
-            final Long sourceTimestamp = markCache.get( edge.getSourceNode() );
+                final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() );
 
-            //the source Id has been marked for deletion.  It's version is <= to the marked version for deletion,
-            // so we need to discard it
-            if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
-                return false;
-            }
+                //the source Id has been marked for deletion.  Its version is <= the marked version for deletion,
+                // so we need to discard it
+                if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
+                    return false;
+                }
 
-            final Long targetTimestamp = markCache.get( edge.getTargetNode() );
+                final Long targetTimestamp = markedVersions.get( edge.getTargetNode() );
 
-            //the target Id has been marked for deletion.  It's version is <= to the marked version for deletion,
-            // so we need to discard it
-            if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
-                return false;
-            }
+                //the target Id has been marked for deletion.  Its version is <= the marked version for deletion,
+                // so we need to discard it
+                if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
+                    return false;
+                }
 
 
-            return true;
+                return true;
+            } );
         }
     }
 

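For context on the GraphManagerImpl changes above: deleteEdge/deleteNode become markEdge/markNode, marking only writes a tombstone (storageEdgeSerialization.writeEdge / nodeSerialization.mark) and hands off to the delete listeners on Schedulers.io(); physical removal is left to the sweep. A minimal usage sketch, assuming an already-wired GraphManager and RxJava 1.x blocking calls (the graphManager and edge variables and the use of System.currentTimeMillis() as the mark timestamp are illustrative, not part of this commit):

    // Mark an edge for deletion; the Observable emits the same edge
    // after the mark mutation has been executed.
    Edge marked = graphManager.markEdge( edge ).toBlocking().last();

    // Mark a node as deleted; edges older than the mark are filtered
    // from subsequent reads until the sweep removes them.
    Id markedNode = graphManager.markNode( edge.getSourceNode(),
            System.currentTimeMillis() ).toBlocking().last();

Reads then decide per query whether marked data is hidden, via the filterMarked flag that the search objects below now carry into EdgeBufferFilter.
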
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
index a28a0bb..9a23caf 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
@@ -33,7 +33,6 @@ import com.google.common.base.Preconditions;
 
 /**
  * Simple bean implementation of search by edge
- *
  */
 public class SimpleSearchByEdge implements SearchByEdge {
 
@@ -43,24 +42,44 @@ public class SimpleSearchByEdge implements SearchByEdge {
     private final long maxTimestamp;
     private final Optional<Edge> last;
     private final SearchByEdgeType.Order order;
+    private final boolean filterMarked;
 
 
     /**
      * Create the search modules
+     *
      * @param sourceNode The source node of the edge
      * @param targetNode The target node of the edge
      * @param type The edge type
      * @param maxTimestamp The maximum timestamp to seek from
      * @param last The value to start seeking from.  Must be >= this value
      */
-    public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, final SearchByEdgeType.Order order, final Optional<Edge> last ) {
+    public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp,
+                               final SearchByEdgeType.Order order, final Optional<Edge> last ) {
+        this( sourceNode, type, targetNode, maxTimestamp, order, last, true );
+    }
+
+
+    /**
+     * Create the search modules
+     *
+     * @param sourceNode The source node of the edge
+     * @param type The edge type
+     * @param targetNode The target node of the edge
+     * @param maxTimestamp The maximum timestamp to seek from
+     * @param last The value to start seeking from.  Must be >= this value
+     */
+    public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp,
+                               final SearchByEdgeType.Order order, final Optional<Edge> last,
+                               final boolean filterMarked ) {
 
-        ValidationUtils.verifyIdentity(sourceNode);
-        ValidationUtils.verifyIdentity(targetNode);
-        ValidationUtils.verifyString(type, "type");
-        GraphValidation.validateTimestamp(maxTimestamp, "maxTimestamp");
-        Preconditions.checkNotNull(order, "order must not be null");
-        Preconditions.checkNotNull(last, "last can never be null");
+
+        ValidationUtils.verifyIdentity( sourceNode );
+        ValidationUtils.verifyIdentity( targetNode );
+        ValidationUtils.verifyString( type, "type" );
+        GraphValidation.validateTimestamp( maxTimestamp, "maxTimestamp" );
+        Preconditions.checkNotNull( order, "order must not be null" );
+        Preconditions.checkNotNull( last, "last can never be null" );
 
 
         this.sourceNode = sourceNode;
@@ -69,6 +88,7 @@ public class SimpleSearchByEdge implements SearchByEdge {
         this.maxTimestamp = maxTimestamp;
         this.order = order;
         this.last = last;
+        this.filterMarked = filterMarked;
     }
 
 
@@ -97,6 +117,10 @@ public class SimpleSearchByEdge implements SearchByEdge {
 
 
     @Override
+    public boolean filterMarked() { return filterMarked; }
+
+
+    @Override
     public Optional<Edge> last() {
         return last;
     }

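The filterMarked flag added here (and to SimpleSearchByEdgeType below) defaults to true in the pre-existing constructors, so existing callers keep the old behaviour of hiding marked edges; passing false is intended to let sweep or audit code read edges that are already marked for deletion. A hedged construction sketch (sourceId, targetId and the "likes" edge type are illustrative placeholders):

    // Default behaviour: edges marked for deletion are filtered out.
    SearchByEdge visibleOnly = new SimpleSearchByEdge( sourceId, "likes", targetId,
            Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() );

    // Include marked edges, e.g. so a sweep pass can find what to remove.
    SearchByEdge includeMarked = new SimpleSearchByEdge( sourceId, "likes", targetId,
            Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false );

GraphManagerImpl reads this flag via SearchByEdge.filterMarked() when it builds its EdgeBufferFilter.
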
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
index 1687162..9392dbc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
@@ -41,6 +41,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
     private final long maxTimestamp;
     private final Optional<Edge> last;
     private final Order order;
+    private final boolean filterMarked;
 
 
     /**
@@ -55,7 +56,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
      * //TODO, make last an optional
      */
     public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Edge last ) {
-        this(node, type, maxTimestamp, order, Optional.fromNullable(last));
+        this(node, type, maxTimestamp, order, Optional.fromNullable(last), true);
     }
 
 
@@ -70,7 +71,24 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
      *
      * //TODO, make last an optional
      */
-    public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Optional<Edge> last ) {
+    public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order,
+                                   final Optional<Edge> last ) {
+        this( node, type, maxTimestamp, order, last, true );
+    }
+
+
+    /**
+     * Create the search modules
+     * @param node The node to search from
+     * @param type The edge type
+     * @param maxTimestamp The maximum timestamp to return
+     * @param order The sort order.  Descending is most efficient
+     * @param last The value to start seeking from.  Must be >= this value
+     * @param filterMarked True if edges marked for deletion should be filtered from the results
+     */
+    public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order,
+                                   final Optional<Edge> last, final boolean filterMarked ) {
+
 
         Preconditions.checkNotNull( order, "order is required");
         ValidationUtils.verifyIdentity( node );
@@ -84,6 +102,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
         this.maxTimestamp = maxTimestamp;
         this.order = order;
         this.last = last;
+        this.filterMarked = filterMarked;
     }
 
 
@@ -118,6 +137,12 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
 
 
     @Override
+    public boolean filterMarked() {
+        return filterMarked;
+    }
+
+
+    @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
             return true;