You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/12 21:22:26 UTC

[3/4] git commit: Working meta data cleanup

Working meta data cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4dd96aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4dd96aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4dd96aa7

Branch: refs/heads/asyncqueue
Commit: 4dd96aa7783ae4058e42ab75689f688ce3ce6e90
Parents: e8568ba
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 12 12:25:17 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 12 12:25:17 2014 -0700

----------------------------------------------------------------------
 stack/corepersistence/collection/pom.xml        |   8 +-
 .../usergrid/persistence/graph/GraphFig.java    |   6 +
 .../persistence/graph/guice/GraphModule.java    |   5 +
 .../graph/impl/NodeDeleteListener.java          |  12 +-
 .../persistence/graph/impl/stage/EdgeAsync.java |  62 +++++
 .../graph/impl/stage/EdgeAsyncImpl.java         | 255 +++++++++++++++++++
 .../graph/impl/NodeDeleteListenerTest.java      |   1 +
 .../graph/impl/stage/EdgeAsyncTest.java         | 154 +++++++++++
 8 files changed, 496 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 95aea45..7a8e780 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -9,6 +9,7 @@
     <slf4j.version>1.7.2</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <chop.version>1.0</chop.version>
+    <rx.version>0.17.0</rx.version>
   </properties>
 
   <parent>
@@ -161,8 +162,13 @@
     <dependency>
         <groupId>com.netflix.rxjava</groupId>
         <artifactId>rxjava-core</artifactId>
-        <version>0.17.0-RC7</version>
+        <version>${rx.version}</version>
     </dependency>
+    <dependency>
+           <groupId>com.netflix.rxjava</groupId>
+           <artifactId>rxjava-math</artifactId>
+           <version>${rx.version}</version>
+       </dependency>
 
 
     <!--<dependency>-->

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 3f87d14..64ca397 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -15,6 +15,8 @@ public interface GraphFig extends GuicyFig {
 
     public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
 
+    public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
+
 
     public static final String TIMEOUT_SIZE = "usergrid.graph.timeout.page.size";
 
@@ -50,6 +52,10 @@ public interface GraphFig extends GuicyFig {
     @Key( TIMEOUT_TASK_TIME )
     long getTaskLoopTime();
 
+    @Default("10")
+    @Key(REPAIR_CONCURRENT_SIZE)
+    int getRepairConcurrentSize();
+
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index ed9448b..a18e741 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
 import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
 import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
 import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsync;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsyncImpl;
 import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -105,6 +107,9 @@ public class GraphModule extends AbstractModule {
         bind(AsyncProcessor.class).annotatedWith( EdgeDelete.class ).to( AsyncProcessorImpl.class );
         bind(AsyncProcessor.class).annotatedWith( EdgeWrite.class ).to( AsyncProcessorImpl.class );
         bind(AsyncProcessor.class).annotatedWith( NodeDelete.class ).to( AsyncProcessorImpl.class );
+
+        //Edge stages
+        bind( EdgeAsync.class).to( EdgeAsyncImpl.class );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 1d2124a..58862f4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -40,10 +40,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
 
 
     private final NodeSerialization nodeSerialization;
-//    private final EdgeSerialization edgeSerialization;
-//    private final EdgeMetadataSerialization edgeMetadataSerialization;
+    private final EdgeSerialization edgeSerialization;
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
     private final GraphFig graphFig;
-//    private final Keyspace keyspace;
+    private final Keyspace keyspace;
 
     private final Scheduler scheduler;
     private final EdgeManagerFactory edgeManagerFactory;
@@ -61,10 +61,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
 
 
         this.nodeSerialization = nodeSerialization;
-//        this.edgeSerialization = edgeSerialization;
-//        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        this.edgeSerialization = edgeSerialization;
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.graphFig = graphFig;
-//        this.keyspace = keyspace;
+        this.keyspace = keyspace;
         this.scheduler = scheduler;
         this.edgeManagerFactory = edgeManagerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
new file mode 100644
index 0000000..90c1bfe
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.MutationBatch;
+
+import rx.Observable;
+
+
+/**
+ * Creates a mutation which will remove obsolete
+ *
+ */
+public interface EdgeAsync {
+
+    /**
+     * Validate that the source types can be cleaned for the given info
+     * @param scope The scope to use
+     * @param sourceId The source Id to use
+     * @param edgeType The edge type
+     * @param version The max version to clean
+     * @return The mutation with the operations
+     */
+    public Observable<Integer> cleanSources(OrganizationScope scope, Id sourceId, String edgeType, UUID version);
+
+
+    /**
+     *
+     * Remove all source id types that are empty, as well as the edge type if there are no more edges for it
+     * @param scope The scope to use
+     * @param targetId The target Id to use
+     * @param edgeType The edge type
+     * @param version The max version to clean
+     * @return  The mutation with the operations
+     */
+    public Observable<Integer> clearTargets( OrganizationScope scope, Id targetId, String edgeType, UUID version );
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
new file mode 100644
index 0000000..cb2cf6d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.SearchEdgeType;
+import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.observables.MathObservable;
+
+
+/**
+ * Implementation of the cleanup of edge meta data
+ */
+@Singleton
+public class EdgeAsyncImpl implements EdgeAsync {
+
+
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
+    private final EdgeSerialization edgeSerialization;
+    private final Keyspace keyspace;
+    private final GraphFig graphFig;
+    private final Scheduler scheduler;
+
+
+    @Inject
+    public EdgeAsyncImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
+                          final EdgeSerialization edgeSerialization, final Keyspace keyspace, final GraphFig graphFig,
+                          final Scheduler scheduler ) {
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        this.edgeSerialization = edgeSerialization;
+        this.keyspace = keyspace;
+        this.graphFig = graphFig;
+        this.scheduler = scheduler;
+    }
+
+
+    @Override
+    public Observable<Integer> cleanSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
+                                       final UUID version ) {
+
+
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+
+    @Override
+    public Observable<Integer> clearTargets( final OrganizationScope scope, final Id targetId, final String edgeType,
+                                             final UUID version ) {
+
+        ValidationUtils.validateOrganizationScope( scope );
+        ValidationUtils.verifyIdentity( targetId );
+        Preconditions.checkNotNull( edgeType, "edge type is required" );
+        Preconditions.checkNotNull( version, "version is required" );
+
+
+
+        return loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
+                .buffer( graphFig.getRepairConcurrentSize() )
+                        //buffer them into concurrent groups based on the concurrent repair size
+                .flatMap( new Func1<List<String>, Observable<Integer>>() {
+
+                    @Override
+                    public Observable<Integer> call( final List<String> types ) {
+
+
+                        final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                        final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
+
+                        //for each id type, check if the exist in parallel to increase processing speed
+                        for ( final String sourceIdType : types ) {
+
+                            final SearchByIdType searchData =   new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
+
+                            Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData
+                                    )
+                                    .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
+
+                                        //get distinct by source node
+                                        @Override
+                                        public Id call( final MarkedEdge markedEdge ) {
+                                            return markedEdge.getSourceNode();
+                                        }
+                                    } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
+
+                                        @Override
+                                        public void call( final Integer count ) {
+                                            /**
+                                             * we only want to delete if no edges are in this class. If there are
+                                             * still edges
+                                             * we must retain the information in order to keep our index structure
+                                             * correct for edge
+                                             * iteration
+                                             **/
+                                            if ( count != 0 ) {
+                                                return;
+                                            }
+
+
+                                            batch.mergeShallow( edgeMetadataSerialization
+                                                    .removeIdTypeToTarget( scope, targetId, edgeType, sourceIdType,
+                                                            version ) );
+                                        }
+                                    } );
+
+                            checks.add( search );
+                        }
+
+
+                        /**
+                         * Sum up the total number of edges we had, then execute the mutation if we have anything to do
+                         */
+                        return MathObservable.sumInteger(Observable.merge( checks )).doOnNext( new Action1<Integer>() {
+                            @Override
+                            public void call( final Integer count ) {
+
+                                if(batch.isEmpty()){
+                                    return;
+                                }
+
+                                try {
+                                    batch.execute();
+                                }
+                                catch ( ConnectionException e ) {
+                                    throw new RuntimeException( "Unable to execute mutation", e );
+                                }
+                            }
+                        } );
+
+                    }
+
+                } )
+                .map( new Func1<Integer, Integer>() {
+                    @Override
+                    public Integer call( final Integer subTypes ) {
+
+                        /**
+                         * We can only execute deleting this type if no sub types were deleted
+                         */
+                        if(subTypes != 0){
+                            return subTypes;
+                        }
+
+                        try {
+                            edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version )
+                                                         .execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( "Unable to execute mutation" );
+                        }
+
+                        return subTypes;
+                    }
+                } )
+                //if we get no edges, emit a 0 so the caller knows nothing was deleted
+                .defaultIfEmpty( 0 );
+    }
+
+
+    /**
+     * Get all existing edge types to the target node
+     */
+    private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<String>() {
+            @Override
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+
+
+    private Observable<MarkedEdge> getEdgesToTargetBySourceType( final OrganizationScope scope,
+                                                                 final SearchByIdType search ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+
+
+    private Observable<String> loadEdgeIdsToTarget( final OrganizationScope scope, final SearchIdType search ) {
+        return Observable.create( new ObservableIterator<String>() {
+            @Override
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+
+
+    /**
+     * Load all edges pointing to this target
+     */
+    private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesToTarget( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 5b4899a..82aecff 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -186,6 +186,7 @@ public class NodeDeleteListenerTest {
 
         assertFalse( types.hasNext() );
 
+
         //no types to target
 
         types = edgeMetadataSerialization.getEdgeTypesToTarget( scope, createSearchEdge( targetNode, null ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
new file mode 100644
index 0000000..c17676b
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith(JukitoRunner.class)
+@UseModules({ TestGraphModule.class })
+public class EdgeAsyncTest {
+
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    protected NodeSerialization serialization;
+
+    @Inject
+    protected EdgeAsync edgeAsync;
+
+    @Inject
+    protected EdgeSerialization edgeSerialization;
+
+    @Inject
+    protected EdgeMetadataSerialization edgeMetadataSerialization;
+
+    protected OrganizationScope scope;
+
+
+    @Before
+    public void setup() {
+        scope = mock( OrganizationScope.class );
+
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getOrganization() ).thenReturn( orgId );
+    }
+
+
+    @Test
+    public void cleanTargetNoEdgesNoMeta(){
+       //do no writes, then execute a cleanup with no meta data
+
+        final Id targetId = createId ("target" );
+        final String test = "test";
+        final UUID version = UUIDGenerator.newTimeUUID();
+
+        int value = edgeAsync.clearTargets( scope, targetId, test, version ).toBlockingObservable().single();
+
+        assertEquals("No subtypes found", 0, value);
+    }
+
+    @Test
+    public void cleanTargetSingleEge() throws ConnectionException {
+        Edge edge = createEdge( "source", "test", "target" );
+
+        edgeSerialization.writeEdge( scope, edge ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+        int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+
+        assertEquals("No subtypes removed, edge exists", 1, value);
+
+        //now delete the edge
+
+        edgeSerialization.deleteEdge( scope, edge ).execute();
+
+        value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+
+        assertEquals("Single subtype should be removed", 0, value);
+
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization.getEdgeTypesToTarget( scope,
+                new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+
+        assertFalse("No edge types exist", edgeTypes.hasNext());
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization.getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+
+        assertFalse("No edge types exist", sourceTypes.hasNext());
+
+
+
+    }
+
+
+}