You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:10 UTC
[11/43] git commit: Working meta data cleanup
Working meta data cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4dd96aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4dd96aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4dd96aa7
Branch: refs/heads/two-dot-o
Commit: 4dd96aa7783ae4058e42ab75689f688ce3ce6e90
Parents: e8568ba
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 12 12:25:17 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 12 12:25:17 2014 -0700
----------------------------------------------------------------------
stack/corepersistence/collection/pom.xml | 8 +-
.../usergrid/persistence/graph/GraphFig.java | 6 +
.../persistence/graph/guice/GraphModule.java | 5 +
.../graph/impl/NodeDeleteListener.java | 12 +-
.../persistence/graph/impl/stage/EdgeAsync.java | 62 +++++
.../graph/impl/stage/EdgeAsyncImpl.java | 255 +++++++++++++++++++
.../graph/impl/NodeDeleteListenerTest.java | 1 +
.../graph/impl/stage/EdgeAsyncTest.java | 154 +++++++++++
8 files changed, 496 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 95aea45..7a8e780 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -9,6 +9,7 @@
<slf4j.version>1.7.2</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<chop.version>1.0</chop.version>
+ <rx.version>0.17.0</rx.version>
</properties>
<parent>
@@ -161,8 +162,13 @@
<dependency>
<groupId>com.netflix.rxjava</groupId>
<artifactId>rxjava-core</artifactId>
- <version>0.17.0-RC7</version>
+ <version>${rx.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.netflix.rxjava</groupId>
+ <artifactId>rxjava-math</artifactId>
+ <version>${rx.version}</version>
+ </dependency>
<!--<dependency>-->
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 3f87d14..64ca397 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -15,6 +15,8 @@ public interface GraphFig extends GuicyFig {
public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
+ public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
+
public static final String TIMEOUT_SIZE = "usergrid.graph.timeout.page.size";
@@ -50,6 +52,10 @@ public interface GraphFig extends GuicyFig {
@Key( TIMEOUT_TASK_TIME )
long getTaskLoopTime();
+ @Default("10")
+ @Key(REPAIR_CONCURRENT_SIZE)
+ int getRepairConcurrentSize();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index ed9448b..a18e741 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsync;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsyncImpl;
import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -105,6 +107,9 @@ public class GraphModule extends AbstractModule {
bind(AsyncProcessor.class).annotatedWith( EdgeDelete.class ).to( AsyncProcessorImpl.class );
bind(AsyncProcessor.class).annotatedWith( EdgeWrite.class ).to( AsyncProcessorImpl.class );
bind(AsyncProcessor.class).annotatedWith( NodeDelete.class ).to( AsyncProcessorImpl.class );
+
+ //Edge stages
+ bind( EdgeAsync.class).to( EdgeAsyncImpl.class );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 1d2124a..58862f4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -40,10 +40,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
private final NodeSerialization nodeSerialization;
-// private final EdgeSerialization edgeSerialization;
-// private final EdgeMetadataSerialization edgeMetadataSerialization;
+ private final EdgeSerialization edgeSerialization;
+ private final EdgeMetadataSerialization edgeMetadataSerialization;
private final GraphFig graphFig;
-// private final Keyspace keyspace;
+ private final Keyspace keyspace;
private final Scheduler scheduler;
private final EdgeManagerFactory edgeManagerFactory;
@@ -61,10 +61,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
this.nodeSerialization = nodeSerialization;
-// this.edgeSerialization = edgeSerialization;
-// this.edgeMetadataSerialization = edgeMetadataSerialization;
+ this.edgeSerialization = edgeSerialization;
+ this.edgeMetadataSerialization = edgeMetadataSerialization;
this.graphFig = graphFig;
-// this.keyspace = keyspace;
+ this.keyspace = keyspace;
this.scheduler = scheduler;
this.edgeManagerFactory = edgeManagerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
new file mode 100644
index 0000000..90c1bfe
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.MutationBatch;
+
+import rx.Observable;
+
+
+/**
+ * Creates a mutation which will remove obsolete
+ *
+ */
+public interface EdgeAsync {
+
+ /**
+ * Validate that the source types can be cleaned for the given info
+ * @param scope The scope to use
+ * @param sourceId The source Id to use
+ * @param edgeType The edge type
+ * @param version The max version to clean
+ * @return The mutation with the operations
+ */
+ public Observable<Integer> cleanSources(OrganizationScope scope, Id sourceId, String edgeType, UUID version);
+
+
+ /**
+ *
+ * Remove all source id types that are empty, as well as the edge type if there are no more edges for it
+ * @param scope The scope to use
+ * @param targetId The target Id to use
+ * @param edgeType The edge type
+ * @param version The max version to clean
+ * @return The mutation with the operations
+ */
+ public Observable<Integer> clearTargets( OrganizationScope scope, Id targetId, String edgeType, UUID version );
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
new file mode 100644
index 0000000..cb2cf6d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.SearchEdgeType;
+import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.observables.MathObservable;
+
+
+/**
+ * Implementation of the cleanup of edge meta data
+ */
+@Singleton
+public class EdgeAsyncImpl implements EdgeAsync {
+
+
+ private final EdgeMetadataSerialization edgeMetadataSerialization;
+ private final EdgeSerialization edgeSerialization;
+ private final Keyspace keyspace;
+ private final GraphFig graphFig;
+ private final Scheduler scheduler;
+
+
+ @Inject
+ public EdgeAsyncImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
+ final EdgeSerialization edgeSerialization, final Keyspace keyspace, final GraphFig graphFig,
+ final Scheduler scheduler ) {
+ this.edgeMetadataSerialization = edgeMetadataSerialization;
+ this.edgeSerialization = edgeSerialization;
+ this.keyspace = keyspace;
+ this.graphFig = graphFig;
+ this.scheduler = scheduler;
+ }
+
+
+ @Override
+ public Observable<Integer> cleanSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
+ final UUID version ) {
+
+
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+ @Override
+ public Observable<Integer> clearTargets( final OrganizationScope scope, final Id targetId, final String edgeType,
+ final UUID version ) {
+
+ ValidationUtils.validateOrganizationScope( scope );
+ ValidationUtils.verifyIdentity( targetId );
+ Preconditions.checkNotNull( edgeType, "edge type is required" );
+ Preconditions.checkNotNull( version, "version is required" );
+
+
+
+ return loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
+ .buffer( graphFig.getRepairConcurrentSize() )
+ //buffer them into concurrent groups based on the concurrent repair size
+ .flatMap( new Func1<List<String>, Observable<Integer>>() {
+
+ @Override
+ public Observable<Integer> call( final List<String> types ) {
+
+
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
+
+ //for each id type, check if the exist in parallel to increase processing speed
+ for ( final String sourceIdType : types ) {
+
+ final SearchByIdType searchData = new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
+
+ Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData
+ )
+ .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
+
+ //get distinct by source node
+ @Override
+ public Id call( final MarkedEdge markedEdge ) {
+ return markedEdge.getSourceNode();
+ }
+ } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
+
+ @Override
+ public void call( final Integer count ) {
+ /**
+ * we only want to delete if no edges are in this class. If there are
+ * still edges
+ * we must retain the information in order to keep our index structure
+ * correct for edge
+ * iteration
+ **/
+ if ( count != 0 ) {
+ return;
+ }
+
+
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeIdTypeToTarget( scope, targetId, edgeType, sourceIdType,
+ version ) );
+ }
+ } );
+
+ checks.add( search );
+ }
+
+
+ /**
+ * Sum up the total number of edges we had, then execute the mutation if we have anything to do
+ */
+ return MathObservable.sumInteger(Observable.merge( checks )).doOnNext( new Action1<Integer>() {
+ @Override
+ public void call( final Integer count ) {
+
+ if(batch.isEmpty()){
+ return;
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
+ }
+ } );
+
+ }
+
+ } )
+ .map( new Func1<Integer, Integer>() {
+ @Override
+ public Integer call( final Integer subTypes ) {
+
+ /**
+ * We can only execute deleting this type if no sub types were deleted
+ */
+ if(subTypes != 0){
+ return subTypes;
+ }
+
+ try {
+ edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version )
+ .execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation" );
+ }
+
+ return subTypes;
+ }
+ } )
+ //if we get no edges, emit a 0 so the caller knows nothing was deleted
+ .defaultIfEmpty( 0 );
+ }
+
+
+ /**
+ * Get all existing edge types to the target node
+ */
+ private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+
+ return Observable.create( new ObservableIterator<String>() {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ private Observable<MarkedEdge> getEdgesToTargetBySourceType( final OrganizationScope scope,
+ final SearchByIdType search ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ private Observable<String> loadEdgeIdsToTarget( final OrganizationScope scope, final SearchIdType search ) {
+ return Observable.create( new ObservableIterator<String>() {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ /**
+ * Load all edges pointing to this target
+ */
+ private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesToTarget( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 5b4899a..82aecff 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -186,6 +186,7 @@ public class NodeDeleteListenerTest {
assertFalse( types.hasNext() );
+
//no types to target
types = edgeMetadataSerialization.getEdgeTypesToTarget( scope, createSearchEdge( targetNode, null ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
new file mode 100644
index 0000000..c17676b
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith(JukitoRunner.class)
+@UseModules({ TestGraphModule.class })
+public class EdgeAsyncTest {
+
+
+ @ClassRule
+ public static CassandraRule rule = new CassandraRule();
+
+
+ @Inject
+ @Rule
+ public MigrationManagerRule migrationManagerRule;
+
+
+ @Inject
+ protected NodeSerialization serialization;
+
+ @Inject
+ protected EdgeAsync edgeAsync;
+
+ @Inject
+ protected EdgeSerialization edgeSerialization;
+
+ @Inject
+ protected EdgeMetadataSerialization edgeMetadataSerialization;
+
+ protected OrganizationScope scope;
+
+
+ @Before
+ public void setup() {
+ scope = mock( OrganizationScope.class );
+
+ Id orgId = mock( Id.class );
+
+ when( orgId.getType() ).thenReturn( "organization" );
+ when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+ when( scope.getOrganization() ).thenReturn( orgId );
+ }
+
+
+ @Test
+ public void cleanTargetNoEdgesNoMeta(){
+ //do no writes, then execute a cleanup with no meta data
+
+ final Id targetId = createId ("target" );
+ final String test = "test";
+ final UUID version = UUIDGenerator.newTimeUUID();
+
+ int value = edgeAsync.clearTargets( scope, targetId, test, version ).toBlockingObservable().single();
+
+ assertEquals("No subtypes found", 0, value);
+ }
+
+ @Test
+ public void cleanTargetSingleEge() throws ConnectionException {
+ Edge edge = createEdge( "source", "test", "target" );
+
+ edgeSerialization.writeEdge( scope, edge ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+ int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+
+ assertEquals("No subtypes removed, edge exists", 1, value);
+
+ //now delete the edge
+
+ edgeSerialization.deleteEdge( scope, edge ).execute();
+
+ value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+
+ assertEquals("Single subtype should be removed", 0, value);
+
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization.getEdgeTypesToTarget( scope,
+ new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+
+ assertFalse("No edge types exist", edgeTypes.hasNext());
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization.getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+
+ assertFalse("No edge types exist", sourceTypes.hasNext());
+
+
+
+ }
+
+
+}