You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/12 21:22:24 UTC
[1/4] git commit: Checkpoint for delete listener and tests
Repository: incubator-usergrid
Updated Branches:
refs/heads/asyncqueue dd37909fd -> 78dc432d1
Checkpoint for delete listener and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0a256bb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0a256bb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0a256bb5
Branch: refs/heads/asyncqueue
Commit: 0a256bb57fb376e0efff9ab13636e95960b9b27b
Parents: dd37909
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 10 19:11:21 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 10 19:11:21 2014 -0700
----------------------------------------------------------------------
.../persistence/collection/rx/ParallelTest.java | 218 +++++++++++++++++++
.../graph/consistency/AsyncProcessorImpl.java | 3 +-
.../graph/consistency/TimeoutTask.java | 2 +-
.../graph/impl/EdgeDeleteListener.java | 12 +-
.../graph/impl/NodeDeleteListener.java | 166 +++++++++++---
.../graph/consistency/AsyncProcessorTest.java | 14 +-
.../graph/impl/NodeDeleteListenerTest.java | 217 ++++++++++++++++++
7 files changed, 589 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
new file mode 100644
index 0000000..5d72ae6
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.rx;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import com.netflix.config.ConfigurationManager;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests that provide examples of how to perform more complex RX operations
+ */
+public class ParallelTest {
+
+ private static final Logger logger = LoggerFactory.getLogger( ParallelTest.class );
+
+//
+// private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "TEST_KEY" );
+//
+//
+// public static final String THREAD_POOL_SIZE = CommandUtils.getThreadPoolCoreSize( GROUP_KEY.name() );
+//
+// public static final String THREAD_POOL_QUEUE = CommandUtils.getThreadPoolMaxQueueSize( GROUP_KEY.name() );
+
+
+ /**
+ * An example of how an observable that requires a "fan out" then join should execute.
+ */
+ @Test(timeout = 5000)
+ public void concurrentFunctions() {
+ final String input = "input";
+
+ final int size = 100;
+ //since we start at index 0
+ final int expected = size - 1;
+
+
+ /**
+ * QUESTION Using this thread blocks indefinitely. The execution of the Hystrix command happens on the
+ * computation
+ * Thread if this is used
+ */
+ // final Scheduler scheduler = Schedulers.threadPoolForComputation();
+
+ //use the I/O scheduler to allow enough threads, otherwise our pool will be the same size as the # of cores
+ final Scheduler scheduler = Schedulers.io();
+
+ //set our size equal
+// ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size );
+ // ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, 10 );
+
+ //reject requests we have to queue
+// ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_QUEUE, -1 );
+
+ //latch used to make each thread block to prove correctness
+ final CountDownLatch latch = new CountDownLatch( size );
+
+
+ final Multiset<String> set = HashMultiset.create();
+
+
+ //create our observable and execute it in the I/O pool since we'll be doing I/O operations
+
+ /**
+ * QUESTION: Should this use the computation scheduler since all operations (except the hystrix command) are
+ * non blocking?
+ */
+
+ final Observable<String> observable = Observable.from( input ).observeOn( scheduler );
+
+
+ Observable<Integer> thing = observable.mapMany( new Func1<String, Observable<Integer>>() {
+
+ @Override
+ public Observable<Integer> call( final String s ) {
+ List<Observable<Integer>> functions = new ArrayList<Observable<Integer>>();
+
+ logger.info( "Creating new set of observables in thread {}", Thread.currentThread().getName() );
+
+ for ( int i = 0; i < size; i++ ) {
+
+
+ final int index = i;
+
+ //create a new observable and execute the function on it. These should happen in parallel when
+ // a subscription occurs
+
+ /**
+ * QUESTION: Should this again be the process thread, not the I/O
+ */
+ Observable<String> newObservable = Observable.from( input ).subscribeOn( scheduler );
+
+ Observable<Integer> transformed = newObservable.map( new Func1<String, Integer>() {
+
+ @Override
+ public Integer call( final String s ) {
+
+ final String threadName = Thread.currentThread().getName();
+
+ logger.info( "Invoking parallel task in thread {}", threadName );
+
+// /**
+// * Simulate a Hystrix command making a call to an external resource. Invokes
+// * the Hystrix command immediately as the function is invoked. This is currently
+// * how we have to call Cassandra.
+// *
+// * TODO This needs to be re-written and evaluated once this PR is released https://github.com/Netflix/Hystrix/pull/209
+// */
+// return new HystrixCommand<Integer>( GROUP_KEY ) {
+// @Override
+// protected Integer run() throws Exception {
+//
+// final String threadName = Thread.currentThread().getName();
+//
+// logger.info( "Invoking hystrix task in thread {}", threadName );
+
+
+ set.add( threadName );
+
+ latch.countDown();
+
+ try {
+ latch.await();
+ }
+ catch ( InterruptedException e ) {
+ throw new RuntimeException( "Interrupted", e );
+ }
+
+// assertTrue( isExecutedInThread() );
+//
+// return index;
+// }
+// }.execute();
+
+ return index;
+ }
+ } );
+
+ functions.add( transformed );
+ }
+
+ /**
+ * Execute the functions above and zip the results together
+ */
+ Observable<Integer> zipped = Observable.zip( functions, new FuncN<Integer>() {
+
+ @Override
+ public Integer call( final Object... args ) {
+
+ logger.info( "Invoking zip in thread {}", Thread.currentThread().getName() );
+
+ assertEquals( size, args.length );
+
+ for ( int i = 0; i < args.length; i++ ) {
+ assertEquals( "Indexes are returned in order", i, args[i] );
+ }
+
+ //just return the last index
+ return ( Integer ) args[args.length - 1];
+ }
+ } );
+
+ return zipped;
+ }
+ } );
+
+
+ final Integer last = thing.toBlockingObservable().last();
+
+
+ assertEquals( expected, last.intValue() );
+
+ assertEquals( size, set.size() );
+
+ /**
+ * Ensure only 1 entry per thread
+ */
+ for ( String entry : set.elementSet() ) {
+ assertEquals( 1, set.count( entry ) );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index d962a88..b949d55 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -3,7 +3,6 @@ package org.apache.usergrid.persistence.graph.consistency;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -103,7 +102,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
} ).subscribe( new Action1<AsynchronousMessage<T>>() {
@Override
public void call( final AsynchronousMessage<T> asynchronousMessage ) {
- //To change body of implemented methods use File | Settings | File Templates.
+ //no op
}
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
index 6607724..fbfad3e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
@@ -32,7 +32,7 @@ public class TimeoutTask<T> implements Action1<Scheduler.Inner> {
* We purposefully loop through a tight loop. If we have anything to process, we need to do so
* Once we run out of items to process, this thread will sleep and the timer will fire
*/
- while(!inner.isUnsubscribed()) {
+ while(!inner.isUnsubscribed() && graphFig.getTimeoutReadSize() > 0) {
Iterator<AsynchronousMessage<T>> timeouts = getTimeouts();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index 6feded3..85e4b4f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -79,7 +79,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
edge.getVersion(), null ) );
}
- } ).doOnEach( new Action1<MarkedEdge>() {
+ } ).doOnNext( new Action1<MarkedEdge>() {
@Override
public void call( final MarkedEdge markedEdge ) {
final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdge );
@@ -93,7 +93,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
Observable<Integer> sourceIdType = edgeManager.loadEdgesFromSourceByType(
new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
edge.getTargetNode().getType(), null ) ).take( 2 ).count()
- .doOnEach( new Action1<Integer>() {
+ .doOnNext( new Action1<Integer>() {
@Override
public void call( final Integer count ) {
//There's nothing to do,
@@ -114,7 +114,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
Observable<Integer> targetIdType = edgeManager.loadEdgesToTargetByType(
new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
edge.getSourceNode().getType(), null ) ).take( 2 ).count()
- .doOnEach( new Action1<Integer>() {
+ .doOnNext( new Action1<Integer>() {
@Override
public void call( final Integer count ) {
//There's nothing to do,
@@ -136,7 +136,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
// we can't delete it
Observable<Integer> sourceType = edgeManager.loadEdgesFromSource(
new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) ).take( 2 )
- .count().doOnEach( new Action1<Integer>() {
+ .count().doOnNext( new Action1<Integer>() {
@Override
public void call( final Integer count ) {
//There's nothing to do,
@@ -146,6 +146,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
if ( count == 1 ) {
final MutationBatch delete =
edgeMetadataSerialization.removeEdgeTypeFromSource( scope, edge );
+ batch.mergeShallow( delete );
}
}
} );
@@ -153,7 +154,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
Observable<Integer> targetType = edgeManager.loadEdgesToTarget(
new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) ).take( 2 )
- .count().doOnEach( new Action1<Integer>() {
+ .count().doOnNext( new Action1<Integer>() {
@Override
public void call( final Integer count ) {
//There's nothing to do,
@@ -163,6 +164,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
if ( count == 1 ) {
final MutationBatch delete =
edgeMetadataSerialization.removeEdgeTypeToTarget( scope, edge );
+ batch.mergeShallow( delete );
}
}
} );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index aabc572..fbd5d08 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -2,9 +2,11 @@ package org.apache.usergrid.persistence.graph.impl;
import java.util.Iterator;
+import java.util.List;
import java.util.UUID;
import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.SearchEdgeType;
@@ -19,8 +21,10 @@ import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import rx.Notification;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
@@ -34,6 +38,8 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
private final NodeSerialization nodeSerialization;
private final EdgeSerialization edgeSerialization;
private final EdgeMetadataSerialization edgeMetadataSerialization;
+ private final GraphFig graphFig;
+ private final Keyspace keyspace;
/**
@@ -41,12 +47,15 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
*/
@Inject
public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
- final EdgeMetadataSerialization edgeMetadataSerialization, @NodeDelete final AsyncProcessor nodeDelete) {
+ final EdgeMetadataSerialization edgeMetadataSerialization, final GraphFig graphFig,
+ final Keyspace keyspace, @NodeDelete final AsyncProcessor nodeDelete ) {
this.nodeSerialization = nodeSerialization;
this.edgeSerialization = edgeSerialization;
this.edgeMetadataSerialization = edgeMetadataSerialization;
+ this.graphFig = graphFig;
+ this.keyspace = keyspace;
nodeDelete.addListener( this );
}
@@ -63,33 +72,83 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
public Optional<UUID> call( final Id id ) {
return nodeSerialization.getMaxVersion( scope, node );
}
- } ).flatMap( new Func1<Optional<UUID>, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final Optional<UUID> uuidOptional ) {
-
- return getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
- .flatMap( new Func1<String, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final String edgeType ) {
+ } ) //only continue if it's present. Otherwise stop
+ .takeWhile( new Func1<Optional<UUID>, Boolean>() {
+ @Override
+ public Boolean call( final Optional<UUID> uuidOptional ) {
+ return uuidOptional.isPresent();
+ }
+ } )
+
+ //delete source and targets in parallel and merge them into a single observable
+ .flatMap( new Func1<Optional<UUID>, Observable<List<MarkedEdge>>>() {
+ @Override
+ public Observable<List<MarkedEdge>> call( final Optional<UUID> uuidOptional ) {
+
+ //get all edges pointing to the target node and buffer them into groups for deletion
+ Observable<List<MarkedEdge>> targetEdges =
+ getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+ .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
+ @Override
+ public Observable<List<MarkedEdge>> call( final String edgeType ) {
+ return loadEdgesToTarget( scope,
+ new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
+ null ) ).buffer( graphFig.getScanPageSize() );
+ }
+ } );
- //for each edge type, we want to search all edges < this version to the node and
- // delete them. We might want to batch this up for efficiency
- return loadEdgesToTarget( scope,
- new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
- .doOnEach( new Action1<MarkedEdge>() {
+ //get all edges pointing to the source node and buffer them into groups for deletion
+ Observable<List<MarkedEdge>> sourceEdges =
+ getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
+ .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
@Override
- public void call( final MarkedEdge markedEdge ) {
- edgeSerialization.deleteEdge( scope, markedEdge );
+ public Observable<List<MarkedEdge>> call( final String edgeType ) {
+ return loadEdgesFromSource( scope,
+ new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
+ null ) ).buffer( graphFig.getScanPageSize() );
}
} );
- }
- } );
- }
- } ).map( new Func1<MarkedEdge, EdgeEvent<Id>>() {
+
+
+ return Observable.merge( targetEdges, sourceEdges );
+ }
+ } ).doOnNext( new Action1<List<MarkedEdge>>() {
+ @Override
+ public void call( final List<MarkedEdge> markedEdges ) {
+ MutationBatch batch = keyspace.prepareMutationBatch();
+
+
+ for ( MarkedEdge marked : markedEdges ) {
+ final MutationBatch edgeBatch = edgeSerialization.deleteEdge( scope, marked );
+ batch.mergeShallow( edgeBatch );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation" );
+ }
+ }
+ } ).map( new Func1<List<MarkedEdge>, EdgeEvent<Id>>() {
+ @Override
+ public EdgeEvent<Id> call( final List<MarkedEdge> markedEdges ) {
+ return edgeEvent;
+ }
+ } );
+ }
+
+
+ /**
+ * Get all existing edge types to the target node
+ */
+ private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+
+ return Observable.create( new ObservableIterator<String>() {
@Override
- public EdgeEvent<Id> call( final MarkedEdge edge ) {
- return edgeEvent;
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
}
} );
}
@@ -97,16 +156,13 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
/**
* Get all existing edge types to the target node
- * @param scope
- * @param search
- * @return
*/
- private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+ private Observable<String> getEdgesTypesFromSource( final OrganizationScope scope, final SearchEdgeType search ) {
return Observable.create( new ObservableIterator<String>() {
@Override
protected Iterator<String> getIterator() {
- return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+ return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
}
} );
}
@@ -114,9 +170,6 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
/**
* Load all edges pointing to this target
- * @param scope
- * @param search
- * @return
*/
private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
@@ -127,4 +180,55 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
}
} );
}
+
+
+ /**
+ * Load all edges originating from this source
+ */
+ private Observable<MarkedEdge> loadEdgesFromSource( final OrganizationScope scope, final SearchByEdgeType search ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesFromSource( scope, search );
+ }
+ } );
+ }
+
+
+ private class Delete implements Action1<List<MarkedEdge>> {
+
+ private final OrganizationScope scope;
+
+
+ private Delete( final OrganizationScope scope ) {
+ this.scope = scope;
+ }
+
+
+ @Override
+ public void call( final List<MarkedEdge> markedEdges ) {
+
+ //do nothing
+ if ( markedEdges.size() == 0 ) {
+ return;
+ }
+
+ Iterator<MarkedEdge> marked = markedEdges.iterator();
+
+ MutationBatch batch = edgeSerialization.deleteEdge( scope, marked.next() );
+
+ while ( marked.hasNext() ) {
+ final MutationBatch newDelete = edgeSerialization.deleteEdge( scope, marked.next() );
+ batch.mergeShallow( newDelete );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute batch mutation", e );
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index 9822cc5..3652757 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -29,9 +29,11 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.apache.usergrid.persistence.graph.GraphFig;
+
import rx.Observable;
-import rx.concurrency.Schedulers;
import rx.functions.Action1;
+import rx.schedulers.Schedulers;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -274,7 +276,11 @@ public class AsyncProcessorTest {
*/
public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue ) {
- AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue, Schedulers.threadPoolForIO() );
+ GraphFig fig = mock(GraphFig.class);
+
+ when(fig.getScanPageSize()).thenReturn( 0 );
+
+ AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue, Schedulers.io(), fig );
return processor;
@@ -305,7 +311,7 @@ public class AsyncProcessorTest {
@Override
public Observable<TestEvent> receive( final TestEvent event ) {
- return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+ return Observable.from( event ).doOnNext( new Action1<TestEvent>() {
@Override
public void call( final TestEvent testEvent ) {
events.push( testEvent );
@@ -342,7 +348,7 @@ public class AsyncProcessorTest {
@Override
public Observable<TestEvent> receive( final TestEvent event ) {
- return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+ return Observable.from( event ).doOnNext( new Action1<TestEvent>() {
@Override
public void call( final TestEvent testEvent ) {
events.push( testEvent );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a256bb5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
new file mode 100644
index 0000000..2d55698
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -0,0 +1,217 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.EdgeManager;
+import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchIdType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
+public class NodeDeleteListenerTest {
+
+ @ClassRule
+ public static CassandraRule rule = new CassandraRule();
+
+
+ @Inject
+ @Rule
+ public MigrationManagerRule migrationManagerRule;
+
+
+ @Inject
+ protected NodeDeleteListener deleteListener;
+
+ @Inject
+ protected EdgeSerialization edgeSerialization;
+
+ @Inject
+ protected EdgeMetadataSerialization edgeMetadataSerialization;
+
+ @Inject
+ protected NodeSerialization nodeSerialization;
+
+ @Inject
+ protected EdgeManagerFactory emf;
+
+
+ protected OrganizationScope scope;
+
+
+ @Before
+ public void setup() {
+ scope = mock( OrganizationScope.class );
+
+ Id orgId = mock( Id.class );
+
+ when( orgId.getType() ).thenReturn( "organization" );
+ when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+ when( scope.getOrganization() ).thenReturn( orgId );
+ }
+
+
+ /**
+ * Simple test case that writes a single edge without marking either node. The delete listener should emit no
+ * event for the source node or the target node, since no mark was set
+ */
+ @Test
+ public void testNoDeletionMarked() {
+
+ EdgeManager em = emf.createEdgeManager( scope );
+
+ Edge edge = createEdge( "source", "test", "target" );
+
+ //write the edge
+ Edge last = em.writeEdge( edge ).toBlockingObservable().last();
+
+ assertEquals( edge, last );
+
+ Id sourceNode = edge.getSourceNode();
+
+ Id targetNode = edge.getTargetNode();
+
+
+ EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+
+ EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
+
+ assertNull( "Mark was not set, no delete should be executed", event );
+
+
+ deleteEvent = new EdgeEvent<Id>( scope, targetNode );
+
+ event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
+
+ assertNull( "Mark was not set, no delete should be executed", event );
+ }
+
+
+ /**
+ * Simple test case that tests a single edge and removing the node. The other target node should be removed as well
+ * since it has no other targets
+ */
+ @Test
+ public void testRemoveSourceNode() throws ConnectionException {
+
+ EdgeManager em = emf.createEdgeManager( scope );
+
+ Edge edge = createEdge( "source", "test", "target" );
+
+ //write the edge
+ Edge last = em.writeEdge( edge ).toBlockingObservable().last();
+
+
+ assertEquals( edge, last );
+
+ Id sourceNode = edge.getSourceNode();
+
+ Id targetNode = edge.getTargetNode();
+
+
+ //mark the source node with the delete version so the listener will process it
+ UUID deleteVersion = UUIDGenerator.newTimeUUID();
+
+ nodeSerialization.mark( scope, sourceNode, deleteVersion ).execute();
+
+ EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+
+
+ EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().last();
+
+ assertEquals( deleteEvent, event );
+
+ //now verify we can't get any of the info back
+
+ UUID now = UUIDGenerator.newTimeUUID();
+
+
+ Iterator<MarkedEdge> returned = edgeSerialization
+ .getEdgesFromSource( scope, createSearchByEdge( sourceNode, edge.getType(), now, null ) );
+
+ //no edge from source node should be returned
+ assertFalse( "No source should be returned", returned.hasNext() );
+
+ //validate it's not returned by a search of edges to the target node
+
+ returned = edgeSerialization
+ .getEdgesToTarget( scope, createSearchByEdge( targetNode, edge.getType(), now, null ) );
+
+ assertFalse( "No target should be returned", returned.hasNext() );
+
+ //no types from source
+
+ Iterator<String> types =
+ edgeMetadataSerialization.getEdgeTypesFromSource( scope, createSearchEdge( sourceNode, null ) );
+
+ assertFalse( types.hasNext() );
+
+ //no types to target
+
+ types = edgeMetadataSerialization.getEdgeTypesToTarget( scope, createSearchEdge( targetNode, null ) );
+
+ assertFalse( types.hasNext() );
+
+
+ //no target types from source
+
+ Iterator<String> idTypes = edgeMetadataSerialization
+ .getIdTypesFromSource( scope, createSearchIdType( sourceNode, edge.getType(), null ) );
+
+ assertFalse( idTypes.hasNext() );
+
+ //no source types to target
+ idTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, createSearchIdType( targetNode, edge.getType(), null ) );
+
+ assertFalse( idTypes.hasNext() );
+ }
+
+
+ /**
+ * Simple test case that tests a single edge and removing the node. The source node should be removed as well
+ * since it has no other targets
+ */
+ @Test
+ public void testRemoveTargetNode() {
+
+ }
+}
[3/4] git commit: Working meta data cleanup
Posted by sn...@apache.org.
Working meta data cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4dd96aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4dd96aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4dd96aa7
Branch: refs/heads/asyncqueue
Commit: 4dd96aa7783ae4058e42ab75689f688ce3ce6e90
Parents: e8568ba
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 12 12:25:17 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 12 12:25:17 2014 -0700
----------------------------------------------------------------------
stack/corepersistence/collection/pom.xml | 8 +-
.../usergrid/persistence/graph/GraphFig.java | 6 +
.../persistence/graph/guice/GraphModule.java | 5 +
.../graph/impl/NodeDeleteListener.java | 12 +-
.../persistence/graph/impl/stage/EdgeAsync.java | 62 +++++
.../graph/impl/stage/EdgeAsyncImpl.java | 255 +++++++++++++++++++
.../graph/impl/NodeDeleteListenerTest.java | 1 +
.../graph/impl/stage/EdgeAsyncTest.java | 154 +++++++++++
8 files changed, 496 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 95aea45..7a8e780 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -9,6 +9,7 @@
<slf4j.version>1.7.2</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<chop.version>1.0</chop.version>
+ <rx.version>0.17.0</rx.version>
</properties>
<parent>
@@ -161,8 +162,13 @@
<dependency>
<groupId>com.netflix.rxjava</groupId>
<artifactId>rxjava-core</artifactId>
- <version>0.17.0-RC7</version>
+ <version>${rx.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.netflix.rxjava</groupId>
+ <artifactId>rxjava-math</artifactId>
+ <version>${rx.version}</version>
+ </dependency>
<!--<dependency>-->
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 3f87d14..64ca397 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -15,6 +15,8 @@ public interface GraphFig extends GuicyFig {
public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
+ public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
+
public static final String TIMEOUT_SIZE = "usergrid.graph.timeout.page.size";
@@ -50,6 +52,10 @@ public interface GraphFig extends GuicyFig {
@Key( TIMEOUT_TASK_TIME )
long getTaskLoopTime();
+ @Default("10")
+ @Key(REPAIR_CONCURRENT_SIZE)
+ int getRepairConcurrentSize();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index ed9448b..a18e741 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsync;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsyncImpl;
import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -105,6 +107,9 @@ public class GraphModule extends AbstractModule {
bind(AsyncProcessor.class).annotatedWith( EdgeDelete.class ).to( AsyncProcessorImpl.class );
bind(AsyncProcessor.class).annotatedWith( EdgeWrite.class ).to( AsyncProcessorImpl.class );
bind(AsyncProcessor.class).annotatedWith( NodeDelete.class ).to( AsyncProcessorImpl.class );
+
+ //Edge stages
+ bind( EdgeAsync.class).to( EdgeAsyncImpl.class );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 1d2124a..58862f4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -40,10 +40,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
private final NodeSerialization nodeSerialization;
-// private final EdgeSerialization edgeSerialization;
-// private final EdgeMetadataSerialization edgeMetadataSerialization;
+ private final EdgeSerialization edgeSerialization;
+ private final EdgeMetadataSerialization edgeMetadataSerialization;
private final GraphFig graphFig;
-// private final Keyspace keyspace;
+ private final Keyspace keyspace;
private final Scheduler scheduler;
private final EdgeManagerFactory edgeManagerFactory;
@@ -61,10 +61,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
this.nodeSerialization = nodeSerialization;
-// this.edgeSerialization = edgeSerialization;
-// this.edgeMetadataSerialization = edgeMetadataSerialization;
+ this.edgeSerialization = edgeSerialization;
+ this.edgeMetadataSerialization = edgeMetadataSerialization;
this.graphFig = graphFig;
-// this.keyspace = keyspace;
+ this.keyspace = keyspace;
this.scheduler = scheduler;
this.edgeManagerFactory = edgeManagerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
new file mode 100644
index 0000000..90c1bfe
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.MutationBatch;
+
+import rx.Observable;
+
+
+/**
+ * Asynchronously cleans up obsolete edge meta data (id types and edge types)
+ * that no longer has any edges backing it.
+ */
+public interface EdgeAsync {
+
+ /**
+ * Validate that the source types can be cleaned for the given info
+ * @param scope The scope to use
+ * @param sourceId The source Id to use
+ * @param edgeType The edge type
+ * @param version The max version to clean
+ * @return An observable emitting an integer count for the cleanup (not a mutation)
+ */
+ public Observable<Integer> cleanSources(OrganizationScope scope, Id sourceId, String edgeType, UUID version);
+
+
+ /**
+ *
+ * Remove all source id types that are empty, as well as the edge type if there are no more edges for it
+ * @param scope The scope to use
+ * @param targetId The target Id to use
+ * @param edgeType The edge type
+ * @param version The max version to clean
+ * @return An observable emitting the count of source id types that still have edges (0 if none remain)
+ */
+ public Observable<Integer> clearTargets( OrganizationScope scope, Id targetId, String edgeType, UUID version );
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
new file mode 100644
index 0000000..cb2cf6d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.SearchEdgeType;
+import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.observables.MathObservable;
+
+
+/**
+ * Implementation of the cleanup of edge meta data
+ */
+@Singleton
+public class EdgeAsyncImpl implements EdgeAsync {
+
+
+ private final EdgeMetadataSerialization edgeMetadataSerialization;
+ private final EdgeSerialization edgeSerialization;
+ private final Keyspace keyspace;
+ private final GraphFig graphFig;
+ private final Scheduler scheduler;
+
+
+ @Inject
+ public EdgeAsyncImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
+ final EdgeSerialization edgeSerialization, final Keyspace keyspace, final GraphFig graphFig,
+ final Scheduler scheduler ) {
+ this.edgeMetadataSerialization = edgeMetadataSerialization;
+ this.edgeSerialization = edgeSerialization;
+ this.keyspace = keyspace;
+ this.graphFig = graphFig;
+ this.scheduler = scheduler;
+ }
+
+
+ @Override
+ public Observable<Integer> cleanSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
+ final UUID version ) {
+
+
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+ @Override
+ public Observable<Integer> clearTargets( final OrganizationScope scope, final Id targetId, final String edgeType,
+ final UUID version ) {
+
+ ValidationUtils.validateOrganizationScope( scope );
+ ValidationUtils.verifyIdentity( targetId );
+ Preconditions.checkNotNull( edgeType, "edge type is required" );
+ Preconditions.checkNotNull( version, "version is required" );
+
+
+
+ return loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
+ .buffer( graphFig.getRepairConcurrentSize() )
+ //buffer them into concurrent groups based on the concurrent repair size
+ .flatMap( new Func1<List<String>, Observable<Integer>>() {
+
+ @Override
+ public Observable<Integer> call( final List<String> types ) {
+
+
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
+
+ //for each id type, check if the exist in parallel to increase processing speed
+ for ( final String sourceIdType : types ) {
+
+ final SearchByIdType searchData = new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
+
+ Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData
+ )
+ .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
+
+ //get distinct by source node
+ @Override
+ public Id call( final MarkedEdge markedEdge ) {
+ return markedEdge.getSourceNode();
+ }
+ } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
+
+ @Override
+ public void call( final Integer count ) {
+ /**
+ * we only want to delete if no edges are in this class. If there are
+ * still edges
+ * we must retain the information in order to keep our index structure
+ * correct for edge
+ * iteration
+ **/
+ if ( count != 0 ) {
+ return;
+ }
+
+
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeIdTypeToTarget( scope, targetId, edgeType, sourceIdType,
+ version ) );
+ }
+ } );
+
+ checks.add( search );
+ }
+
+
+ /**
+ * Sum up the total number of edges we had, then execute the mutation if we have anything to do
+ */
+ return MathObservable.sumInteger(Observable.merge( checks )).doOnNext( new Action1<Integer>() {
+ @Override
+ public void call( final Integer count ) {
+
+ if(batch.isEmpty()){
+ return;
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
+ }
+ } );
+
+ }
+
+ } )
+ .map( new Func1<Integer, Integer>() {
+ @Override
+ public Integer call( final Integer subTypes ) {
+
+ /**
+ * We can only execute deleting this type if no sub types were deleted
+ */
+ if(subTypes != 0){
+ return subTypes;
+ }
+
+ try {
+ edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version )
+ .execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation" );
+ }
+
+ return subTypes;
+ }
+ } )
+ //if we get no edges, emit a 0 so the caller knows nothing was deleted
+ .defaultIfEmpty( 0 );
+ }
+
+
+ /**
+ * Get all existing edge types to the target node
+ */
+ private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+
+ return Observable.create( new ObservableIterator<String>() {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ private Observable<MarkedEdge> getEdgesToTargetBySourceType( final OrganizationScope scope,
+ final SearchByIdType search ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ private Observable<String> loadEdgeIdsToTarget( final OrganizationScope scope, final SearchIdType search ) {
+ return Observable.create( new ObservableIterator<String>() {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ /**
+ * Load all edges pointing to this target
+ */
+ private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesToTarget( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 5b4899a..82aecff 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -186,6 +186,7 @@ public class NodeDeleteListenerTest {
assertFalse( types.hasNext() );
+
//no types to target
types = edgeMetadataSerialization.getEdgeTypesToTarget( scope, createSearchEdge( targetNode, null ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4dd96aa7/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
new file mode 100644
index 0000000..c17676b
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith(JukitoRunner.class)
+@UseModules({ TestGraphModule.class })
+public class EdgeAsyncTest {
+
+
+ @ClassRule
+ public static CassandraRule rule = new CassandraRule();
+
+
+ @Inject
+ @Rule
+ public MigrationManagerRule migrationManagerRule;
+
+
+ @Inject
+ protected NodeSerialization serialization;
+
+ @Inject
+ protected EdgeAsync edgeAsync;
+
+ @Inject
+ protected EdgeSerialization edgeSerialization;
+
+ @Inject
+ protected EdgeMetadataSerialization edgeMetadataSerialization;
+
+ protected OrganizationScope scope;
+
+
+ @Before
+ public void setup() {
+ scope = mock( OrganizationScope.class );
+
+ Id orgId = mock( Id.class );
+
+ when( orgId.getType() ).thenReturn( "organization" );
+ when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+ when( scope.getOrganization() ).thenReturn( orgId );
+ }
+
+
+ @Test
+ public void cleanTargetNoEdgesNoMeta(){
+ //do no writes, then execute a cleanup with no meta data
+
+ final Id targetId = createId ("target" );
+ final String test = "test";
+ final UUID version = UUIDGenerator.newTimeUUID();
+
+ int value = edgeAsync.clearTargets( scope, targetId, test, version ).toBlockingObservable().single();
+
+ assertEquals("No subtypes found", 0, value);
+ }
+
+ @Test
+ public void cleanTargetSingleEge() throws ConnectionException {
+ Edge edge = createEdge( "source", "test", "target" );
+
+ edgeSerialization.writeEdge( scope, edge ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+ int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+
+ assertEquals("No subtypes removed, edge exists", 1, value);
+
+ //now delete the edge
+
+ edgeSerialization.deleteEdge( scope, edge ).execute();
+
+ value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+
+ assertEquals("Single subtype should be removed", 0, value);
+
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization.getEdgeTypesToTarget( scope,
+ new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+
+ assertFalse("No edge types exist", edgeTypes.hasNext());
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization.getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+
+ assertFalse("No edge types exist", sourceTypes.hasNext());
+
+
+
+ }
+
+
+}
[4/4] git commit: Passing tests for meta data cleanup.
Posted by sn...@apache.org.
Passing tests for meta data cleanup.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/78dc432d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/78dc432d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/78dc432d
Branch: refs/heads/asyncqueue
Commit: 78dc432d1ed80ca96776657ced8961fee34265d5
Parents: 4dd96aa
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 12 12:57:08 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 12 12:57:08 2014 -0700
----------------------------------------------------------------------
.../graph/impl/stage/EdgeAsyncImpl.java | 180 ++++++++++---------
.../graph/impl/stage/EdgeAsyncTest.java | 165 +++++++++++++++--
2 files changed, 242 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/78dc432d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
index cb2cf6d..ba21bf0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
@@ -38,7 +38,6 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
-import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Preconditions;
@@ -83,7 +82,7 @@ public class EdgeAsyncImpl implements EdgeAsync {
@Override
public Observable<Integer> cleanSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
- final UUID version ) {
+ final UUID version ) {
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -100,107 +99,114 @@ public class EdgeAsyncImpl implements EdgeAsync {
Preconditions.checkNotNull( version, "version is required" );
+ Observable<Integer> deleteCounts =
+ loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
+ .buffer( graphFig.getRepairConcurrentSize() )
+ //buffer them into concurrent groups based on the concurrent repair size
+ .flatMap( new Func1<List<String>, Observable<Integer>>() {
- return loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
- .buffer( graphFig.getRepairConcurrentSize() )
- //buffer them into concurrent groups based on the concurrent repair size
- .flatMap( new Func1<List<String>, Observable<Integer>>() {
-
- @Override
- public Observable<Integer> call( final List<String> types ) {
+ @Override
+ public Observable<Integer> call( final List<String> types ) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final MutationBatch batch = keyspace.prepareMutationBatch();
- final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
+ final List<Observable<Integer>> checks =
+ new ArrayList<Observable<Integer>>( types.size() );
- //for each id type, check if the exist in parallel to increase processing speed
- for ( final String sourceIdType : types ) {
+ //for each id type, check if they exist in parallel to increase processing speed
+ for ( final String sourceIdType : types ) {
- final SearchByIdType searchData = new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
+ final SearchByIdType searchData =
+ new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
- Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData
- )
- .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
+ Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData )
+ .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
- //get distinct by source node
- @Override
- public Id call( final MarkedEdge markedEdge ) {
- return markedEdge.getSourceNode();
- }
- } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
+ //get distinct by source node
+ @Override
+ public Id call( final MarkedEdge markedEdge ) {
+ return markedEdge.getSourceNode();
+ }
+ } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
- @Override
- public void call( final Integer count ) {
- /**
- * we only want to delete if no edges are in this class. If there are
- * still edges
- * we must retain the information in order to keep our index structure
- * correct for edge
- * iteration
- **/
- if ( count != 0 ) {
- return;
- }
+ @Override
+ public void call( final Integer count ) {
+ /**
+ * We only want to delete the meta data for this
+ * source id type if no edges remain for it.
+ * If there are still edges, we must retain the
+ * information in order to keep our index
+ * structure correct for edge iteration.
+ *
+ * (count is 0 or 1 because of the take( 1 ) above)
+ **/
+ if ( count != 0 ) {
+ return;
+ }
- batch.mergeShallow( edgeMetadataSerialization
- .removeIdTypeToTarget( scope, targetId, edgeType, sourceIdType,
- version ) );
- }
- } );
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeIdTypeToTarget( scope, targetId, edgeType,
+ sourceIdType, version ) );
+ }
+ } );
- checks.add( search );
- }
+ checks.add( search );
+ }
- /**
- * Sum up the total number of edges we had, then execute the mutation if we have anything to do
- */
- return MathObservable.sumInteger(Observable.merge( checks )).doOnNext( new Action1<Integer>() {
- @Override
- public void call( final Integer count ) {
+ /**
+ * Sum up the total number of edges we had, then execute the mutation if we have
+ * anything to do
+ */
+ return MathObservable.sumInteger( Observable.merge( checks ) )
+ .doOnNext( new Action1<Integer>() {
+ @Override
+ public void call( final Integer count ) {
+
+ if ( batch.isEmpty() ) {
+ return;
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException(
+ "Unable to execute mutation", e );
+ }
+ }
+ } );
+ }
+ } )
+ //if we get no edges, emit a 0 so the caller knows nothing was deleted
+ .defaultIfEmpty( 0 );
- if(batch.isEmpty()){
- return;
- }
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation", e );
- }
- }
- } );
-
- }
-
- } )
- .map( new Func1<Integer, Integer>() {
- @Override
- public Integer call( final Integer subTypes ) {
-
- /**
- * We can only execute deleting this type if no sub types were deleted
- */
- if(subTypes != 0){
- return subTypes;
- }
-
- try {
- edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version )
- .execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation" );
- }
-
- return subTypes;
- }
- } )
- //if we get no edges, emit a 0 so the caller knows nothing was deleted
- .defaultIfEmpty( 0 );
+ //sum up everything emitted by sub types. If no edges exist for any sub type,
+ // then we can safely remove the edge type
+ return MathObservable.sumInteger( deleteCounts ).map( new Func1<Integer, Integer>() {
+ @Override
+ public Integer call( final Integer subTypes ) {
+
+ /**
+ * We can only delete this edge type if no sub types still have edges (sum is 0)
+ */
+ if ( subTypes != 0 ) {
+ return subTypes;
+ }
+
+ try {
+ edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version ).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation" );
+ }
+
+ return subTypes;
+ }
+ } );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/78dc432d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
index c17676b..5f57e64 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
@@ -20,7 +20,9 @@
package org.apache.usergrid.persistence.graph.impl.stage;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.Set;
import java.util.UUID;
import org.jukito.JukitoRunner;
@@ -35,6 +37,7 @@ import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
@@ -59,8 +62,8 @@ import static org.mockito.Mockito.when;
*
*
*/
-@RunWith(JukitoRunner.class)
-@UseModules({ TestGraphModule.class })
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
public class EdgeAsyncTest {
@@ -85,6 +88,9 @@ public class EdgeAsyncTest {
@Inject
protected EdgeMetadataSerialization edgeMetadataSerialization;
+ @Inject
+ protected GraphFig graphFig;
+
protected OrganizationScope scope;
@@ -102,53 +108,180 @@ public class EdgeAsyncTest {
@Test
- public void cleanTargetNoEdgesNoMeta(){
- //do no writes, then execute a cleanup with no meta data
+ public void cleanTargetNoEdgesNoMeta() {
+ //do no writes, then execute a cleanup with no meta data
- final Id targetId = createId ("target" );
+ final Id targetId = createId( "target" );
final String test = "test";
final UUID version = UUIDGenerator.newTimeUUID();
int value = edgeAsync.clearTargets( scope, targetId, test, version ).toBlockingObservable().single();
- assertEquals("No subtypes found", 0, value);
+ assertEquals( "No subtypes found", 0, value );
}
+
@Test
- public void cleanTargetSingleEge() throws ConnectionException {
+ public void cleanTargetSingleEdge() throws ConnectionException {
Edge edge = createEdge( "source", "test", "target" );
edgeSerialization.writeEdge( scope, edge ).execute();
edgeMetadataSerialization.writeEdge( scope, edge ).execute();
- int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+ int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+ .toBlockingObservable().single();
- assertEquals("No subtypes removed, edge exists", 1, value);
+ assertEquals( "No subtypes removed, edge exists", 1, value );
//now delete the edge
edgeSerialization.deleteEdge( scope, edge ).execute();
- value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+ value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+ .toBlockingObservable().single();
- assertEquals("Single subtype should be removed", 0, value);
+ assertEquals( "Single subtype should be removed", 0, value );
//now verify they're gone
- Iterator<String> edgeTypes = edgeMetadataSerialization.getEdgeTypesToTarget( scope,
- new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
+ }
+
+
+ @Test
+ public void cleanTargetMultipleEdge() throws ConnectionException {
+
+ Id targetId = createId( "target" );
+
+ Edge edge1 = createEdge( createId("source1"), "test", targetId);
+
+
+
+ edgeSerialization.writeEdge( scope, edge1 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
+
+ Edge edge2 = createEdge( createId("source2"), "test", targetId );
+
+ edgeSerialization.writeEdge( scope, edge2 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
+
+ Edge edge3 = createEdge( createId("source3"), "test", targetId );
+
+ edgeSerialization.writeEdge( scope, edge3 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge3 ).execute();
+
+
+ UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+ int value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", 3, value );
+
+ //now delete the edge
+
+ edgeSerialization.deleteEdge( scope, edge1 ).execute();
+
+ value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", 2, value );
+
+ edgeSerialization.deleteEdge( scope, edge2 ).execute();
- assertFalse("No edge types exist", edgeTypes.hasNext());
+ value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+ assertEquals( "No subtypes removed, edges exist", 1, value );
- Iterator<String> sourceTypes = edgeMetadataSerialization.getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+ edgeSerialization.deleteEdge( scope, edge3 ).execute();
- assertFalse("No edge types exist", sourceTypes.hasNext());
+ value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+ assertEquals( "Single subtype should be removed", 0, value );
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge1.getTargetNode(), null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( edge1.getTargetNode(), edge1.getType(), null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
}
+ @Test
+ public void cleanTargetMultipleEdgeBuffer() throws ConnectionException {
+
+ final Id targetId = createId( "target" );
+ final String edgeType = "test";
+
+ final int size = graphFig.getRepairConcurrentSize()*2;
+
+ Set<Edge> writtenEdges = new HashSet<Edge>();
+
+
+ for(int i = 0; i < size; i ++){
+ Edge edge = createEdge( createId("source"+i), edgeType, targetId);
+
+ edgeSerialization.writeEdge( scope, edge ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+ writtenEdges.add( edge );
+ }
+
+
+ UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+ int value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", size, value );
+
+ //now delete the edge
+
+ for(Edge created: writtenEdges){
+ edgeSerialization.deleteEdge( scope, created ).execute();
+ }
+
+
+ value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "Subtypes removed", 0, value );
+
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( targetId, null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
+ }
}
[2/4] git commit: Checkpoint before refactor
Posted by sn...@apache.org.
Checkpoint before refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e8568ba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e8568ba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e8568ba4
Branch: refs/heads/asyncqueue
Commit: e8568ba48cb45251eab41723d53ac57a5844bc3f
Parents: 0a256bb
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 11 10:42:35 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 11 10:42:35 2014 -0700
----------------------------------------------------------------------
.../graph/impl/EdgeDeleteListener.java | 4 +-
.../persistence/graph/impl/EdgeEvent.java | 13 +-
.../graph/impl/NodeDeleteListener.java | 175 +++++++++++++++----
.../EdgeMetadataSerialization.java | 76 +++++++-
.../impl/EdgeMetadataSerializationImpl.java | 44 ++++-
.../graph/impl/NodeDeleteListenerTest.java | 8 +-
6 files changed, 261 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index 85e4b4f..e0ceafc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -44,8 +44,8 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
@Inject
public EdgeDeleteListener( final EdgeSerialization edgeSerialization,
final EdgeMetadataSerialization edgeMetadataSerialization,
- final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace, @EdgeDelete
- final AsyncProcessor edgeDelete ) {
+ final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace,
+ @EdgeDelete final AsyncProcessor edgeDelete ) {
this.edgeSerialization = edgeSerialization;
this.edgeMetadataSerialization = edgeMetadataSerialization;
this.edgeManagerFactory = edgeManagerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
index d4b6f91..b2ba686 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
@@ -1,6 +1,8 @@
package org.apache.usergrid.persistence.graph.impl;
+import java.util.UUID;
+
import org.apache.usergrid.persistence.collection.OrganizationScope;
@@ -12,11 +14,13 @@ public class EdgeEvent<T> {
private final OrganizationScope organizationScope;
private final T data;
+ private final UUID version;
- public EdgeEvent( final OrganizationScope organizationScope, final T data ) {
+ public EdgeEvent( final OrganizationScope organizationScope, final UUID version, final T data ) {
this.organizationScope = organizationScope;
this.data = data;
+ this.version = version;
}
@@ -25,7 +29,14 @@ public class EdgeEvent<T> {
}
+ public UUID getVersion() {
+ return version;
+ }
+
+
public T getData() {
return data;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index fbd5d08..1d2124a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -6,6 +6,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -26,6 +27,8 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
@@ -35,11 +38,15 @@ import rx.functions.Func1;
*/
public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEvent<Id>> {
+
private final NodeSerialization nodeSerialization;
- private final EdgeSerialization edgeSerialization;
- private final EdgeMetadataSerialization edgeMetadataSerialization;
+// private final EdgeSerialization edgeSerialization;
+// private final EdgeMetadataSerialization edgeMetadataSerialization;
private final GraphFig graphFig;
- private final Keyspace keyspace;
+// private final Keyspace keyspace;
+
+ private final Scheduler scheduler;
+ private final EdgeManagerFactory edgeManagerFactory;
/**
@@ -48,14 +55,19 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
@Inject
public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
final EdgeMetadataSerialization edgeMetadataSerialization, final GraphFig graphFig,
- final Keyspace keyspace, @NodeDelete final AsyncProcessor nodeDelete ) {
+ final Keyspace keyspace, final Scheduler scheduler,
+ final EdgeManagerFactory edgeManagerFactory,
+ @NodeDelete final AsyncProcessor nodeDelete ) {
this.nodeSerialization = nodeSerialization;
- this.edgeSerialization = edgeSerialization;
- this.edgeMetadataSerialization = edgeMetadataSerialization;
+// this.edgeSerialization = edgeSerialization;
+// this.edgeMetadataSerialization = edgeMetadataSerialization;
this.graphFig = graphFig;
- this.keyspace = keyspace;
+// this.keyspace = keyspace;
+ this.scheduler = scheduler;
+ this.edgeManagerFactory = edgeManagerFactory;
+
nodeDelete.addListener( this );
}
@@ -65,9 +77,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
final Id node = edgeEvent.getData();
final OrganizationScope scope = edgeEvent.getOrganizationScope();
+ final UUID version = edgeEvent.getVersion();
- return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
+ return Observable.from( node ).subscribeOn( scheduler ).map( new Func1<Id, Optional<UUID>>() {
@Override
public Optional<UUID> call( final Id id ) {
return nodeSerialization.getMaxVersion( scope, node );
@@ -80,60 +93,148 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
}
} )
- //delete source and targets in parallel and merge them into a single observable
+ //delete source and targets in parallel and merge them into a single observable
.flatMap( new Func1<Optional<UUID>, Observable<List<MarkedEdge>>>() {
@Override
public Observable<List<MarkedEdge>> call( final Optional<UUID> uuidOptional ) {
//get all edges pointing to the target node and buffer then into groups for deletion
- Observable<List<MarkedEdge>> targetEdges =
+ Observable<MarkedEdge> targetEdges =
getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
- .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
+ .flatMap( new Func1<String, Observable<MarkedEdge>>() {
@Override
- public Observable<List<MarkedEdge>> call( final String edgeType ) {
+ public Observable<MarkedEdge> call( final String edgeType ) {
return loadEdgesToTarget( scope,
new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
- null ) ).buffer( graphFig.getScanPageSize() );
+ null ) );
}
} );
//get all edges pointing to the source node and buffer them into groups for deletion
- Observable<List<MarkedEdge>> sourceEdges =
+ Observable<MarkedEdge> sourceEdges =
getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
- .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
+ .flatMap( new Func1<String, Observable<MarkedEdge>>() {
@Override
- public Observable<List<MarkedEdge>> call( final String edgeType ) {
+ public Observable<MarkedEdge> call( final String edgeType ) {
return loadEdgesFromSource( scope,
new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
- null ) ).buffer( graphFig.getScanPageSize() );
+ null ) );
}
} );
- return Observable.merge( targetEdges, sourceEdges );
+ //each time an edge is emitted, delete it via batch mutation since we'll already be buffered
+ return Observable.merge( targetEdges, sourceEdges ).buffer( graphFig.getScanPageSize() )
+ .doOnNext( new Action1<List<MarkedEdge>>() {
+ @Override
+ public void call( final List<MarkedEdge> markedEdges ) {
+ MutationBatch batch = keyspace.prepareMutationBatch();
+
+
+ for ( MarkedEdge marked : markedEdges ) {
+ batch.mergeShallow(
+ edgeSerialization.deleteEdge( scope, marked ) );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation" );
+ }
+ }
+ } );
}
- } ).doOnNext( new Action1<List<MarkedEdge>>() {
+ } ).last().flatMap( new Func1<List<MarkedEdge>, Observable<String>>() {
@Override
- public void call( final List<MarkedEdge> markedEdges ) {
- MutationBatch batch = keyspace.prepareMutationBatch();
+ public Observable<String> call( final List<MarkedEdge> markedEdges ) {
+
+ //delete all meta for edges to this node
+ Observable<String> targets =
+ getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String type ) {
+
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ final Iterator<String> types = edgeMetadataSerialization
+ .getIdTypesToTarget( scope,
+ new SimpleSearchIdType( node, type, null ) );
+ while ( types.hasNext() ) {
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeIdTypeToTarget( scope, node, type, types.next(),
+ version ) );
+ }
- for ( MarkedEdge marked : markedEdges ) {
- final MutationBatch edgeBatch = edgeSerialization.deleteEdge( scope, marked );
- batch.mergeShallow( edgeBatch );
- }
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeEdgeTypeToTarget( scope, node, type, version ) );
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute cassandra call" );
+ }
+ }
+ } );
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation" );
- }
+ //delete all meta for edges from this node
+ Observable<String> sources =
+ getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String type ) {
+
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ final Iterator<String> types = edgeMetadataSerialization
+ .getIdTypesFromSource( scope,
+ new SimpleSearchIdType( node, type, null ) );
+
+ while ( types.hasNext() ) {
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeIdTypeFromSource( scope, node, type, types.next(),
+ version ) );
+ }
+
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeEdgeTypeFromSource( scope, node, type, version ) );
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute cassandra call" );
+ }
+ }
+ } );
+
+
+ //no op, just merge them so we can run our action once the last one completes
+ return Observable.merge( targets, sources )
+ //when we're done emitting, delete the node itself
+ .doOnCompleted( new Action0() {
+ @Override
+ public void call() {
+ try {
+ nodeSerialization.delete( scope, node, edgeEvent.getVersion() ).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation" );
+ }
+ }
+ } );
}
- } ).map( new Func1<List<MarkedEdge>, EdgeEvent<Id>>() {
+ } )
+ //when we're completed, we need to delete our id from types, then remove our mark
+
+ //return our event to the caller
+ .map( new Func1<String, EdgeEvent<Id>>() {
@Override
- public EdgeEvent<Id> call( final List<MarkedEdge> markedEdges ) {
+ public EdgeEvent<Id> call( final String mark ) {
return edgeEvent;
}
} );
@@ -150,7 +251,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
}
- } );
+ } ).subscribeOn( scheduler );
}
@@ -164,7 +265,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
}
- } );
+ } ).subscribeOn( scheduler );
}
@@ -178,7 +279,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesToTarget( scope, search );
}
- } );
+ } ).subscribeOn( scheduler );
}
@@ -192,7 +293,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesFromSource( scope, search );
}
- } );
+ } ).subscribeOn( scheduler );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
index 8806b92..5fa6c7c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
@@ -21,11 +21,13 @@ package org.apache.usergrid.persistence.graph.serialization;
import java.util.Iterator;
+import java.util.UUID;
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.SearchEdgeType;
import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.model.entity.Id;
import com.netflix.astyanax.MutationBatch;
@@ -42,8 +44,8 @@ public interface EdgeMetadataSerialization {
MutationBatch writeEdge( OrganizationScope scope, Edge edge );
/**
- * Remove all meta data from the source to the target type. The caller must ensure that this is the last
- * edge with this type at version <= edge version
+ * Remove all meta data from the source to the target type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
*
* @param scope The org scope
* @param edge The edge to remove
@@ -54,8 +56,20 @@ public interface EdgeMetadataSerialization {
/**
- * Remove all meta data from the source to the target type. The caller must ensure that this is the last
- * edge with this type at version <= edge version
+ * Remove the edge type from the source with the specified version
+ *
+ * @param scope Organization scope
+ * @param sourceNode Source node
+ * @param type The edge type
+ * @param version The version to use on the delete
+ *
+ * @return A mutation batch to use on issuing the delete
+ */
+ MutationBatch removeEdgeTypeFromSource( OrganizationScope scope, Id sourceNode, String type, UUID version );
+
+ /**
+ * Remove all meta data from the source to the target type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
*
* @param scope The org scope
* @param edge The edge to remove
@@ -64,9 +78,25 @@ public interface EdgeMetadataSerialization {
*/
MutationBatch removeIdTypeFromSource( OrganizationScope scope, Edge edge );
+
/**
- * Remove all meta data from the target to the source type. The caller must ensure that this is the last
- * edge with this type at version <= edge version
+ * Remove all meta data from the source to the target type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
+ *
+ * @param scope Organization scope
+ * @param sourceNode Source node
+ * @param type The edge type
+ * @param idType The idType to use
+ * @param version The version to use on the delete
+ *
+ * @return a mutation batch with the delete operations
+ */
+ MutationBatch removeIdTypeFromSource( OrganizationScope scope, Id sourceNode, String type, String idType,
+ UUID version );
+
+ /**
+ * Remove all meta data from the target to the source type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
*
* @param scope The org scope
* @param edge The edge to remove
@@ -77,8 +107,22 @@ public interface EdgeMetadataSerialization {
/**
- * Remove all meta data from the target to the source type. The caller must ensure that this is the last
- * edge with this type at version <= edge version
+ * Remove all meta data from the target to the source type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
+ *
+ * @param scope Organization scope
+ * @param targetNode Target node
+ * @param type The edge type
+ * @param version The version to use on the delete
+ *
+ * @return A mutation batch to use on issuing the delete
+ */
+ MutationBatch removeEdgeTypeToTarget( OrganizationScope scope, Id targetNode, String type, UUID version );
+
+
+ /**
+ * Remove all meta data from the target to the source type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
*
* @param scope The org scope
* @param edge The edge to remove
@@ -87,6 +131,22 @@ public interface EdgeMetadataSerialization {
*/
MutationBatch removeIdTypeToTarget( OrganizationScope scope, Edge edge );
+
+ /**
+ * Remove all meta data from the target to the source type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
+ *
+ * @param scope Organization scope
+ * @param targetNode Source node
+ * @param type The edge type
+ * @param idType The idType to use
+ * @param version The version to use on the delete
+ *
+ * @return a mutation batch with the delete operations
+ */
+ MutationBatch removeIdTypeToTarget( OrganizationScope scope, Id targetNode, String type, String idType,
+ UUID version );
+
/**
* Get all edge types from the given source node
*
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
index b6028ea..49a8be6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
@@ -175,27 +175,55 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
@Override
public MutationBatch removeEdgeTypeFromSource( final OrganizationScope scope, final Edge edge ) {
- return removeEdgeType( scope, edge.getSourceNode(), edge.getType(), edge.getVersion(), CF_SOURCE_EDGE_TYPES );
+ return removeEdgeTypeFromSource( scope, edge.getSourceNode(), edge.getType(), edge.getVersion() );
+ }
+
+
+ @Override
+ public MutationBatch removeEdgeTypeFromSource( final OrganizationScope scope, final Id sourceNode,
+ final String type, final UUID version ) {
+ return removeEdgeType( scope, sourceNode, type, version, CF_SOURCE_EDGE_TYPES );
}
@Override
public MutationBatch removeIdTypeFromSource( final OrganizationScope scope, final Edge edge ) {
- return removeIdType( scope, edge.getSourceNode(), edge.getTargetNode(), edge.getType(), edge.getVersion(),
- CF_SOURCE_EDGE_ID_TYPES );
+ return removeIdTypeFromSource( scope, edge.getSourceNode(), edge.getType(), edge.getTargetNode().getType(),
+ edge.getVersion() );
+ }
+
+
+ @Override
+ public MutationBatch removeIdTypeFromSource( final OrganizationScope scope, final Id sourceNode, final String type,
+ final String idType, final UUID version ) {
+ return removeIdType( scope, sourceNode, idType, type, version,
+ CF_SOURCE_EDGE_ID_TYPES );
}
@Override
public MutationBatch removeEdgeTypeToTarget( final OrganizationScope scope, final Edge edge ) {
- return removeEdgeType( scope, edge.getTargetNode(), edge.getType(), edge.getVersion(), CF_TARGET_EDGE_TYPES );
+ return removeEdgeTypeToTarget( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() );
}
+ @Override
+ public MutationBatch removeEdgeTypeToTarget( final OrganizationScope scope, final Id targetNode, final String type,
+ final UUID version ) {
+ return removeEdgeType( scope, targetNode,type, version, CF_TARGET_EDGE_TYPES );
+ }
+
@Override
public MutationBatch removeIdTypeToTarget( final OrganizationScope scope, final Edge edge ) {
- return removeIdType( scope, edge.getTargetNode(), edge.getSourceNode(), edge.getType(), edge.getVersion(),
+ return removeIdTypeToTarget(scope, edge.getTargetNode(), edge.getType(), edge.getSourceNode().getType(), edge.getVersion());
+ }
+
+
+ @Override
+ public MutationBatch removeIdTypeToTarget( final OrganizationScope scope, final Id targetNode, final String type,
+ final String idType, final UUID version ) {
+ return removeIdType( scope, targetNode, idType,type, version,
CF_TARGET_EDGE_ID_TYPES );
}
@@ -234,14 +262,14 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
*
* @param scope The scope to use
* @param rowId The id to use in the row key
- * @param colId The id type to use in the column
+ * @param idType The id type to use in the column
* @param edgeType The edge type to use in the column
* @param version The version to use on the column
* @param cf The column family to use
*
* @return A populated mutation with the remove operations
*/
- private MutationBatch removeIdType( final OrganizationScope scope, final Id rowId, final Id colId,
+ private MutationBatch removeIdType( final OrganizationScope scope, final Id rowId, final String idType,
final String edgeType, final UUID version,
final MultiTennantColumnFamily<OrganizationScope, EdgeIdTypeKey, String> cf ) {
@@ -256,7 +284,7 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
new ScopedRowKey<OrganizationScope, EdgeIdTypeKey>( scope, new EdgeIdTypeKey( rowId, edgeType ) );
- batch.withRow( cf, rowKey ).setTimestamp( timestamp ).deleteColumn( colId.getType() );
+ batch.withRow( cf, rowKey ).setTimestamp( timestamp ).deleteColumn( idType );
return batch;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 2d55698..5b4899a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -109,15 +109,17 @@ public class NodeDeleteListenerTest {
Id targetNode = edge.getTargetNode();
+ UUID version = UUIDGenerator.newTimeUUID();
- EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+
+ EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, version, sourceNode );
EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
assertNull( "Mark was not set, no delete should be executed", event );
- deleteEvent = new EdgeEvent<Id>( scope, targetNode );
+ deleteEvent = new EdgeEvent<Id>( scope, version, targetNode );
event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
@@ -152,7 +154,7 @@ public class NodeDeleteListenerTest {
nodeSerialization.mark( scope, sourceNode, deleteVersion ).execute();
- EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+ EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, deleteVersion, sourceNode );
EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().last();