You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/12 21:22:24 UTC

[1/4] git commit: Checkpoint for delete listener and tests

Repository: incubator-usergrid
Updated Branches:
  refs/heads/asyncqueue dd37909fd -> 78dc432d1


Checkpoint for delete listener and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0a256bb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0a256bb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0a256bb5

Branch: refs/heads/asyncqueue
Commit: 0a256bb57fb376e0efff9ab13636e95960b9b27b
Parents: dd37909
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 10 19:11:21 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 10 19:11:21 2014 -0700

----------------------------------------------------------------------
 .../persistence/collection/rx/ParallelTest.java | 218 +++++++++++++++++++
 .../graph/consistency/AsyncProcessorImpl.java   |   3 +-
 .../graph/consistency/TimeoutTask.java          |   2 +-
 .../graph/impl/EdgeDeleteListener.java          |  12 +-
 .../graph/impl/NodeDeleteListener.java          | 166 +++++++++++---
 .../graph/consistency/AsyncProcessorTest.java   |  14 +-
 .../graph/impl/NodeDeleteListenerTest.java      | 217 ++++++++++++++++++
 7 files changed, 589 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
new file mode 100644
index 0000000..5d72ae6
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.rx;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import com.netflix.config.ConfigurationManager;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests that provides examples of how to perform more complex RX operations
+ */
+public class ParallelTest {
+
+    private static final Logger logger = LoggerFactory.getLogger( ParallelTest.class );
+
+//
+//    private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "TEST_KEY" );
+//
+//
+//    public static final String THREAD_POOL_SIZE = CommandUtils.getThreadPoolCoreSize( GROUP_KEY.name() );
+//
+//    public static final String THREAD_POOL_QUEUE = CommandUtils.getThreadPoolMaxQueueSize( GROUP_KEY.name() );
+
+
+    /**
+     * An example of how an observable that requires a "fan out" then join should execute.
+     */
+    @Test(timeout = 5000)
+    public void concurrentFunctions() {
+        final String input = "input";
+
+        final int size = 100;
+        //since we start at index 0
+        final int expected = size - 1;
+
+
+        /**
+         * QUESTION Using this thread blocks indefinitely.  The execution of the Hystrix command happens on the
+         * computation
+         * Thread if this is used
+         */
+        //        final Scheduler scheduler = Schedulers.threadPoolForComputation();
+
+        //use the I/O scheduler to allow enough thread, otherwise our pool will be the same size as the # of cores
+        final Scheduler scheduler = Schedulers.io();
+
+        //set our size equal
+//        ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size );
+        //        ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, 10 );
+
+        //reject requests we have to queue
+//        ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_QUEUE, -1 );
+
+        //latch used to make each thread block to prove correctness
+        final CountDownLatch latch = new CountDownLatch( size );
+
+
+        final Multiset<String> set = HashMultiset.create();
+
+
+        //create our observable and execute it in the I/O pool since we'll be doing I/O operations
+
+        /**
+         *  QUESTION: Should this use the computation scheduler since all operations (except the hystrix command) are
+         *  non blocking?
+         */
+
+        final Observable<String> observable = Observable.from( input ).observeOn( scheduler );
+
+
+        Observable<Integer> thing = observable.mapMany( new Func1<String, Observable<Integer>>() {
+
+            @Override
+            public Observable<Integer> call( final String s ) {
+                List<Observable<Integer>> functions = new ArrayList<Observable<Integer>>();
+
+                logger.info( "Creating new set of observables in thread {}", Thread.currentThread().getName() );
+
+                for ( int i = 0; i < size; i++ ) {
+
+
+                    final int index = i;
+
+                    //create a new observable and execute the function on it.  These should happen in parallel when
+                    // a subscription occurs
+
+                    /**
+                     * QUESTION: Should this again be the process thread, not the I/O
+                     */
+                    Observable<String> newObservable = Observable.from( input ).subscribeOn( scheduler );
+
+                    Observable<Integer> transformed = newObservable.map( new Func1<String, Integer>() {
+
+                        @Override
+                        public Integer call( final String s ) {
+
+                            final String threadName = Thread.currentThread().getName();
+
+                            logger.info( "Invoking parallel task in thread {}", threadName );
+
+//                            /**
+//                             * Simulate a Hystrix command making a call to an external resource.  Invokes
+//                             * the Hystrix command immediately as the function is invoked.  This is currently
+//                             * how we have to call Cassandra.
+//                             *
+//                             * TODO This needs to be re-written and evaluated once this PR is released https://github.com/Netflix/Hystrix/pull/209
+//                             */
+//                            return new HystrixCommand<Integer>( GROUP_KEY ) {
+//                                @Override
+//                                protected Integer run() throws Exception {
+//
+//                                    final String threadName = Thread.currentThread().getName();
+//
+//                                    logger.info( "Invoking hystrix task in thread {}", threadName );
+
+
+                                    set.add( threadName );
+
+                                    latch.countDown();
+
+                                    try {
+                                        latch.await();
+                                    }
+                                    catch ( InterruptedException e ) {
+                                        throw new RuntimeException( "Interrupted", e );
+                                    }
+
+//                                    assertTrue( isExecutedInThread() );
+//
+//                                    return index;
+//                                }
+//                            }.execute();
+
+                            return index;
+                        }
+                    } );
+
+                    functions.add( transformed );
+                }
+
+                /**
+                 * Execute the functions above and zip the results together
+                 */
+                Observable<Integer> zipped = Observable.zip( functions, new FuncN<Integer>() {
+
+                    @Override
+                    public Integer call( final Object... args ) {
+
+                        logger.info( "Invoking zip in thread {}", Thread.currentThread().getName() );
+
+                        assertEquals( size, args.length );
+
+                        for ( int i = 0; i < args.length; i++ ) {
+                            assertEquals( "Indexes are returned in order", i, args[i] );
+                        }
+
+                        //just return our string
+                        return ( Integer ) args[args.length - 1];
+                    }
+                } );
+
+                return zipped;
+            }
+        } );
+
+
+        final Integer last = thing.toBlockingObservable().last();
+
+
+        assertEquals( expected, last.intValue() );
+
+        assertEquals( size, set.size() );
+
+        /**
+         * Ensure only 1 entry per thread
+         */
+        for ( String entry : set.elementSet() ) {
+            assertEquals( 1, set.count( entry ) );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index d962a88..b949d55 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -3,7 +3,6 @@ package org.apache.usergrid.persistence.graph.consistency;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -103,7 +102,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
         } ).subscribe( new Action1<AsynchronousMessage<T>>() {
             @Override
             public void call( final AsynchronousMessage<T> asynchronousMessage ) {
-                //To change body of implemented methods use File | Settings | File Templates.
+             //no op
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
index 6607724..fbfad3e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
@@ -32,7 +32,7 @@ public class TimeoutTask<T> implements Action1<Scheduler.Inner> {
          * We purposefully loop through a tight loop.  If we have anything to process, we need to do so
          * Once we run out of items to process, this thread will sleep and the timer will fire
          */
-        while(!inner.isUnsubscribed()) {
+        while(!inner.isUnsubscribed() && graphFig.getTimeoutReadSize() > 0) {
 
             Iterator<AsynchronousMessage<T>> timeouts = getTimeouts();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index 6feded3..85e4b4f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -79,7 +79,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
                                 new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
                                         edge.getVersion(), null ) );
                     }
-                } ).doOnEach( new Action1<MarkedEdge>() {
+                } ).doOnNext( new Action1<MarkedEdge>() {
                     @Override
                     public void call( final MarkedEdge markedEdge ) {
                         final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdge );
@@ -93,7 +93,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
                 Observable<Integer> sourceIdType = edgeManager.loadEdgesFromSourceByType(
                         new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
                                 edge.getTargetNode().getType(), null ) ).take( 2 ).count()
-                                                              .doOnEach( new Action1<Integer>() {
+                                                              .doOnNext( new Action1<Integer>() {
                                                                   @Override
                                                                   public void call( final Integer count ) {
                                                                       //There's nothing to do,
@@ -114,7 +114,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
                 Observable<Integer> targetIdType = edgeManager.loadEdgesToTargetByType(
                         new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
                                 edge.getSourceNode().getType(), null ) ).take( 2 ).count()
-                                                              .doOnEach( new Action1<Integer>() {
+                                                              .doOnNext( new Action1<Integer>() {
                                                                   @Override
                                                                   public void call( final Integer count ) {
                                                                       //There's nothing to do,
@@ -136,7 +136,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
                 // we can't delete it
                 Observable<Integer> sourceType = edgeManager.loadEdgesFromSource(
                         new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) ).take( 2 )
-                                                            .count().doOnEach( new Action1<Integer>() {
+                                                            .count().doOnNext( new Action1<Integer>() {
                             @Override
                             public void call( final Integer count ) {
                                 //There's nothing to do,
@@ -146,6 +146,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
                                 if ( count == 1 ) {
                                     final MutationBatch delete =
                                             edgeMetadataSerialization.removeEdgeTypeFromSource( scope, edge );
+                                    batch.mergeShallow( delete );
                                 }
                             }
                         } );
@@ -153,7 +154,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
 
                 Observable<Integer> targetType = edgeManager.loadEdgesToTarget(
                         new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) ).take( 2 )
-                                                            .count().doOnEach( new Action1<Integer>() {
+                                                            .count().doOnNext( new Action1<Integer>() {
                             @Override
                             public void call( final Integer count ) {
                                 //There's nothing to do,
@@ -163,6 +164,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
                                 if ( count == 1 ) {
                                     final MutationBatch delete =
                                             edgeMetadataSerialization.removeEdgeTypeToTarget( scope, edge );
+                                    batch.mergeShallow( delete );
                                 }
                             }
                         } );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index aabc572..fbd5d08 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -2,9 +2,11 @@ package org.apache.usergrid.persistence.graph.impl;
 
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.SearchEdgeType;
@@ -19,8 +21,10 @@ import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
-import rx.Notification;
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
@@ -34,6 +38,8 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
     private final NodeSerialization nodeSerialization;
     private final EdgeSerialization edgeSerialization;
     private final EdgeMetadataSerialization edgeMetadataSerialization;
+    private final GraphFig graphFig;
+    private final Keyspace keyspace;
 
 
     /**
@@ -41,12 +47,15 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
      */
     @Inject
     public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
-                               final EdgeMetadataSerialization edgeMetadataSerialization, @NodeDelete final AsyncProcessor nodeDelete) {
+                               final EdgeMetadataSerialization edgeMetadataSerialization, final GraphFig graphFig,
+                               final Keyspace keyspace, @NodeDelete final AsyncProcessor nodeDelete ) {
 
 
         this.nodeSerialization = nodeSerialization;
         this.edgeSerialization = edgeSerialization;
         this.edgeMetadataSerialization = edgeMetadataSerialization;
+        this.graphFig = graphFig;
+        this.keyspace = keyspace;
         nodeDelete.addListener( this );
     }
 
@@ -63,33 +72,83 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
             public Optional<UUID> call( final Id id ) {
                 return nodeSerialization.getMaxVersion( scope, node );
             }
-        } ).flatMap( new Func1<Optional<UUID>, Observable<MarkedEdge>>() {
-            @Override
-            public Observable<MarkedEdge> call( final Optional<UUID> uuidOptional ) {
-
-                return getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
-                        .flatMap( new Func1<String, Observable<MarkedEdge>>() {
-                            @Override
-                            public Observable<MarkedEdge> call( final String edgeType ) {
+        } ) //only continue if it's present.  Otherwise stop
+                .takeWhile( new Func1<Optional<UUID>, Boolean>() {
+                    @Override
+                    public Boolean call( final Optional<UUID> uuidOptional ) {
+                        return uuidOptional.isPresent();
+                    }
+                } )
+
+                //delete source and targets in parallel and merge them into a single observable
+                .flatMap( new Func1<Optional<UUID>, Observable<List<MarkedEdge>>>() {
+                    @Override
+                    public Observable<List<MarkedEdge>> call( final Optional<UUID> uuidOptional ) {
+
+                        //get all edges pointing to the target node and buffer then into groups for deletion
+                        Observable<List<MarkedEdge>> targetEdges =
+                                getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+                                        .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
+                                            @Override
+                                            public Observable<List<MarkedEdge>> call( final String edgeType ) {
+                                                return loadEdgesToTarget( scope,
+                                                        new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
+                                                                null ) ).buffer( graphFig.getScanPageSize() );
+                                            }
+                                        } );
 
-                                //for each edge type, we want to search all edges < this version to the node and
-                                // delete them. We might want to batch this up for efficiency
-                                return loadEdgesToTarget( scope,
-                                        new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
 
-                                        .doOnEach( new Action1<MarkedEdge>() {
+                        //get all edges pointing to the source node and buffer them into groups for deletion
+                        Observable<List<MarkedEdge>> sourceEdges =
+                                getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
+                                        .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
                                             @Override
-                                            public void call( final MarkedEdge markedEdge ) {
-                                                edgeSerialization.deleteEdge( scope, markedEdge );
+                                            public Observable<List<MarkedEdge>> call( final String edgeType ) {
+                                                return loadEdgesFromSource( scope,
+                                                        new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
+                                                                null ) ).buffer( graphFig.getScanPageSize() );
                                             }
                                         } );
-                            }
-                        } );
-            }
-        } ).map( new Func1<MarkedEdge, EdgeEvent<Id>>() {
+
+
+                        return Observable.merge( targetEdges, sourceEdges );
+                    }
+                } ).doOnNext( new Action1<List<MarkedEdge>>() {
+                    @Override
+                    public void call( final List<MarkedEdge> markedEdges ) {
+                        MutationBatch batch = keyspace.prepareMutationBatch();
+
+
+                        for ( MarkedEdge marked : markedEdges ) {
+                            final MutationBatch edgeBatch = edgeSerialization.deleteEdge( scope, marked );
+                            batch.mergeShallow( edgeBatch );
+                        }
+
+                        try {
+                            batch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( "Unable to execute mutation" );
+                        }
+                    }
+                } ).map( new Func1<List<MarkedEdge>, EdgeEvent<Id>>() {
+                    @Override
+                    public EdgeEvent<Id> call( final List<MarkedEdge> markedEdges ) {
+                        return edgeEvent;
+                    }
+                } );
+    }
+
+
+    /**
+     * Get all existing edge types to the target node
+     */
+    private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<String>() {
             @Override
-            public EdgeEvent<Id> call( final MarkedEdge edge ) {
-                return edgeEvent;
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
             }
         } );
     }
@@ -97,16 +156,13 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
 
     /**
      * Get all existing edge types to the target node
-     * @param scope
-     * @param search
-     * @return
      */
-    private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+    private Observable<String> getEdgesTypesFromSource( final OrganizationScope scope, final SearchEdgeType search ) {
 
         return Observable.create( new ObservableIterator<String>() {
             @Override
             protected Iterator<String> getIterator() {
-                return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+                return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
             }
         } );
     }
@@ -114,9 +170,6 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
 
     /**
      * Load all edges pointing to this target
-     * @param scope
-     * @param search
-     * @return
      */
     private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
 
@@ -127,4 +180,55 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
             }
         } );
     }
+
+
+    /**
+     * Load all edges pointing to this target
+     */
+    private Observable<MarkedEdge> loadEdgesFromSource( final OrganizationScope scope, final SearchByEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesFromSource( scope, search );
+            }
+        } );
+    }
+
+
+    private class Delete implements Action1<List<MarkedEdge>> {
+
+        private final OrganizationScope scope;
+
+
+        private Delete( final OrganizationScope scope ) {
+            this.scope = scope;
+        }
+
+
+        @Override
+        public void call( final List<MarkedEdge> markedEdges ) {
+
+            //do nothing
+            if ( markedEdges.size() == 0 ) {
+                return;
+            }
+
+            Iterator<MarkedEdge> marked = markedEdges.iterator();
+
+            MutationBatch batch = edgeSerialization.deleteEdge( scope, marked.next() );
+
+            while ( marked.hasNext() ) {
+                final MutationBatch newDelete = edgeSerialization.deleteEdge( scope, marked.next() );
+                batch.mergeShallow( newDelete );
+            }
+
+            try {
+                batch.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to execute batch mutation", e );
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index 9822cc5..3652757 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -29,9 +29,11 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import org.apache.usergrid.persistence.graph.GraphFig;
+
 import rx.Observable;
-import rx.concurrency.Schedulers;
 import rx.functions.Action1;
+import rx.schedulers.Schedulers;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -274,7 +276,11 @@ public class AsyncProcessorTest {
      */
     public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue ) {
 
-        AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue, Schedulers.threadPoolForIO() );
+        GraphFig fig = mock(GraphFig.class);
+
+        when(fig.getScanPageSize()).thenReturn( 0 );
+
+        AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue, Schedulers.io(), fig );
 
 
         return processor;
@@ -305,7 +311,7 @@ public class AsyncProcessorTest {
         @Override
         public Observable<TestEvent> receive( final TestEvent event ) {
 
-            return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+            return Observable.from( event ).doOnNext( new Action1<TestEvent>() {
                 @Override
                 public void call( final TestEvent testEvent ) {
                     events.push( testEvent );
@@ -342,7 +348,7 @@ public class AsyncProcessorTest {
 
         @Override
         public Observable<TestEvent> receive( final TestEvent event ) {
-            return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+            return Observable.from( event ).doOnNext( new Action1<TestEvent>() {
                 @Override
                 public void call( final TestEvent testEvent ) {
                     events.push( testEvent );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
new file mode 100644
index 0000000..2d55698
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -0,0 +1,217 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.EdgeManager;
+import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchIdType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
+public class NodeDeleteListenerTest {
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    protected NodeDeleteListener deleteListener;
+
+    @Inject
+    protected EdgeSerialization edgeSerialization;
+
+    @Inject
+    protected EdgeMetadataSerialization edgeMetadataSerialization;
+
+    @Inject
+    protected NodeSerialization nodeSerialization;
+
+    @Inject
+    protected EdgeManagerFactory emf;
+
+
+    protected OrganizationScope scope;
+
+
+    @Before
+    public void setup() {
+        scope = mock( OrganizationScope.class );
+
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getOrganization() ).thenReturn( orgId );
+    }
+
+
+    /**
+     * Simple test case that tests a single edge and removing the node.  The other target node should be removed as well
+     * since it has no other targets
+     */
+    @Test
+    public void testNoDeletionMarked() {
+
+        EdgeManager em = emf.createEdgeManager( scope );
+
+        Edge edge = createEdge( "source", "test", "target" );
+
+        //write the edge
+        Edge last = em.writeEdge( edge ).toBlockingObservable().last();
+
+        assertEquals( edge, last );
+
+        Id sourceNode = edge.getSourceNode();
+
+        Id targetNode = edge.getTargetNode();
+
+
+        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+
+        EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
+
+        assertNull( "Mark was not set, no delete should be executed", event );
+
+
+        deleteEvent = new EdgeEvent<Id>( scope, targetNode );
+
+        event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
+
+        assertNull( "Mark was not set, no delete should be executed", event );
+    }
+
+
+    /**
+     * Simple test case that tests a single edge and removing the node.  The other target node should be removed as well
+     * since it has no other targets
+     */
+    @Test
+    public void testRemoveSourceNode() throws ConnectionException {
+
+        EdgeManager em = emf.createEdgeManager( scope );
+
+        Edge edge = createEdge( "source", "test", "target" );
+
+        //write the edge
+        Edge last = em.writeEdge( edge ).toBlockingObservable().last();
+
+
+        assertEquals( edge, last );
+
+        Id sourceNode = edge.getSourceNode();
+
+        Id targetNode = edge.getTargetNode();
+
+
+        //mark the node so
+        UUID deleteVersion = UUIDGenerator.newTimeUUID();
+
+        nodeSerialization.mark( scope, sourceNode, deleteVersion ).execute();
+
+        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+
+
+        EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().last();
+
+        assertEquals( deleteEvent, event );
+
+        //now verify we can't get any of the info back
+
+        UUID now = UUIDGenerator.newTimeUUID();
+
+
+        Iterator<MarkedEdge> returned = edgeSerialization
+                .getEdgesFromSource( scope, createSearchByEdge( sourceNode, edge.getType(), now, null ) );
+
+        //no edge from source node should be returned
+        assertFalse( "No source should be returned", returned.hasNext() );
+
+        //validate it's not returned by the
+
+        returned = edgeSerialization
+                .getEdgesToTarget( scope, createSearchByEdge( targetNode, edge.getType(), now, null ) );
+
+        assertFalse( "No target should be returned", returned.hasNext() );
+
+        //no types from source
+
+        Iterator<String> types =
+                edgeMetadataSerialization.getEdgeTypesFromSource( scope, createSearchEdge( sourceNode, null ) );
+
+        assertFalse( types.hasNext() );
+
+        //no types to target
+
+        types = edgeMetadataSerialization.getEdgeTypesToTarget( scope, createSearchEdge( targetNode, null ) );
+
+        assertFalse( types.hasNext() );
+
+
+        //no target types from source
+
+        Iterator<String> idTypes = edgeMetadataSerialization
+                .getIdTypesFromSource( scope, createSearchIdType( sourceNode, edge.getType(), null ) );
+
+        assertFalse( idTypes.hasNext() );
+
+        //no source types to target
+        idTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, createSearchIdType( targetNode, edge.getType(), null ) );
+
+        assertFalse( idTypes.hasNext() );
+    }
+
+
+    /**
+     * Simple test case that tests a single edge and removing the node.  The  source node should be removed as well
+     * since it has no other targets
+     */
+    @Test
+    public void testRemoveTargetNode() {
+
+    }
+}


[3/4] git commit: Working meta data cleanup

Posted by sn...@apache.org.
Working meta data cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4dd96aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4dd96aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4dd96aa7

Branch: refs/heads/asyncqueue
Commit: 4dd96aa7783ae4058e42ab75689f688ce3ce6e90
Parents: e8568ba
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 12 12:25:17 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 12 12:25:17 2014 -0700

----------------------------------------------------------------------
 stack/corepersistence/collection/pom.xml        |   8 +-
 .../usergrid/persistence/graph/GraphFig.java    |   6 +
 .../persistence/graph/guice/GraphModule.java    |   5 +
 .../graph/impl/NodeDeleteListener.java          |  12 +-
 .../persistence/graph/impl/stage/EdgeAsync.java |  62 +++++
 .../graph/impl/stage/EdgeAsyncImpl.java         | 255 +++++++++++++++++++
 .../graph/impl/NodeDeleteListenerTest.java      |   1 +
 .../graph/impl/stage/EdgeAsyncTest.java         | 154 +++++++++++
 8 files changed, 496 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 95aea45..7a8e780 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -9,6 +9,7 @@
     <slf4j.version>1.7.2</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <chop.version>1.0</chop.version>
+    <rx.version>0.17.0</rx.version>
   </properties>
 
   <parent>
@@ -161,8 +162,13 @@
     <dependency>
         <groupId>com.netflix.rxjava</groupId>
         <artifactId>rxjava-core</artifactId>
-        <version>0.17.0-RC7</version>
+        <version>${rx.version}</version>
     </dependency>
+    <dependency>
+           <groupId>com.netflix.rxjava</groupId>
+           <artifactId>rxjava-math</artifactId>
+           <version>${rx.version}</version>
+       </dependency>
 
 
     <!--<dependency>-->

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 3f87d14..64ca397 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -15,6 +15,8 @@ public interface GraphFig extends GuicyFig {
 
     public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
 
+    public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
+
 
     public static final String TIMEOUT_SIZE = "usergrid.graph.timeout.page.size";
 
@@ -50,6 +52,10 @@ public interface GraphFig extends GuicyFig {
     @Key( TIMEOUT_TASK_TIME )
     long getTaskLoopTime();
 
+    @Default("10")
+    @Key(REPAIR_CONCURRENT_SIZE)
+    int getRepairConcurrentSize();
+
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index ed9448b..a18e741 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
 import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
 import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
 import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsync;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsyncImpl;
 import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -105,6 +107,9 @@ public class GraphModule extends AbstractModule {
         bind(AsyncProcessor.class).annotatedWith( EdgeDelete.class ).to( AsyncProcessorImpl.class );
         bind(AsyncProcessor.class).annotatedWith( EdgeWrite.class ).to( AsyncProcessorImpl.class );
         bind(AsyncProcessor.class).annotatedWith( NodeDelete.class ).to( AsyncProcessorImpl.class );
+
+        //Edge stages
+        bind( EdgeAsync.class).to( EdgeAsyncImpl.class );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 1d2124a..58862f4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -40,10 +40,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
 
 
     private final NodeSerialization nodeSerialization;
-//    private final EdgeSerialization edgeSerialization;
-//    private final EdgeMetadataSerialization edgeMetadataSerialization;
+    private final EdgeSerialization edgeSerialization;
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
     private final GraphFig graphFig;
-//    private final Keyspace keyspace;
+    private final Keyspace keyspace;
 
     private final Scheduler scheduler;
     private final EdgeManagerFactory edgeManagerFactory;
@@ -61,10 +61,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
 
 
         this.nodeSerialization = nodeSerialization;
-//        this.edgeSerialization = edgeSerialization;
-//        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        this.edgeSerialization = edgeSerialization;
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.graphFig = graphFig;
-//        this.keyspace = keyspace;
+        this.keyspace = keyspace;
         this.scheduler = scheduler;
         this.edgeManagerFactory = edgeManagerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
new file mode 100644
index 0000000..90c1bfe
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.MutationBatch;
+
+import rx.Observable;
+
+
+/**
+ * Creates a mutation which will remove obsolete
+ *
+ */
+public interface EdgeAsync {
+
+    /**
+     * Validate that the source types can be cleaned for the given info
+     * @param scope The scope to use
+     * @param sourceId The source Id to use
+     * @param edgeType The edge type
+     * @param version The max version to clean
+     * @return The mutation with the operations
+     */
+    public Observable<Integer> cleanSources(OrganizationScope scope, Id sourceId, String edgeType, UUID version);
+
+
+    /**
+     *
+     * Remove all source id types that are empty, as well as the edge type if there are no more edges for it
+     * @param scope The scope to use
+     * @param targetId The target Id to use
+     * @param edgeType The edge type
+     * @param version The max version to clean
+     * @return  The mutation with the operations
+     */
+    public Observable<Integer> clearTargets( OrganizationScope scope, Id targetId, String edgeType, UUID version );
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
new file mode 100644
index 0000000..cb2cf6d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.SearchEdgeType;
+import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.observables.MathObservable;
+
+
+/**
+ * Implementation of the cleanup of edge meta data
+ */
+@Singleton
+public class EdgeAsyncImpl implements EdgeAsync {
+
+
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
+    private final EdgeSerialization edgeSerialization;
+    private final Keyspace keyspace;
+    private final GraphFig graphFig;
+    private final Scheduler scheduler;
+
+
+    @Inject
+    public EdgeAsyncImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
+                          final EdgeSerialization edgeSerialization, final Keyspace keyspace, final GraphFig graphFig,
+                          final Scheduler scheduler ) {
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        this.edgeSerialization = edgeSerialization;
+        this.keyspace = keyspace;
+        this.graphFig = graphFig;
+        this.scheduler = scheduler;
+    }
+
+
+    @Override
+    public Observable<Integer> cleanSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
+                                       final UUID version ) {
+
+
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+
+    @Override
+    public Observable<Integer> clearTargets( final OrganizationScope scope, final Id targetId, final String edgeType,
+                                             final UUID version ) {
+
+        ValidationUtils.validateOrganizationScope( scope );
+        ValidationUtils.verifyIdentity( targetId );
+        Preconditions.checkNotNull( edgeType, "edge type is required" );
+        Preconditions.checkNotNull( version, "version is required" );
+
+
+
+        return loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
+                .buffer( graphFig.getRepairConcurrentSize() )
+                        //buffer them into concurrent groups based on the concurrent repair size
+                .flatMap( new Func1<List<String>, Observable<Integer>>() {
+
+                    @Override
+                    public Observable<Integer> call( final List<String> types ) {
+
+
+                        final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                        final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
+
+                        //for each id type, check if the exist in parallel to increase processing speed
+                        for ( final String sourceIdType : types ) {
+
+                            final SearchByIdType searchData =   new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
+
+                            Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData
+                                    )
+                                    .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
+
+                                        //get distinct by source node
+                                        @Override
+                                        public Id call( final MarkedEdge markedEdge ) {
+                                            return markedEdge.getSourceNode();
+                                        }
+                                    } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
+
+                                        @Override
+                                        public void call( final Integer count ) {
+                                            /**
+                                             * we only want to delete if no edges are in this class. If there are
+                                             * still edges
+                                             * we must retain the information in order to keep our index structure
+                                             * correct for edge
+                                             * iteration
+                                             **/
+                                            if ( count != 0 ) {
+                                                return;
+                                            }
+
+
+                                            batch.mergeShallow( edgeMetadataSerialization
+                                                    .removeIdTypeToTarget( scope, targetId, edgeType, sourceIdType,
+                                                            version ) );
+                                        }
+                                    } );
+
+                            checks.add( search );
+                        }
+
+
+                        /**
+                         * Sum up the total number of edges we had, then execute the mutation if we have anything to do
+                         */
+                        return MathObservable.sumInteger(Observable.merge( checks )).doOnNext( new Action1<Integer>() {
+                            @Override
+                            public void call( final Integer count ) {
+
+                                if(batch.isEmpty()){
+                                    return;
+                                }
+
+                                try {
+                                    batch.execute();
+                                }
+                                catch ( ConnectionException e ) {
+                                    throw new RuntimeException( "Unable to execute mutation", e );
+                                }
+                            }
+                        } );
+
+                    }
+
+                } )
+                .map( new Func1<Integer, Integer>() {
+                    @Override
+                    public Integer call( final Integer subTypes ) {
+
+                        /**
+                         * We can only execute deleting this type if no sub types were deleted
+                         */
+                        if(subTypes != 0){
+                            return subTypes;
+                        }
+
+                        try {
+                            edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version )
+                                                         .execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( "Unable to execute mutation" );
+                        }
+
+                        return subTypes;
+                    }
+                } )
+                //if we get no edges, emit a 0 so the caller knows nothing was deleted
+                .defaultIfEmpty( 0 );
+    }
+
+
+    /**
+     * Get all existing edge types to the target node
+     */
+    private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<String>() {
+            @Override
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+
+
+    private Observable<MarkedEdge> getEdgesToTargetBySourceType( final OrganizationScope scope,
+                                                                 final SearchByIdType search ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+
+
+    private Observable<String> loadEdgeIdsToTarget( final OrganizationScope scope, final SearchIdType search ) {
+        return Observable.create( new ObservableIterator<String>() {
+            @Override
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+
+
+    /**
+     * Load all edges pointing to this target
+     */
+    private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesToTarget( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 5b4899a..82aecff 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -186,6 +186,7 @@ public class NodeDeleteListenerTest {
 
         assertFalse( types.hasNext() );
 
+
         //no types to target
 
         types = edgeMetadataSerialization.getEdgeTypesToTarget( scope, createSearchEdge( targetNode, null ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
new file mode 100644
index 0000000..c17676b
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith(JukitoRunner.class)
+@UseModules({ TestGraphModule.class })
+public class EdgeAsyncTest {
+
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    protected NodeSerialization serialization;
+
+    @Inject
+    protected EdgeAsync edgeAsync;
+
+    @Inject
+    protected EdgeSerialization edgeSerialization;
+
+    @Inject
+    protected EdgeMetadataSerialization edgeMetadataSerialization;
+
+    protected OrganizationScope scope;
+
+
+    @Before
+    public void setup() {
+        scope = mock( OrganizationScope.class );
+
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getOrganization() ).thenReturn( orgId );
+    }
+
+
+    @Test
+    public void cleanTargetNoEdgesNoMeta(){
+       //do no writes, then execute a cleanup with no meta data
+
+        final Id targetId = createId ("target" );
+        final String test = "test";
+        final UUID version = UUIDGenerator.newTimeUUID();
+
+        int value = edgeAsync.clearTargets( scope, targetId, test, version ).toBlockingObservable().single();
+
+        assertEquals("No subtypes found", 0, value);
+    }
+
+    @Test
+    public void cleanTargetSingleEge() throws ConnectionException {
+        Edge edge = createEdge( "source", "test", "target" );
+
+        edgeSerialization.writeEdge( scope, edge ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+        int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+
+        assertEquals("No subtypes removed, edge exists", 1, value);
+
+        //now delete the edge
+
+        edgeSerialization.deleteEdge( scope, edge ).execute();
+
+        value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+
+        assertEquals("Single subtype should be removed", 0, value);
+
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization.getEdgeTypesToTarget( scope,
+                new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+
+        assertFalse("No edge types exist", edgeTypes.hasNext());
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization.getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+
+        assertFalse("No edge types exist", sourceTypes.hasNext());
+
+
+
+    }
+
+
+}


[4/4] git commit: Passing tests for meta data cleanup.

Posted by sn...@apache.org.
Passing tests for meta data cleanup.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/78dc432d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/78dc432d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/78dc432d

Branch: refs/heads/asyncqueue
Commit: 78dc432d1ed80ca96776657ced8961fee34265d5
Parents: 4dd96aa
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 12 12:57:08 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 12 12:57:08 2014 -0700

----------------------------------------------------------------------
 .../graph/impl/stage/EdgeAsyncImpl.java         | 180 ++++++++++---------
 .../graph/impl/stage/EdgeAsyncTest.java         | 165 +++++++++++++++--
 2 files changed, 242 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/78dc432d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
index cb2cf6d..ba21bf0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
@@ -38,7 +38,6 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
-import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Preconditions;
@@ -83,7 +82,7 @@ public class EdgeAsyncImpl implements EdgeAsync {
 
     @Override
     public Observable<Integer> cleanSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
-                                       final UUID version ) {
+                                             final UUID version ) {
 
 
         return null;  //To change body of implemented methods use File | Settings | File Templates.
@@ -100,107 +99,114 @@ public class EdgeAsyncImpl implements EdgeAsync {
         Preconditions.checkNotNull( version, "version is required" );
 
 
+        Observable<Integer> deleteCounts =
+                loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
+                        .buffer( graphFig.getRepairConcurrentSize() )
+                                //buffer them into concurrent groups based on the concurrent repair size
+                        .flatMap( new Func1<List<String>, Observable<Integer>>() {
 
-        return loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
-                .buffer( graphFig.getRepairConcurrentSize() )
-                        //buffer them into concurrent groups based on the concurrent repair size
-                .flatMap( new Func1<List<String>, Observable<Integer>>() {
-
-                    @Override
-                    public Observable<Integer> call( final List<String> types ) {
+                            @Override
+                            public Observable<Integer> call( final List<String> types ) {
 
 
-                        final MutationBatch batch = keyspace.prepareMutationBatch();
+                                final MutationBatch batch = keyspace.prepareMutationBatch();
 
-                        final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
+                                final List<Observable<Integer>> checks =
+                                        new ArrayList<Observable<Integer>>( types.size() );
 
-                        //for each id type, check if the exist in parallel to increase processing speed
-                        for ( final String sourceIdType : types ) {
+                                //for each id type, check if the exist in parallel to increase processing speed
+                                for ( final String sourceIdType : types ) {
 
-                            final SearchByIdType searchData =   new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
+                                    final SearchByIdType searchData =
+                                            new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
 
-                            Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData
-                                    )
-                                    .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
+                                    Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData )
+                                            .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
 
-                                        //get distinct by source node
-                                        @Override
-                                        public Id call( final MarkedEdge markedEdge ) {
-                                            return markedEdge.getSourceNode();
-                                        }
-                                    } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
+                                                //get distinct by source node
+                                                @Override
+                                                public Id call( final MarkedEdge markedEdge ) {
+                                                    return markedEdge.getSourceNode();
+                                                }
+                                            } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
 
-                                        @Override
-                                        public void call( final Integer count ) {
-                                            /**
-                                             * we only want to delete if no edges are in this class. If there are
-                                             * still edges
-                                             * we must retain the information in order to keep our index structure
-                                             * correct for edge
-                                             * iteration
-                                             **/
-                                            if ( count != 0 ) {
-                                                return;
-                                            }
+                                                @Override
+                                                public void call( final Integer count ) {
+                                                    /**
+                                                     * we only want to delete if no edges are in this class. If there
+                                                     * are
+                                                     * still edges
+                                                     * we must retain the information in order to keep our index
+                                                     * structure
+                                                     * correct for edge
+                                                     * iteration
+                                                     **/
+                                                    if ( count != 0 ) {
+                                                        return;
+                                                    }
 
 
-                                            batch.mergeShallow( edgeMetadataSerialization
-                                                    .removeIdTypeToTarget( scope, targetId, edgeType, sourceIdType,
-                                                            version ) );
-                                        }
-                                    } );
+                                                    batch.mergeShallow( edgeMetadataSerialization
+                                                            .removeIdTypeToTarget( scope, targetId, edgeType,
+                                                                    sourceIdType, version ) );
+                                                }
+                                            } );
 
-                            checks.add( search );
-                        }
+                                    checks.add( search );
+                                }
 
 
-                        /**
-                         * Sum up the total number of edges we had, then execute the mutation if we have anything to do
-                         */
-                        return MathObservable.sumInteger(Observable.merge( checks )).doOnNext( new Action1<Integer>() {
-                            @Override
-                            public void call( final Integer count ) {
+                                /**
+                                 * Sum up the total number of edges we had, then execute the mutation if we have
+                                 * anything to do
+                                 */
+                                return MathObservable.sumInteger( Observable.merge( checks ) )
+                                                     .doOnNext( new Action1<Integer>() {
+                                                         @Override
+                                                         public void call( final Integer count ) {
+
+                                                             if ( batch.isEmpty() ) {
+                                                                 return;
+                                                             }
+
+                                                             try {
+                                                                 batch.execute();
+                                                             }
+                                                             catch ( ConnectionException e ) {
+                                                                 throw new RuntimeException(
+                                                                         "Unable to execute mutation", e );
+                                                             }
+                                                         }
+                                                     } );
+                            }
+                        } )
+                                //if we get no edges, emit a 0 so the caller knows nothing was deleted
+                        .defaultIfEmpty( 0 );
 
-                                if(batch.isEmpty()){
-                                    return;
-                                }
 
-                                try {
-                                    batch.execute();
-                                }
-                                catch ( ConnectionException e ) {
-                                    throw new RuntimeException( "Unable to execute mutation", e );
-                                }
-                            }
-                        } );
-
-                    }
-
-                } )
-                .map( new Func1<Integer, Integer>() {
-                    @Override
-                    public Integer call( final Integer subTypes ) {
-
-                        /**
-                         * We can only execute deleting this type if no sub types were deleted
-                         */
-                        if(subTypes != 0){
-                            return subTypes;
-                        }
-
-                        try {
-                            edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version )
-                                                         .execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new RuntimeException( "Unable to execute mutation" );
-                        }
-
-                        return subTypes;
-                    }
-                } )
-                //if we get no edges, emit a 0 so the caller knows nothing was deleted
-                .defaultIfEmpty( 0 );
+        //sum up everything emitted by sub types.  If there's no edges existing for all sub types,
+        // then we can safely remove them
+        return MathObservable.sumInteger( deleteCounts ).map( new Func1<Integer, Integer>() {
+            @Override
+            public Integer call( final Integer subTypes ) {
+
+                /**
+                 * We can only execute deleting this type if no sub types were deleted
+                 */
+                if ( subTypes != 0 ) {
+                    return subTypes;
+                }
+
+                try {
+                    edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version ).execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation" );
+                }
+
+                return subTypes;
+            }
+        } );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/78dc432d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
index c17676b..5f57e64 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
@@ -20,7 +20,9 @@
 package org.apache.usergrid.persistence.graph.impl.stage;
 
 
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.UUID;
 
 import org.jukito.JukitoRunner;
@@ -35,6 +37,7 @@ import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
 import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
@@ -59,8 +62,8 @@ import static org.mockito.Mockito.when;
  *
  *
  */
-@RunWith(JukitoRunner.class)
-@UseModules({ TestGraphModule.class })
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
 public class EdgeAsyncTest {
 
 
@@ -85,6 +88,9 @@ public class EdgeAsyncTest {
     @Inject
     protected EdgeMetadataSerialization edgeMetadataSerialization;
 
+    @Inject
+    protected GraphFig graphFig;
+
     protected OrganizationScope scope;
 
 
@@ -102,53 +108,180 @@ public class EdgeAsyncTest {
 
 
     @Test
-    public void cleanTargetNoEdgesNoMeta(){
-       //do no writes, then execute a cleanup with no meta data
+    public void cleanTargetNoEdgesNoMeta() {
+        //do no writes, then execute a cleanup with no meta data
 
-        final Id targetId = createId ("target" );
+        final Id targetId = createId( "target" );
         final String test = "test";
         final UUID version = UUIDGenerator.newTimeUUID();
 
         int value = edgeAsync.clearTargets( scope, targetId, test, version ).toBlockingObservable().single();
 
-        assertEquals("No subtypes found", 0, value);
+        assertEquals( "No subtypes found", 0, value );
     }
 
+
     @Test
-    public void cleanTargetSingleEge() throws ConnectionException {
+    public void cleanTargetSingleEdge() throws ConnectionException {
         Edge edge = createEdge( "source", "test", "target" );
 
         edgeSerialization.writeEdge( scope, edge ).execute();
 
         edgeMetadataSerialization.writeEdge( scope, edge ).execute();
 
-        int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+        int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+                             .toBlockingObservable().single();
 
-        assertEquals("No subtypes removed, edge exists", 1, value);
+        assertEquals( "No subtypes removed, edge exists", 1, value );
 
         //now delete the edge
 
         edgeSerialization.deleteEdge( scope, edge ).execute();
 
-        value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+        value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+                         .toBlockingObservable().single();
 
-        assertEquals("Single subtype should be removed", 0, value);
+        assertEquals( "Single subtype should be removed", 0, value );
 
         //now verify they're gone
 
-        Iterator<String> edgeTypes = edgeMetadataSerialization.getEdgeTypesToTarget( scope,
-                new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
+    }
+
+
+    @Test
+    public void cleanTargetMultipleEdge() throws ConnectionException {
+
+        Id targetId = createId( "target" );
+
+        Edge edge1 = createEdge( createId("source1"), "test", targetId);
+
+
+
+        edgeSerialization.writeEdge( scope, edge1 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
+
+        Edge edge2 = createEdge( createId("source2"), "test", targetId );
+
+        edgeSerialization.writeEdge( scope, edge2 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
+
+        Edge edge3 = createEdge( createId("source3"), "test", targetId );
+
+        edgeSerialization.writeEdge( scope, edge3 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge3 ).execute();
+
+
+        UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+        int value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                             .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", 3, value );
+
+        //now delete the edge
+
+        edgeSerialization.deleteEdge( scope, edge1 ).execute();
+
+        value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", 2, value );
+
+        edgeSerialization.deleteEdge( scope, edge2 ).execute();
 
-        assertFalse("No edge types exist", edgeTypes.hasNext());
+        value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
 
+        assertEquals( "No subtypes removed, edges exist", 1, value );
 
-        Iterator<String> sourceTypes = edgeMetadataSerialization.getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+        edgeSerialization.deleteEdge( scope, edge3 ).execute();
 
-        assertFalse("No edge types exist", sourceTypes.hasNext());
+        value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
 
 
+        assertEquals( "Single subtype should be removed", 0, value );
 
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge1.getTargetNode(), null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( edge1.getTargetNode(), edge1.getType(), null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
     }
 
 
+    @Test
+    public void cleanTargetMultipleEdgeBuffer() throws ConnectionException {
+
+        final Id targetId = createId( "target" );
+        final String edgeType = "test";
+
+        final int size =  graphFig.getRepairConcurrentSize()*2;
+
+        Set<Edge> writtenEdges = new HashSet<Edge>();
+
+
+        for(int i = 0; i < size; i ++){
+            Edge edge = createEdge( createId("source"+i), edgeType, targetId);
+
+            edgeSerialization.writeEdge( scope, edge ).execute();
+
+            edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+            writtenEdges.add( edge );
+        }
+
+
+        UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+        int value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
+                             .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", size, value );
+
+        //now delete the edge
+
+        for(Edge created: writtenEdges){
+            edgeSerialization.deleteEdge( scope, created ).execute();
+        }
+
+
+        value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
+                                     .toBlockingObservable().single();
+
+        assertEquals( "Subtypes removed", 0, value );
+
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( targetId, null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
+    }
 }


[2/4] git commit: Checkpoint before refactor

Posted by sn...@apache.org.
Checkpoint before refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e8568ba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e8568ba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e8568ba4

Branch: refs/heads/asyncqueue
Commit: e8568ba48cb45251eab41723d53ac57a5844bc3f
Parents: 0a256bb
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 11 10:42:35 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 11 10:42:35 2014 -0700

----------------------------------------------------------------------
 .../graph/impl/EdgeDeleteListener.java          |   4 +-
 .../persistence/graph/impl/EdgeEvent.java       |  13 +-
 .../graph/impl/NodeDeleteListener.java          | 175 +++++++++++++++----
 .../EdgeMetadataSerialization.java              |  76 +++++++-
 .../impl/EdgeMetadataSerializationImpl.java     |  44 ++++-
 .../graph/impl/NodeDeleteListenerTest.java      |   8 +-
 6 files changed, 261 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index 85e4b4f..e0ceafc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -44,8 +44,8 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
     @Inject
     public EdgeDeleteListener( final EdgeSerialization edgeSerialization,
                                final EdgeMetadataSerialization edgeMetadataSerialization,
-                               final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace, @EdgeDelete
-                                   final AsyncProcessor edgeDelete ) {
+                               final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace,
+                               @EdgeDelete final AsyncProcessor edgeDelete ) {
         this.edgeSerialization = edgeSerialization;
         this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.edgeManagerFactory = edgeManagerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
index d4b6f91..b2ba686 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
@@ -1,6 +1,8 @@
 package org.apache.usergrid.persistence.graph.impl;
 
 
+import java.util.UUID;
+
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 
 
@@ -12,11 +14,13 @@ public class EdgeEvent<T> {
 
     private final OrganizationScope organizationScope;
     private final T data;
+    private final UUID version;
 
 
-    public EdgeEvent( final OrganizationScope organizationScope, final T data ) {
+    public EdgeEvent( final OrganizationScope organizationScope, final UUID version, final T data ) {
         this.organizationScope = organizationScope;
         this.data = data;
+        this.version = version;
     }
 
 
@@ -25,7 +29,14 @@ public class EdgeEvent<T> {
     }
 
 
+    public UUID getVersion() {
+        return version;
+    }
+
+
     public T getData() {
         return data;
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index fbd5d08..1d2124a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -6,6 +6,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -26,6 +27,8 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action0;
 import rx.functions.Action1;
 import rx.functions.Func1;
 
@@ -35,11 +38,15 @@ import rx.functions.Func1;
  */
 public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEvent<Id>> {
 
+
     private final NodeSerialization nodeSerialization;
-    private final EdgeSerialization edgeSerialization;
-    private final EdgeMetadataSerialization edgeMetadataSerialization;
+//    private final EdgeSerialization edgeSerialization;
+//    private final EdgeMetadataSerialization edgeMetadataSerialization;
     private final GraphFig graphFig;
-    private final Keyspace keyspace;
+//    private final Keyspace keyspace;
+
+    private final Scheduler scheduler;
+    private final EdgeManagerFactory edgeManagerFactory;
 
 
     /**
@@ -48,14 +55,19 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
     @Inject
     public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
                                final EdgeMetadataSerialization edgeMetadataSerialization, final GraphFig graphFig,
-                               final Keyspace keyspace, @NodeDelete final AsyncProcessor nodeDelete ) {
+                               final Keyspace keyspace, final Scheduler scheduler,
+                               final EdgeManagerFactory edgeManagerFactory,
+                               @NodeDelete final AsyncProcessor nodeDelete ) {
 
 
         this.nodeSerialization = nodeSerialization;
-        this.edgeSerialization = edgeSerialization;
-        this.edgeMetadataSerialization = edgeMetadataSerialization;
+//        this.edgeSerialization = edgeSerialization;
+//        this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.graphFig = graphFig;
-        this.keyspace = keyspace;
+//        this.keyspace = keyspace;
+        this.scheduler = scheduler;
+        this.edgeManagerFactory = edgeManagerFactory;
+
         nodeDelete.addListener( this );
     }
 
@@ -65,9 +77,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
 
         final Id node = edgeEvent.getData();
         final OrganizationScope scope = edgeEvent.getOrganizationScope();
+        final UUID version = edgeEvent.getVersion();
 
 
-        return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
+        return Observable.from( node ).subscribeOn( scheduler ).map( new Func1<Id, Optional<UUID>>() {
             @Override
             public Optional<UUID> call( final Id id ) {
                 return nodeSerialization.getMaxVersion( scope, node );
@@ -80,60 +93,148 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
                     }
                 } )
 
-                //delete source and targets in parallel and merge them into a single observable
+                        //delete source and targets in parallel and merge them into a single observable
                 .flatMap( new Func1<Optional<UUID>, Observable<List<MarkedEdge>>>() {
                     @Override
                     public Observable<List<MarkedEdge>> call( final Optional<UUID> uuidOptional ) {
 
                         //get all edges pointing to the target node and buffer then into groups for deletion
-                        Observable<List<MarkedEdge>> targetEdges =
+                        Observable<MarkedEdge> targetEdges =
                                 getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
-                                        .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
+                                        .flatMap( new Func1<String, Observable<MarkedEdge>>() {
                                             @Override
-                                            public Observable<List<MarkedEdge>> call( final String edgeType ) {
+                                            public Observable<MarkedEdge> call( final String edgeType ) {
                                                 return loadEdgesToTarget( scope,
                                                         new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
-                                                                null ) ).buffer( graphFig.getScanPageSize() );
+                                                                null ) );
                                             }
                                         } );
 
 
                         //get all edges pointing to the source node and buffer them into groups for deletion
-                        Observable<List<MarkedEdge>> sourceEdges =
+                        Observable<MarkedEdge> sourceEdges =
                                 getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
-                                        .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
+                                        .flatMap( new Func1<String, Observable<MarkedEdge>>() {
                                             @Override
-                                            public Observable<List<MarkedEdge>> call( final String edgeType ) {
+                                            public Observable<MarkedEdge> call( final String edgeType ) {
                                                 return loadEdgesFromSource( scope,
                                                         new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
-                                                                null ) ).buffer( graphFig.getScanPageSize() );
+                                                                null ) );
                                             }
                                         } );
 
 
-                        return Observable.merge( targetEdges, sourceEdges );
+                        //each time an edge is emitted, delete it via batch mutation since we'll already be buffered
+                        return Observable.merge( targetEdges, sourceEdges ).buffer( graphFig.getScanPageSize() )
+                                         .doOnNext( new Action1<List<MarkedEdge>>() {
+                                             @Override
+                                             public void call( final List<MarkedEdge> markedEdges ) {
+                                                 MutationBatch batch = keyspace.prepareMutationBatch();
+
+
+                                                 for ( MarkedEdge marked : markedEdges ) {
+                                                     batch.mergeShallow(
+                                                             edgeSerialization.deleteEdge( scope, marked ) );
+                                                 }
+
+                                                 try {
+                                                     batch.execute();
+                                                 }
+                                                 catch ( ConnectionException e ) {
+                                                     throw new RuntimeException( "Unable to execute mutation" );
+                                                 }
+                                             }
+                                         } );
                     }
-                } ).doOnNext( new Action1<List<MarkedEdge>>() {
+                } ).last().flatMap( new Func1<List<MarkedEdge>, Observable<String>>() {
                     @Override
-                    public void call( final List<MarkedEdge> markedEdges ) {
-                        MutationBatch batch = keyspace.prepareMutationBatch();
+                    public Observable<String> call( final List<MarkedEdge> markedEdges ) {
+
+                        //delete all meta for edges to this node
+                        Observable<String> targets =
+                                getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+                                        .doOnNext( new Action1<String>() {
+                                            @Override
+                                            public void call( final String type ) {
+
+                                                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                                final Iterator<String> types = edgeMetadataSerialization
+                                                        .getIdTypesToTarget( scope,
+                                                                new SimpleSearchIdType( node, type, null ) );
 
+                                                while ( types.hasNext() ) {
+                                                    batch.mergeShallow( edgeMetadataSerialization
+                                                            .removeIdTypeToTarget( scope, node, type, types.next(),
+                                                                    version ) );
+                                                }
 
-                        for ( MarkedEdge marked : markedEdges ) {
-                            final MutationBatch edgeBatch = edgeSerialization.deleteEdge( scope, marked );
-                            batch.mergeShallow( edgeBatch );
-                        }
+                                                batch.mergeShallow( edgeMetadataSerialization
+                                                        .removeEdgeTypeToTarget( scope, node, type, version ) );
+
+                                                try {
+                                                    batch.execute();
+                                                }
+                                                catch ( ConnectionException e ) {
+                                                    throw new RuntimeException( "Unable to execute cassandra call" );
+                                                }
+                                            }
+                                        } );
 
-                        try {
-                            batch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new RuntimeException( "Unable to execute mutation" );
-                        }
+                        //delete all meta for edges from this node
+                        Observable<String> sources =
+                                getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
+                                        .doOnNext( new Action1<String>() {
+                                            @Override
+                                            public void call( final String type ) {
+
+                                                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                                final Iterator<String> types = edgeMetadataSerialization
+                                                        .getIdTypesFromSource( scope,
+                                                                new SimpleSearchIdType( node, type, null ) );
+
+                                                while ( types.hasNext() ) {
+                                                    batch.mergeShallow( edgeMetadataSerialization
+                                                            .removeIdTypeFromSource( scope, node, type, types.next(),
+                                                                    version ) );
+                                                }
+
+                                                batch.mergeShallow( edgeMetadataSerialization
+                                                        .removeEdgeTypeFromSource( scope, node, type, version ) );
+
+                                                try {
+                                                    batch.execute();
+                                                }
+                                                catch ( ConnectionException e ) {
+                                                    throw new RuntimeException( "Unable to execute cassandra call" );
+                                                }
+                                            }
+                                        } );
+
+
+                        //no op, just zip them up  so we can do our action on our last
+                        return Observable.merge( targets, sources )
+                                //when we're done emitting, merge them
+                                .doOnCompleted( new Action0() {
+                                    @Override
+                                    public void call() {
+                                        try {
+                                            nodeSerialization.delete( scope, node, edgeEvent.getVersion() ).execute();
+                                        }
+                                        catch ( ConnectionException e ) {
+                                            throw new RuntimeException( "Unable to execute mutation" );
+                                        }
+                                    }
+                                } );
                     }
-                } ).map( new Func1<List<MarkedEdge>, EdgeEvent<Id>>() {
+                } )
+                        //when we're completed, we need to delete our id from types, then remove our mark
+
+                        //return our event to the caller
+                .map( new Func1<String, EdgeEvent<Id>>() {
                     @Override
-                    public EdgeEvent<Id> call( final List<MarkedEdge> markedEdges ) {
+                    public EdgeEvent<Id> call( final String mark ) {
                         return edgeEvent;
                     }
                 } );
@@ -150,7 +251,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
             }
-        } );
+        } ).subscribeOn( scheduler );
     }
 
 
@@ -164,7 +265,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
             }
-        } );
+        } ).subscribeOn( scheduler );
     }
 
 
@@ -178,7 +279,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTarget( scope, search );
             }
-        } );
+        } ).subscribeOn( scheduler );
     }
 
 
@@ -192,7 +293,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSource( scope, search );
             }
-        } );
+        } ).subscribeOn( scheduler );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
index 8806b92..5fa6c7c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
@@ -21,11 +21,13 @@ package org.apache.usergrid.persistence.graph.serialization;
 
 
 import java.util.Iterator;
+import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.SearchEdgeType;
 import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.netflix.astyanax.MutationBatch;
 
@@ -42,8 +44,8 @@ public interface EdgeMetadataSerialization {
     MutationBatch writeEdge( OrganizationScope scope, Edge edge );
 
     /**
-     * Remove all meta data from the source to the target type.  The caller must ensure that this is the last
-     * edge with this type at version <= edge version
+     * Remove all meta data from the source to the target type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
      *
      * @param scope The org scope
      * @param edge The edge to remove
@@ -54,8 +56,20 @@ public interface EdgeMetadataSerialization {
 
 
     /**
-     * Remove all meta data from the source to the target type.  The caller must ensure that this is the last
-          * edge with this type at version <= edge version
+     * Remove the edge type from the source with the specified version
+     *
+     * @param scope Organization scope
+     * @param sourceNode Source node
+     * @param type The edge type
+     * @param version The version to use on the delete
+     *
+     * @return A mutation batch to use on issuing the delelete
+     */
+    MutationBatch removeEdgeTypeFromSource( OrganizationScope scope, Id sourceNode, String type, UUID version );
+
+    /**
+     * Remove all meta data from the source to the target type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
      *
      * @param scope The org scope
      * @param edge The edge to remove
@@ -64,9 +78,25 @@ public interface EdgeMetadataSerialization {
      */
     MutationBatch removeIdTypeFromSource( OrganizationScope scope, Edge edge );
 
+
     /**
-     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last
-          * edge with this type at version <= edge version
+     * Remove all meta data from the source to the target type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
+     *
+     * @param scope Organization scope
+     * @param sourceNode Source node
+     * @param type The edge type
+     * @param idType The idType to use
+     * @param version The version to use on the delete
+     *
+     * @return a mutation batch with the delete operations
+     */
+    MutationBatch removeIdTypeFromSource( OrganizationScope scope, Id sourceNode, String type, String idType,
+                                          UUID version );
+
+    /**
+     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
      *
      * @param scope The org scope
      * @param edge The edge to remove
@@ -77,8 +107,22 @@ public interface EdgeMetadataSerialization {
 
 
     /**
-     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last
-          * edge with this type at version <= edge version
+     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
+     *
+     * @param scope Organization scope
+     * @param targetNode Source node
+     * @param type The edge type
+     * @param version The version to use on the delete
+     *
+     * @return A mutation batch to use on issuing the delelete
+     */
+    MutationBatch removeEdgeTypeToTarget( OrganizationScope scope, Id targetNode, String type, UUID version );
+
+
+    /**
+     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
      *
      * @param scope The org scope
      * @param edge The edge to remove
@@ -87,6 +131,22 @@ public interface EdgeMetadataSerialization {
      */
     MutationBatch removeIdTypeToTarget( OrganizationScope scope, Edge edge );
 
+
+    /**
+     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
+     *
+     * @param scope Organization scope
+     * @param targetNode Source node
+     * @param type The edge type
+     * @param idType The idType to use
+     * @param version The version to use on the delete
+     *
+     * @return a mutation batch with the delete operations
+     */
+    MutationBatch removeIdTypeToTarget( OrganizationScope scope, Id targetNode, String type, String idType,
+                                        UUID version );
+
     /**
      * Get all edge types from the given source node
      *

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
index b6028ea..49a8be6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
@@ -175,27 +175,55 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
 
     @Override
     public MutationBatch removeEdgeTypeFromSource( final OrganizationScope scope, final Edge edge ) {
-        return removeEdgeType( scope, edge.getSourceNode(), edge.getType(), edge.getVersion(), CF_SOURCE_EDGE_TYPES );
+        return removeEdgeTypeFromSource( scope, edge.getSourceNode(), edge.getType(), edge.getVersion() );
+    }
+
+
+    @Override
+    public MutationBatch removeEdgeTypeFromSource( final OrganizationScope scope, final Id sourceNode,
+                                                   final String type, final UUID version ) {
+        return removeEdgeType( scope, sourceNode, type, version, CF_SOURCE_EDGE_TYPES );
     }
 
 
     @Override
     public MutationBatch removeIdTypeFromSource( final OrganizationScope scope, final Edge edge ) {
-        return removeIdType( scope, edge.getSourceNode(), edge.getTargetNode(), edge.getType(), edge.getVersion(),
-                CF_SOURCE_EDGE_ID_TYPES );
+        return removeIdTypeFromSource( scope, edge.getSourceNode(), edge.getType(),  edge.getTargetNode().getType(),
+                edge.getVersion() );
+    }
+
+
+    @Override
+    public MutationBatch removeIdTypeFromSource( final OrganizationScope scope, final Id sourceNode, final String type,
+                                                 final String idType, final UUID version ) {
+        return removeIdType( scope, sourceNode, idType, type, version,
+                        CF_SOURCE_EDGE_ID_TYPES );
     }
 
 
     @Override
     public MutationBatch removeEdgeTypeToTarget( final OrganizationScope scope, final Edge edge ) {
-        return removeEdgeType( scope, edge.getTargetNode(), edge.getType(), edge.getVersion(), CF_TARGET_EDGE_TYPES );
+        return removeEdgeTypeToTarget( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() );
     }
 
 
+    @Override
+    public MutationBatch removeEdgeTypeToTarget( final OrganizationScope scope, final Id targetNode, final String type,
+                                                 final UUID version ) {
+        return removeEdgeType( scope, targetNode,type, version, CF_TARGET_EDGE_TYPES );
+    }
+
 
     @Override
     public MutationBatch removeIdTypeToTarget( final OrganizationScope scope, final Edge edge ) {
-        return removeIdType( scope, edge.getTargetNode(), edge.getSourceNode(), edge.getType(), edge.getVersion(),
+        return removeIdTypeToTarget(scope, edge.getTargetNode(), edge.getType(),  edge.getSourceNode().getType(), edge.getVersion());
+    }
+
+
+    @Override
+    public MutationBatch removeIdTypeToTarget( final OrganizationScope scope, final Id targetNode, final String type,
+                                               final String idType, final UUID version ) {
+        return removeIdType( scope, targetNode, idType,type, version,
                 CF_TARGET_EDGE_ID_TYPES );
     }
 
@@ -234,14 +262,14 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
      *
      * @param scope The scope to use
      * @param rowId The id to use in the row key
-     * @param colId The id type to use in the column
+     * @param idType The id type to use in the column
      * @param edgeType The edge type to use in the column
      * @param version The version to use on the column
      * @param cf The column family to use
      *
      * @return A populated mutation with the remove operations
      */
-    private MutationBatch removeIdType( final OrganizationScope scope, final Id rowId, final Id colId,
+    private MutationBatch removeIdType( final OrganizationScope scope, final Id rowId, final String idType,
                                         final String edgeType, final UUID version,
                                         final MultiTennantColumnFamily<OrganizationScope, EdgeIdTypeKey, String> cf ) {
 
@@ -256,7 +284,7 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
                 new ScopedRowKey<OrganizationScope, EdgeIdTypeKey>( scope, new EdgeIdTypeKey( rowId, edgeType ) );
 
 
-        batch.withRow( cf, rowKey ).setTimestamp( timestamp ).deleteColumn( colId.getType() );
+        batch.withRow( cf, rowKey ).setTimestamp( timestamp ).deleteColumn( idType );
 
         return batch;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 2d55698..5b4899a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -109,15 +109,17 @@ public class NodeDeleteListenerTest {
 
         Id targetNode = edge.getTargetNode();
 
+        UUID version = UUIDGenerator.newTimeUUID();
 
-        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+
+        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, version, sourceNode );
 
         EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
 
         assertNull( "Mark was not set, no delete should be executed", event );
 
 
-        deleteEvent = new EdgeEvent<Id>( scope, targetNode );
+        deleteEvent = new EdgeEvent<Id>( scope, version, targetNode );
 
         event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
 
@@ -152,7 +154,7 @@ public class NodeDeleteListenerTest {
 
         nodeSerialization.mark( scope, sourceNode, deleteVersion ).execute();
 
-        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, deleteVersion, sourceNode );
 
 
         EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().last();