You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/14 23:21:03 UTC
[1/2] git commit: Temporary commit before adding delete repair
Repository: incubator-usergrid
Updated Branches:
refs/heads/asyncqueue 78dc432d1 -> 078ca7dca
Temporary commit before adding delete repair
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/96e6f5fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/96e6f5fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/96e6f5fc
Branch: refs/heads/asyncqueue
Commit: 96e6f5fcb1ae332b9bbd333bc52b4aab613300f6
Parents: 78dc432
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 14 11:38:07 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 14 11:38:07 2014 -0700
----------------------------------------------------------------------
.../persistence/graph/guice/GraphModule.java | 19 +-
.../persistence/graph/impl/stage/EdgeAsync.java | 62 ---
.../graph/impl/stage/EdgeAsyncImpl.java | 261 -----------
.../graph/impl/stage/EdgeDeleteRepair.java | 41 ++
.../graph/impl/stage/EdgeMetaRepair.java | 62 +++
.../graph/impl/stage/EdgeMetaRepairImpl.java | 331 ++++++++++++++
.../graph/impl/stage/EdgeWriteRepair.java | 41 ++
.../graph/impl/stage/EdgeWriteRepairImpl.java | 155 +++++++
.../graph/impl/stage/EdgeAsyncTest.java | 287 ------------
.../graph/impl/stage/EdgeMetaRepairTest.java | 451 +++++++++++++++++++
.../graph/impl/stage/EdgeWriteRepairTest.java | 185 ++++++++
11 files changed, 1274 insertions(+), 621 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index a18e741..c81dc82 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -34,8 +34,10 @@ import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
-import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsync;
-import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsyncImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepairImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepair;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepairImpl;
import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -45,16 +47,9 @@ import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSeri
import org.apache.usergrid.persistence.graph.serialization.impl.EdgeSerializationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.NodeSerializationImpl;
-import com.google.common.eventbus.EventBus;
import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.matcher.Matchers;
import com.google.inject.multibindings.Multibinder;
-import com.google.inject.spi.InjectionListener;
-import com.google.inject.spi.TypeEncounter;
-import com.google.inject.spi.TypeListener;
/**
@@ -108,8 +103,10 @@ public class GraphModule extends AbstractModule {
bind(AsyncProcessor.class).annotatedWith( EdgeWrite.class ).to( AsyncProcessorImpl.class );
bind(AsyncProcessor.class).annotatedWith( NodeDelete.class ).to( AsyncProcessorImpl.class );
- //Edge stages
- bind( EdgeAsync.class).to( EdgeAsyncImpl.class );
+ //Repair/cleanup classes
+ bind( EdgeMetaRepair.class).to( EdgeMetaRepairImpl.class );
+
+ bind( EdgeWriteRepair.class).to( EdgeWriteRepairImpl.class );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
deleted file mode 100644
index 90c1bfe..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.MutationBatch;
-
-import rx.Observable;
-
-
-/**
- * Creates a mutation which will remove obsolete
- *
- */
-public interface EdgeAsync {
-
- /**
- * Validate that the source types can be cleaned for the given info
- * @param scope The scope to use
- * @param sourceId The source Id to use
- * @param edgeType The edge type
- * @param version The max version to clean
- * @return The mutation with the operations
- */
- public Observable<Integer> cleanSources(OrganizationScope scope, Id sourceId, String edgeType, UUID version);
-
-
- /**
- *
- * Remove all source id types that are empty, as well as the edge type if there are no more edges for it
- * @param scope The scope to use
- * @param targetId The target Id to use
- * @param edgeType The edge type
- * @param version The max version to clean
- * @return The mutation with the operations
- */
- public Observable<Integer> clearTargets( OrganizationScope scope, Id targetId, String edgeType, UUID version );
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
deleted file mode 100644
index ba21bf0..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.SearchByIdType;
-import org.apache.usergrid.persistence.graph.SearchEdgeType;
-import org.apache.usergrid.persistence.graph.SearchIdType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.Scheduler;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.observables.MathObservable;
-
-
-/**
- * Implementation of the cleanup of edge meta data
- */
-@Singleton
-public class EdgeAsyncImpl implements EdgeAsync {
-
-
- private final EdgeMetadataSerialization edgeMetadataSerialization;
- private final EdgeSerialization edgeSerialization;
- private final Keyspace keyspace;
- private final GraphFig graphFig;
- private final Scheduler scheduler;
-
-
- @Inject
- public EdgeAsyncImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
- final EdgeSerialization edgeSerialization, final Keyspace keyspace, final GraphFig graphFig,
- final Scheduler scheduler ) {
- this.edgeMetadataSerialization = edgeMetadataSerialization;
- this.edgeSerialization = edgeSerialization;
- this.keyspace = keyspace;
- this.graphFig = graphFig;
- this.scheduler = scheduler;
- }
-
-
- @Override
- public Observable<Integer> cleanSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
- final UUID version ) {
-
-
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Observable<Integer> clearTargets( final OrganizationScope scope, final Id targetId, final String edgeType,
- final UUID version ) {
-
- ValidationUtils.validateOrganizationScope( scope );
- ValidationUtils.verifyIdentity( targetId );
- Preconditions.checkNotNull( edgeType, "edge type is required" );
- Preconditions.checkNotNull( version, "version is required" );
-
-
- Observable<Integer> deleteCounts =
- loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
- .buffer( graphFig.getRepairConcurrentSize() )
- //buffer them into concurrent groups based on the concurrent repair size
- .flatMap( new Func1<List<String>, Observable<Integer>>() {
-
- @Override
- public Observable<Integer> call( final List<String> types ) {
-
-
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- final List<Observable<Integer>> checks =
- new ArrayList<Observable<Integer>>( types.size() );
-
- //for each id type, check if the exist in parallel to increase processing speed
- for ( final String sourceIdType : types ) {
-
- final SearchByIdType searchData =
- new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
-
- Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData )
- .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
-
- //get distinct by source node
- @Override
- public Id call( final MarkedEdge markedEdge ) {
- return markedEdge.getSourceNode();
- }
- } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
-
- @Override
- public void call( final Integer count ) {
- /**
- * we only want to delete if no edges are in this class. If there
- * are
- * still edges
- * we must retain the information in order to keep our index
- * structure
- * correct for edge
- * iteration
- **/
- if ( count != 0 ) {
- return;
- }
-
-
- batch.mergeShallow( edgeMetadataSerialization
- .removeIdTypeToTarget( scope, targetId, edgeType,
- sourceIdType, version ) );
- }
- } );
-
- checks.add( search );
- }
-
-
- /**
- * Sum up the total number of edges we had, then execute the mutation if we have
- * anything to do
- */
- return MathObservable.sumInteger( Observable.merge( checks ) )
- .doOnNext( new Action1<Integer>() {
- @Override
- public void call( final Integer count ) {
-
- if ( batch.isEmpty() ) {
- return;
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException(
- "Unable to execute mutation", e );
- }
- }
- } );
- }
- } )
- //if we get no edges, emit a 0 so the caller knows nothing was deleted
- .defaultIfEmpty( 0 );
-
-
- //sum up everything emitted by sub types. If there's no edges existing for all sub types,
- // then we can safely remove them
- return MathObservable.sumInteger( deleteCounts ).map( new Func1<Integer, Integer>() {
- @Override
- public Integer call( final Integer subTypes ) {
-
- /**
- * We can only execute deleting this type if no sub types were deleted
- */
- if ( subTypes != 0 ) {
- return subTypes;
- }
-
- try {
- edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version ).execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation" );
- }
-
- return subTypes;
- }
- } );
- }
-
-
- /**
- * Get all existing edge types to the target node
- */
- private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
-
- return Observable.create( new ObservableIterator<String>() {
- @Override
- protected Iterator<String> getIterator() {
- return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
- }
- } ).subscribeOn( scheduler );
- }
-
-
- private Observable<MarkedEdge> getEdgesToTargetBySourceType( final OrganizationScope scope,
- final SearchByIdType search ) {
-
- return Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
- return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
- }
- } ).subscribeOn( scheduler );
- }
-
-
- private Observable<String> loadEdgeIdsToTarget( final OrganizationScope scope, final SearchIdType search ) {
- return Observable.create( new ObservableIterator<String>() {
- @Override
- protected Iterator<String> getIterator() {
- return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
- }
- } ).subscribeOn( scheduler );
- }
-
-
- /**
- * Load all edges pointing to this target
- */
- private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
-
- return Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
- return edgeSerialization.getEdgesToTarget( scope, search );
- }
- } ).subscribeOn( scheduler );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
new file mode 100644
index 0000000..487472c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+
+import rx.Observable;
+
+
+/**
+ * Interface to perform repair operations on an edge when it is written
+ */
+public interface EdgeDeleteRepair {
+
+ /**
+ * Repair this edge. Remove previous entries including this one
+ * @param scope
+ * @param edge
+ */
+ public Observable<MarkedEdge> repair( OrganizationScope scope, Edge edge );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java
new file mode 100644
index 0000000..31207d8
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Audits edge meta data and removes them if they're obscelete
+ */
+public interface EdgeMetaRepair {
+
+ /**
+ * Validate that the source types can be cleaned for the given info
+ *
+ * @param scope The scope to use
+ * @param sourceId The source Id to use
+ * @param edgeType The edge type
+ * @param version The max version to clean
+ *
+ * @return An observable that emits the total number of sub types still in use. 0 implies the type and subtypes
+ * have been removed. Anything > 0 implies the edgeType and subTypes are still in use
+ */
+ public Observable<Integer> repairSources( OrganizationScope scope, Id sourceId, String edgeType, UUID version );
+
+
+ /**
+ * Remove all source id types that are empty, as well as the edge type if there are no more edges for it
+ *
+ * @param scope The scope to use
+ * @param targetId The target Id to use
+ * @param edgeType The edge type
+ * @param version The max version to clean
+ *
+ * @return An observable that emits the total number of sub types still in use. 0 implies the type and subtypes
+ * have been removed. Anything > 0 implies the edgeType and subTypes are still in use
+ */
+ public Observable<Integer> repairTargets( OrganizationScope scope, Id targetId, String edgeType, UUID version );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
new file mode 100644
index 0000000..51777ab
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.observables.MathObservable;
+
+
+/**
+ * Implementation of the cleanup of edge meta data
+ */
+@Singleton
+public class EdgeMetaRepairImpl implements EdgeMetaRepair {
+
+
+ private final EdgeMetadataSerialization edgeMetadataSerialization;
+ private final EdgeSerialization edgeSerialization;
+ private final Keyspace keyspace;
+ private final GraphFig graphFig;
+ private final Scheduler scheduler;
+
+
+ @Inject
+ public EdgeMetaRepairImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
+ final EdgeSerialization edgeSerialization, final Keyspace keyspace,
+ final GraphFig graphFig, final Scheduler scheduler ) {
+ this.edgeMetadataSerialization = edgeMetadataSerialization;
+ this.edgeSerialization = edgeSerialization;
+ this.keyspace = keyspace;
+ this.graphFig = graphFig;
+ this.scheduler = scheduler;
+ }
+
+
+ @Override
+ public Observable<Integer> repairSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
+ final UUID version ) {
+
+
+ return clearTypes( scope, sourceId, edgeType, version, source );
+ }
+
+
+ @Override
+ public Observable<Integer> repairTargets( final OrganizationScope scope, final Id targetId, final String edgeType,
+ final UUID version ) {
+ return clearTypes( scope, targetId, edgeType, version, target );
+ }
+
+
+ private Observable<Integer> clearTypes( final OrganizationScope scope, final Id node, final String edgeType,
+ final UUID version, final CleanSerialization serialization ) {
+
+ ValidationUtils.validateOrganizationScope( scope );
+ ValidationUtils.verifyIdentity( node );
+ Preconditions.checkNotNull( edgeType, "edge type is required" );
+ Preconditions.checkNotNull( version, "version is required" );
+
+
+ Observable<Integer> deleteCounts = serialization.loadEdgeSubTypes( scope, node, edgeType, version )
+ .buffer( graphFig.getRepairConcurrentSize() )
+ //buffer them into concurrent groups based on the concurrent repair size
+ .flatMap( new Func1<List<String>, Observable<Integer>>() {
+
+ @Override
+ public Observable<Integer> call( final List<String> types ) {
+
+
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
+
+ //for each id type, check if the exist in parallel to increase processing speed
+ for ( final String sourceIdType : types ) {
+
+ Observable<Integer> search =
+ serialization.loadEdges( scope, node, edgeType, sourceIdType, version )
+ .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
+
+ //get distinct by source node
+ @Override
+ public Id call( final MarkedEdge markedEdge ) {
+ return markedEdge.getSourceNode();
+ }
+ } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
+
+ @Override
+ public void call( final Integer count ) {
+ /**
+ * we only want to delete if no edges are in this class. If there
+ * are
+ * still edges
+ * we must retain the information in order to keep our index
+ * structure
+ * correct for edge
+ * iteration
+ **/
+ if ( count != 0 ) {
+ return;
+ }
+
+ batch.mergeShallow( serialization
+ .removeEdgeSubType( scope, node, edgeType, sourceIdType,
+ version ) );
+ }
+ } );
+
+ checks.add( search );
+ }
+
+
+ /**
+ * Sum up the total number of edges we had, then execute the mutation if we have
+ * anything to do
+ */
+ return MathObservable.sumInteger( Observable.merge( checks ) )
+ .doOnNext( new Action1<Integer>() {
+ @Override
+ public void call( final Integer count ) {
+
+ if ( batch.isEmpty() ) {
+ return;
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
+ }
+ } );
+ }
+ } )
+ //if we get no edges, emit a 0 so the caller knows we can delete the type
+ .defaultIfEmpty( 0 );
+
+
+ //sum up everything emitted by sub types. If there's no edges existing for all sub types,
+ // then we can safely remove them
+ return MathObservable.sumInteger( deleteCounts ).last().doOnNext( new Action1<Integer>() {
+
+ @Override
+ public void call( final Integer subTypes ) {
+
+ /**
+ * We can only execute deleting this type if no sub types were deleted
+ */
+ if ( subTypes != 0 ) {
+ return;
+ }
+
+ try {
+
+ serialization.removeEdgeType( scope, node, edgeType, version ).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation" );
+ }
+
+ }
+ } );
+ }
+
+
+ /**
+ * Simple edge serialization
+ */
+ private static interface CleanSerialization {
+
+ /**
+ * Load all subtypes for the edge with a version <= the provided version
+ */
+ Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId, final String type,
+ final UUID version );
+
+
+ /**
+ * Load an observable with edges from the details provided
+ */
+ Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
+ final String subType, final UUID version );
+
+ /**
+ * Remove the sub type specified
+ */
+ MutationBatch removeEdgeSubType( final OrganizationScope scope, final Id nodeId, final String edgeType,
+ final String subType, final UUID version );
+
+ /**
+ * Remove the edge type
+ */
+ MutationBatch removeEdgeType( final OrganizationScope scope, final Id nodeId, final String type,
+ final UUID version );
+ }
+
+
+ /**
+ * Target serialization i/o for cleaning target edges
+ */
+ private final CleanSerialization target = new CleanSerialization() {
+
+ @Override
+ public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
+ final String edgeType, final UUID version ) {
+ return Observable.create( new ObservableIterator<String>() {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ @Override
+ public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
+ final String subType, final UUID version ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesToTargetBySourceType( scope,
+ new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ @Override
+ public MutationBatch removeEdgeSubType( final OrganizationScope scope, final Id nodeId, final String type,
+ final String subType, final UUID version ) {
+ return edgeMetadataSerialization.removeIdTypeToTarget( scope, nodeId, type, subType, version );
+ }
+
+
+ @Override
+ public MutationBatch removeEdgeType( final OrganizationScope scope, final Id nodeId, final String type,
+ final UUID version ) {
+ return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, nodeId, type, version );
+ }
+ };
+
+ /**
+ * Target serialization i/o for cleaning target edges
+ */
+ private final CleanSerialization source = new CleanSerialization() {
+
+ @Override
+ public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
+ final String edgeType, final UUID version ) {
+ return Observable.create( new ObservableIterator<String>() {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization
+ .getIdTypesFromSource( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ @Override
+ public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
+ final String subType, final UUID version ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesFromSourceByTargetType( scope,
+ new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ @Override
+ public MutationBatch removeEdgeSubType( final OrganizationScope scope, final Id nodeId, final String type,
+ final String subType, final UUID version ) {
+ return edgeMetadataSerialization.removeIdTypeFromSource( scope, nodeId, type, subType, version );
+ }
+
+
+ @Override
+ public MutationBatch removeEdgeType( final OrganizationScope scope, final Id nodeId, final String type,
+ final UUID version ) {
+ return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, nodeId, type, version );
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
new file mode 100644
index 0000000..550ddf1
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+
+import rx.Observable;
+
+
+/**
+ * Interface to perform repair operations on an edge when it is written
+ */
+public interface EdgeWriteRepair {
+
+ /**
+ * Repair this edge. Remove previous entries
+ * @param scope
+ * @param edge
+ */
+ public Observable<MarkedEdge> repair( OrganizationScope scope, Edge edge );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
new file mode 100644
index 0000000..548c0e9
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+
+
+/**
+ * SimpleRepair operation
+ *
+ */
+@Singleton
+public class EdgeWriteRepairImpl implements EdgeWriteRepair {
+
+ protected final EdgeSerialization edgeSerialization;
+ protected final GraphFig graphFig;
+ protected final Keyspace keyspace;
+ protected final Scheduler scheduler;
+
+
+ @Inject
+ public EdgeWriteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+ final Keyspace keyspace, final Scheduler scheduler ) {
+ this.edgeSerialization = edgeSerialization;
+ this.graphFig = graphFig;
+ this.keyspace = keyspace;
+ this.scheduler = scheduler;
+ }
+
+
+ @Override
+ public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
+
+ final UUID maxVersion = edge.getVersion();
+
+ //get source edges
+ Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
+
+ //get target edges
+ Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
+
+
+ //merge source and target then deal with the distinct values
+ return Observable.merge( sourceEdges, targetEdges ).filter( new Func1<MarkedEdge, Boolean>() {
+ /**
+ * We only want to return edges < this version so we remove them
+ * @param markedEdge
+ * @return
+ */
+ @Override
+ public Boolean call( final MarkedEdge markedEdge ) {
+ return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
+ }
+ //buffer the deletes and issue them in a single mutation
+ } ).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
+ .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
+ @Override
+ public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ for ( MarkedEdge edge : markedEdges ) {
+ final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
+
+ batch.mergeShallow( delete );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to issue write to cassandra", e );
+ }
+
+ return Observable.from( markedEdges ).subscribeOn( scheduler );
+ }
+ } );
+ }
+
+
+ /**
+ * Get all edge versions <= the specified max from the source
+ */
+ private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+
+ final SimpleSearchByEdge search =
+ new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+ edge.getVersion(), null );
+
+ return edgeSerialization.getEdgeFromSource( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ /**
+ * Get all edge versions <= the specified max from the source
+ */
+ private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+
+ final SimpleSearchByEdge search =
+ new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+ edge.getVersion(), null );
+
+ return edgeSerialization.getEdgeToTarget( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
deleted file mode 100644
index 5f57e64..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.UUID;
-
-import org.jukito.JukitoRunner;
-import org.jukito.UseModules;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Inject;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- *
- *
- */
-@RunWith( JukitoRunner.class )
-@UseModules( { TestGraphModule.class } )
-public class EdgeAsyncTest {
-
-
- @ClassRule
- public static CassandraRule rule = new CassandraRule();
-
-
- @Inject
- @Rule
- public MigrationManagerRule migrationManagerRule;
-
-
- @Inject
- protected NodeSerialization serialization;
-
- @Inject
- protected EdgeAsync edgeAsync;
-
- @Inject
- protected EdgeSerialization edgeSerialization;
-
- @Inject
- protected EdgeMetadataSerialization edgeMetadataSerialization;
-
- @Inject
- protected GraphFig graphFig;
-
- protected OrganizationScope scope;
-
-
- @Before
- public void setup() {
- scope = mock( OrganizationScope.class );
-
- Id orgId = mock( Id.class );
-
- when( orgId.getType() ).thenReturn( "organization" );
- when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
-
- when( scope.getOrganization() ).thenReturn( orgId );
- }
-
-
- @Test
- public void cleanTargetNoEdgesNoMeta() {
- //do no writes, then execute a cleanup with no meta data
-
- final Id targetId = createId( "target" );
- final String test = "test";
- final UUID version = UUIDGenerator.newTimeUUID();
-
- int value = edgeAsync.clearTargets( scope, targetId, test, version ).toBlockingObservable().single();
-
- assertEquals( "No subtypes found", 0, value );
- }
-
-
- @Test
- public void cleanTargetSingleEdge() throws ConnectionException {
- Edge edge = createEdge( "source", "test", "target" );
-
- edgeSerialization.writeEdge( scope, edge ).execute();
-
- edgeMetadataSerialization.writeEdge( scope, edge ).execute();
-
- int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
- .toBlockingObservable().single();
-
- assertEquals( "No subtypes removed, edge exists", 1, value );
-
- //now delete the edge
-
- edgeSerialization.deleteEdge( scope, edge ).execute();
-
- value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
- .toBlockingObservable().single();
-
- assertEquals( "Single subtype should be removed", 0, value );
-
- //now verify they're gone
-
- Iterator<String> edgeTypes = edgeMetadataSerialization
- .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
-
- assertFalse( "No edge types exist", edgeTypes.hasNext() );
-
-
- Iterator<String> sourceTypes = edgeMetadataSerialization
- .getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
-
- assertFalse( "No edge types exist", sourceTypes.hasNext() );
- }
-
-
- @Test
- public void cleanTargetMultipleEdge() throws ConnectionException {
-
- Id targetId = createId( "target" );
-
- Edge edge1 = createEdge( createId("source1"), "test", targetId);
-
-
-
- edgeSerialization.writeEdge( scope, edge1 ).execute();
-
- edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
-
- Edge edge2 = createEdge( createId("source2"), "test", targetId );
-
- edgeSerialization.writeEdge( scope, edge2 ).execute();
-
- edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
-
- Edge edge3 = createEdge( createId("source3"), "test", targetId );
-
- edgeSerialization.writeEdge( scope, edge3 ).execute();
-
- edgeMetadataSerialization.writeEdge( scope, edge3 ).execute();
-
-
- UUID cleanupVersion = UUIDGenerator.newTimeUUID();
-
- int value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
- .toBlockingObservable().single();
-
- assertEquals( "No subtypes removed, edges exist", 3, value );
-
- //now delete the edge
-
- edgeSerialization.deleteEdge( scope, edge1 ).execute();
-
- value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
- .toBlockingObservable().single();
-
- assertEquals( "No subtypes removed, edges exist", 2, value );
-
- edgeSerialization.deleteEdge( scope, edge2 ).execute();
-
- value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
- .toBlockingObservable().single();
-
- assertEquals( "No subtypes removed, edges exist", 1, value );
-
- edgeSerialization.deleteEdge( scope, edge3 ).execute();
-
- value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
- .toBlockingObservable().single();
-
-
- assertEquals( "Single subtype should be removed", 0, value );
-
- //now verify they're gone
-
- Iterator<String> edgeTypes = edgeMetadataSerialization
- .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge1.getTargetNode(), null ) );
-
- assertFalse( "No edge types exist", edgeTypes.hasNext() );
-
-
- Iterator<String> sourceTypes = edgeMetadataSerialization
- .getIdTypesToTarget( scope, new SimpleSearchIdType( edge1.getTargetNode(), edge1.getType(), null ) );
-
- assertFalse( "No edge types exist", sourceTypes.hasNext() );
- }
-
-
- @Test
- public void cleanTargetMultipleEdgeBuffer() throws ConnectionException {
-
- final Id targetId = createId( "target" );
- final String edgeType = "test";
-
- final int size = graphFig.getRepairConcurrentSize()*2;
-
- Set<Edge> writtenEdges = new HashSet<Edge>();
-
-
- for(int i = 0; i < size; i ++){
- Edge edge = createEdge( createId("source"+i), edgeType, targetId);
-
- edgeSerialization.writeEdge( scope, edge ).execute();
-
- edgeMetadataSerialization.writeEdge( scope, edge ).execute();
-
- writtenEdges.add( edge );
- }
-
-
- UUID cleanupVersion = UUIDGenerator.newTimeUUID();
-
- int value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
- .toBlockingObservable().single();
-
- assertEquals( "No subtypes removed, edges exist", size, value );
-
- //now delete the edge
-
- for(Edge created: writtenEdges){
- edgeSerialization.deleteEdge( scope, created ).execute();
- }
-
-
- value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
- .toBlockingObservable().single();
-
- assertEquals( "Subtypes removed", 0, value );
-
- //now verify they're gone
-
- Iterator<String> edgeTypes = edgeMetadataSerialization
- .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( targetId, null ) );
-
- assertFalse( "No edge types exist", edgeTypes.hasNext() );
-
-
- Iterator<String> sourceTypes = edgeMetadataSerialization
- .getIdTypesToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) );
-
- assertFalse( "No edge types exist", sourceTypes.hasNext() );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
new file mode 100644
index 0000000..7bd1a68
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
+public class EdgeMetaRepairTest {
+
+
+ @ClassRule
+ public static CassandraRule rule = new CassandraRule();
+
+
+ @Inject
+ @Rule
+ public MigrationManagerRule migrationManagerRule;
+
+
+ @Inject
+ protected EdgeMetaRepair edgeMetaRepair;
+
+ @Inject
+ protected EdgeSerialization edgeSerialization;
+
+ @Inject
+ protected EdgeMetadataSerialization edgeMetadataSerialization;
+
+ @Inject
+ protected GraphFig graphFig;
+
+ protected OrganizationScope scope;
+
+
+ @Before
+ public void setup() {
+ scope = mock( OrganizationScope.class );
+
+ Id orgId = mock( Id.class );
+
+ when( orgId.getType() ).thenReturn( "organization" );
+ when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+ when( scope.getOrganization() ).thenReturn( orgId );
+ }
+
+
+ @Test
+ public void cleanTargetNoEdgesNoMeta() {
+ //do no writes, then execute a cleanup with no meta data
+
+ final Id targetId = createId( "target" );
+ final String test = "test";
+ final UUID version = UUIDGenerator.newTimeUUID();
+
+ int value = edgeMetaRepair.repairTargets( scope, targetId, test, version ).toBlockingObservable().single();
+
+ assertEquals( "No subtypes found", 0, value );
+ }
+
+
+ @Test
+ public void cleanTargetSingleEdge() throws ConnectionException {
+ Edge edge = createEdge( "source", "test", "target" );
+
+ edgeSerialization.writeEdge( scope, edge ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+ int value = edgeMetaRepair.repairTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edge exists", 1, value );
+
+ //now delete the edge
+
+ edgeSerialization.deleteEdge( scope, edge ).execute();
+
+ value = edgeMetaRepair.repairTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+ .toBlockingObservable().single();
+
+ assertEquals( "Single subtype should be removed", 0, value );
+
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
+ }
+
+
+ @Test
+ public void cleanTargetMultipleEdge() throws ConnectionException {
+
+ Id targetId = createId( "target" );
+
+ Edge edge1 = createEdge( createId("source1"), "test", targetId);
+
+
+
+ edgeSerialization.writeEdge( scope, edge1 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
+
+ Edge edge2 = createEdge( createId("source2"), "test", targetId );
+
+ edgeSerialization.writeEdge( scope, edge2 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
+
+ Edge edge3 = createEdge( createId("source3"), "test", targetId );
+
+ edgeSerialization.writeEdge( scope, edge3 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge3 ).execute();
+
+
+ UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+ int value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", 3, value );
+
+ //now delete the edge
+
+ edgeSerialization.deleteEdge( scope, edge1 ).execute();
+
+ value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", 2, value );
+
+ edgeSerialization.deleteEdge( scope, edge2 ).execute();
+
+ value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", 1, value );
+
+ edgeSerialization.deleteEdge( scope, edge3 ).execute();
+
+ value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+
+ assertEquals( "Single subtype should be removed", 0, value );
+
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge1.getTargetNode(), null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( edge1.getTargetNode(), edge1.getType(), null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
+ }
+
+
+ @Test
+ public void cleanTargetMultipleEdgeBuffer() throws ConnectionException {
+
+ final Id targetId = createId( "target" );
+ final String edgeType = "test";
+
+ final int size = graphFig.getRepairConcurrentSize()*2;
+
+ Set<Edge> writtenEdges = new HashSet<Edge>();
+
+
+ for(int i = 0; i < size; i ++){
+ Edge edge = createEdge( createId("source"+i), edgeType, targetId);
+
+ edgeSerialization.writeEdge( scope, edge ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+ writtenEdges.add( edge );
+ }
+
+
+ UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+ int value = edgeMetaRepair.repairTargets( scope, targetId, edgeType, cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", size, value );
+
+ //now delete the edge
+
+ for(Edge created: writtenEdges){
+ edgeSerialization.deleteEdge( scope, created ).execute();
+ }
+
+
+ value = edgeMetaRepair.repairTargets( scope, targetId, edgeType, cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "Subtypes removed", 0, value );
+
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( targetId, null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
+ }
+
+
+
+ @Test
+ public void cleanSourceSingleEdge() throws ConnectionException {
+ Edge edge = createEdge( "source", "test", "target" );
+
+ edgeSerialization.writeEdge( scope, edge ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+ int value = edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), edge.getVersion() )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edge exists", 1, value );
+
+ //now delete the edge
+
+ edgeSerialization.deleteEdge( scope, edge ).execute();
+
+ value = edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), edge.getVersion() )
+ .toBlockingObservable().single();
+
+ assertEquals( "Single subtype should be removed", 0, value );
+
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge.getSourceNode(), null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getSourceNode(), edge.getType(), null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
+ }
+
+
+ @Test
+ public void cleanSourceMultipleEdge() throws ConnectionException {
+
+ Id sourceId = createId( "source" );
+
+ Edge edge1 = createEdge( sourceId, "test", createId("target1"));
+
+
+
+ edgeSerialization.writeEdge( scope, edge1 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
+
+ Edge edge2 = createEdge( sourceId, "test", createId("target2") );
+
+ edgeSerialization.writeEdge( scope, edge2 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
+
+ Edge edge3 = createEdge( sourceId, "test", createId("target3") );
+
+ edgeSerialization.writeEdge( scope, edge3 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge3 ).execute();
+
+
+ UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+ int value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", 3, value );
+
+ //now delete the edge
+
+ edgeSerialization.deleteEdge( scope, edge1 ).execute();
+
+ value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", 2, value );
+
+ edgeSerialization.deleteEdge( scope, edge2 ).execute();
+
+ value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", 1, value );
+
+ edgeSerialization.deleteEdge( scope, edge3 ).execute();
+
+ value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+
+ assertEquals( "Single subtype should be removed", 0, value );
+
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge1.getSourceNode(), null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( edge1.getSourceNode(), edge1.getType(), null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
+ }
+
+
+ @Test
+ public void cleanSourceMultipleEdgeBuffer() throws ConnectionException {
+
+ Id sourceId = createId( "source" );
+
+ final String edgeType = "test";
+
+ final int size = graphFig.getRepairConcurrentSize()*2;
+
+ Set<Edge> writtenEdges = new HashSet<Edge>();
+
+
+ for(int i = 0; i < size; i ++){
+ Edge edge = createEdge( sourceId, edgeType, createId("target"+i));
+
+ edgeSerialization.writeEdge( scope, edge ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+ writtenEdges.add( edge );
+ }
+
+
+ UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+ int value = edgeMetaRepair.repairSources( scope, sourceId, edgeType, cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", size, value );
+
+ //now delete the edge
+
+ for(Edge created: writtenEdges){
+ edgeSerialization.deleteEdge( scope, created ).execute();
+ }
+
+
+ value = edgeMetaRepair.repairSources( scope, sourceId, edgeType, cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "Subtypes removed", 0, value );
+
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( sourceId, null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( sourceId, edgeType, null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
new file mode 100644
index 0000000..352639e
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
+public class EdgeWriteRepairTest {
+
+ @ClassRule
+ public static CassandraRule rule = new CassandraRule();
+
+
+ @Inject
+ @Rule
+ public MigrationManagerRule migrationManagerRule;
+
+
+ @Inject
+ protected EdgeSerialization edgeSerialization;
+
+ @Inject
+ protected EdgeWriteRepair edgeWriteRepair;
+
+
+ protected OrganizationScope scope;
+
+
+ @Before
+ public void setup() {
+ scope = mock( OrganizationScope.class );
+
+ Id orgId = mock( Id.class );
+
+ when( orgId.getType() ).thenReturn( "organization" );
+ when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+ when( scope.getOrganization() ).thenReturn( orgId );
+ }
+
+
+ /**
+ * Test repairing with no edges
+ */
+ @Test
+ public void noEdges() {
+ Edge edge = createEdge( "source", "test", "target" );
+
+ Iterator<MarkedEdge> edges = edgeWriteRepair.repair( scope, edge ).toBlockingObservable().getIterator();
+
+ assertFalse( "No edges cleaned", edges.hasNext() );
+ }
+
+
+ /**
+ * Test repairing with no edges
+ */
+ @Test
+ public void versionTest() throws ConnectionException {
+ final int size = 10;
+
+ final List<Edge> versions = new ArrayList<Edge>( size );
+
+ final Id sourceId = createId( "source" );
+ final Id targetId = createId( "target" );
+ final String edgeType = "edge";
+
+ for ( int i = 0; i < size; i++ ) {
+ final Edge edge = createEdge( sourceId, edgeType, targetId );
+
+ versions.add( edge );
+
+ edgeSerialization.writeEdge( scope, edge ).execute();
+
+ System.out.println(String.format("[%d] %s", i, edge));
+ }
+
+
+ int keepIndex = size / 2;
+
+ Edge keep = versions.get( keepIndex );
+
+ Iterable<MarkedEdge> edges = edgeWriteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
+
+
+ int index = 0;
+
+ for ( MarkedEdge edge : edges ) {
+
+ final Edge removed = versions.get( keepIndex - index -1 );
+
+ assertEquals( "Removed matches saved index", removed, edge );
+
+ index++;
+ }
+
+ //now verify we get all the versions we expect back
+ Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
+ new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
+
+ index = 0;
+
+ for(MarkedEdge edge: new IterableWrapper<MarkedEdge>( iterator )){
+
+ final Edge saved = versions.get( size - index -1 );
+
+ assertEquals(saved, edge);
+
+ index++;
+ }
+
+ assertEquals("Kept edge version was the minimum", keepIndex, index);
+ }
+
+
+ private class IterableWrapper<T> implements Iterable<T> {
+ private final Iterator<T> sourceIterator;
+
+
+ private IterableWrapper( final Iterator<T> sourceIterator ) {
+ this.sourceIterator = sourceIterator;
+ }
+
+
+ @Override
+ public Iterator<T> iterator() {
+ return this.sourceIterator;
+ }
+ }
+}
[2/2] git commit: Finished serialization I/O impls
Posted by sn...@apache.org.
Finished serialization I/O impls
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/078ca7dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/078ca7dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/078ca7dc
Branch: refs/heads/asyncqueue
Commit: 078ca7dca71ce62b7a5ae0469f7485563582145c
Parents: 96e6f5f
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 14 15:14:11 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 14 15:14:11 2014 -0700
----------------------------------------------------------------------
.../persistence/graph/guice/GraphModule.java | 4 +
.../graph/impl/stage/AbstractEdgeRepair.java | 152 +++++++++++++++
.../graph/impl/stage/EdgeDeleteRepairImpl.java | 76 ++++++++
.../graph/impl/stage/EdgeWriteRepairImpl.java | 95 +---------
.../graph/impl/stage/EdgeDeleteRepairTest.java | 189 +++++++++++++++++++
.../graph/impl/stage/EdgeWriteRepairTest.java | 2 +
6 files changed, 431 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index c81dc82..a5dda9a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepair;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepairImpl;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepairImpl;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepair;
@@ -107,6 +109,8 @@ public class GraphModule extends AbstractModule {
bind( EdgeMetaRepair.class).to( EdgeMetaRepairImpl.class );
bind( EdgeWriteRepair.class).to( EdgeWriteRepairImpl.class );
+
+ bind( EdgeDeleteRepair.class).to( EdgeDeleteRepairImpl.class );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
new file mode 100644
index 0000000..838de69
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+
+
+/**
+ * SimpleRepair operation
+ *
+ */
+@Singleton
+public abstract class AbstractEdgeRepair {
+
+ protected final EdgeSerialization edgeSerialization;
+ protected final GraphFig graphFig;
+ protected final Keyspace keyspace;
+ protected final Scheduler scheduler;
+
+
+ @Inject
+ public AbstractEdgeRepair( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+ final Keyspace keyspace, final Scheduler scheduler ) {
+ this.edgeSerialization = edgeSerialization;
+ this.graphFig = graphFig;
+ this.keyspace = keyspace;
+ this.scheduler = scheduler;
+ }
+
+
+
+ public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
+
+ final UUID maxVersion = edge.getVersion();
+
+ //get source edges
+ Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
+
+ //get target edges
+ Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
+
+
+ //merge source and target then deal with the distinct values
+ return Observable.merge( sourceEdges, targetEdges ).filter(getFilter(maxVersion)).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
+ .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
+ @Override
+ public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ for ( MarkedEdge edge : markedEdges ) {
+ final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
+
+ batch.mergeShallow( delete );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to issue write to cassandra", e );
+ }
+
+ return Observable.from( markedEdges ).subscribeOn( scheduler );
+ }
+ } );
+ }
+
+
+ /**
+ * A filter used to filter out edges appropriately for the max version
+ *
+ * @param maxVersion the max version to use
+ * @return The filter to use in the max version
+ */
+ protected abstract Func1<MarkedEdge, Boolean> getFilter(final UUID maxVersion);
+
+ /**
+ * Get all edge versions <= the specified max from the source
+ */
+ private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+
+ final SimpleSearchByEdge search =
+ new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+ edge.getVersion(), null );
+
+ return edgeSerialization.getEdgeFromSource( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ /**
+ * Get all edge versions <= the specified max from the source
+ */
+ private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+
+ final SimpleSearchByEdge search =
+ new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+ edge.getVersion(), null );
+
+ return edgeSerialization.getEdgeToTarget( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
new file mode 100644
index 0000000..bf5d3d2
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+
+
+/**
+ * SimpleRepair operation
+ */
+@Singleton
+public class EdgeDeleteRepairImpl extends AbstractEdgeRepair implements EdgeDeleteRepair {
+
+
+ @Inject
+ public EdgeDeleteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+ final Keyspace keyspace, final Scheduler scheduler ) {
+ super( edgeSerialization, graphFig, keyspace, scheduler );
+ }
+
+
+ @Override
+ public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
+
+ return super.repair( scope, edge );
+ }
+
+
+ @Override
+ protected Func1<MarkedEdge, Boolean> getFilter( final UUID maxVersion ) {
+ return new Func1<MarkedEdge, Boolean>() {
+ /**
+ * We only want to return edges < this version so we remove them
+ * @param markedEdge
+ * @return
+ */
+ @Override
+ public Boolean call( final MarkedEdge markedEdge ) {
+ return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 1;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
index 548c0e9..e8420a8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
@@ -20,24 +20,18 @@
package org.apache.usergrid.persistence.graph.impl.stage;
-import java.util.Iterator;
-import java.util.List;
import java.util.UUID;
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
import com.fasterxml.uuid.UUIDComparator;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Scheduler;
@@ -46,41 +40,28 @@ import rx.functions.Func1;
/**
* SimpleRepair operation
- *
*/
@Singleton
-public class EdgeWriteRepairImpl implements EdgeWriteRepair {
-
- protected final EdgeSerialization edgeSerialization;
- protected final GraphFig graphFig;
- protected final Keyspace keyspace;
- protected final Scheduler scheduler;
+public class EdgeWriteRepairImpl extends AbstractEdgeRepair implements EdgeWriteRepair {
@Inject
public EdgeWriteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
final Keyspace keyspace, final Scheduler scheduler ) {
- this.edgeSerialization = edgeSerialization;
- this.graphFig = graphFig;
- this.keyspace = keyspace;
- this.scheduler = scheduler;
+ super( edgeSerialization, graphFig, keyspace, scheduler );
}
@Override
public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
- final UUID maxVersion = edge.getVersion();
-
- //get source edges
- Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
-
- //get target edges
- Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
+ return super.repair( scope, edge );
+ }
- //merge source and target then deal with the distinct values
- return Observable.merge( sourceEdges, targetEdges ).filter( new Func1<MarkedEdge, Boolean>() {
+ @Override
+ protected Func1<MarkedEdge, Boolean> getFilter( final UUID maxVersion ) {
+ return new Func1<MarkedEdge, Boolean>() {
/**
* We only want to return edges < this version so we remove them
* @param markedEdge
@@ -90,66 +71,6 @@ public class EdgeWriteRepairImpl implements EdgeWriteRepair {
public Boolean call( final MarkedEdge markedEdge ) {
return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
}
- //buffer the deletes and issue them in a single mutation
- } ).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
- .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- for ( MarkedEdge edge : markedEdges ) {
- final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
-
- batch.mergeShallow( delete );
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to issue write to cassandra", e );
- }
-
- return Observable.from( markedEdges ).subscribeOn( scheduler );
- }
- } );
- }
-
-
- /**
- * Get all edge versions <= the specified max from the source
- */
- private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
-
- return Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
-
- final SimpleSearchByEdge search =
- new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
- edge.getVersion(), null );
-
- return edgeSerialization.getEdgeFromSource( scope, search );
- }
- } ).subscribeOn( scheduler );
- }
-
-
- /**
- * Get all edge versions <= the specified max from the source
- */
- private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
-
- return Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
-
- final SimpleSearchByEdge search =
- new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
- edge.getVersion(), null );
-
- return edgeSerialization.getEdgeToTarget( scope, search );
- }
- } ).subscribeOn( scheduler );
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
new file mode 100644
index 0000000..e8eedc9
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
+public class EdgeDeleteRepairTest {
+
+ @ClassRule
+ public static CassandraRule rule = new CassandraRule();
+
+
+ @Inject
+ @Rule
+ public MigrationManagerRule migrationManagerRule;
+
+
+ @Inject
+ protected EdgeSerialization edgeSerialization;
+
+ @Inject
+ protected EdgeDeleteRepair edgeDeleteRepair;
+
+
+ protected OrganizationScope scope;
+
+
+ @Before
+ public void setup() {
+ scope = mock( OrganizationScope.class );
+
+ Id orgId = mock( Id.class );
+
+ when( orgId.getType() ).thenReturn( "organization" );
+ when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+ when( scope.getOrganization() ).thenReturn( orgId );
+ }
+
+
+ /**
+ * Test repairing with no edges
+ */
+ @Test
+ public void noEdges() {
+ Edge edge = createEdge( "source", "test", "target" );
+
+ Iterator<MarkedEdge> edges = edgeDeleteRepair.repair( scope, edge ).toBlockingObservable().getIterator();
+
+ assertFalse( "No edges cleaned", edges.hasNext() );
+ }
+
+
+ /**
+ * Test repairing with no edges
+ * TODO: TN. There appears to be a race condition here with ordering. Not sure if this is intentional as part of the impl
+ * or if it's an issue
+ */
+ @Test
+ public void versionTest() throws ConnectionException {
+ final int size = 10;
+
+ final List<Edge> versions = new ArrayList<Edge>( size );
+
+ final Id sourceId = createId( "source" );
+ final Id targetId = createId( "target" );
+ final String edgeType = "edge";
+
+ for ( int i = 0; i < size; i++ ) {
+ final Edge edge = createEdge( sourceId, edgeType, targetId );
+
+ versions.add( edge );
+
+ edgeSerialization.writeEdge( scope, edge ).execute();
+
+ System.out.println(String.format("[%d] %s", i, edge));
+ }
+
+
+ int deleteIndex = size / 2;
+
+ Edge keep = versions.get( deleteIndex );
+
+ Iterable<MarkedEdge> edges = edgeDeleteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
+
+
+ int index = 0;
+
+ for ( MarkedEdge edge : edges ) {
+
+ final Edge removed = versions.get( deleteIndex - index );
+
+ assertEquals( "Removed matches saved index", removed, edge );
+
+ index++;
+ }
+
+ //now verify we get all the versions we expect back
+ Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
+ new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
+
+ index = 0;
+
+ for(MarkedEdge edge: new IterableWrapper<MarkedEdge>( iterator )){
+
+ final Edge saved = versions.get( size - index - 1);
+
+ assertEquals(saved, edge);
+
+ index++;
+ }
+
+ final int keptCount = size-deleteIndex;
+
+ assertEquals("Kept edge version was the minimum", keptCount, index+1);
+ }
+
+
+ private class IterableWrapper<T> implements Iterable<T> {
+ private final Iterator<T> sourceIterator;
+
+
+ private IterableWrapper( final Iterator<T> sourceIterator ) {
+ this.sourceIterator = sourceIterator;
+ }
+
+
+ @Override
+ public Iterator<T> iterator() {
+ return this.sourceIterator;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
index 352639e..f796c85 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
@@ -109,6 +109,8 @@ public class EdgeWriteRepairTest {
/**
* Test repairing with no edges
+ * TODO: TN. There appears to be a race condition here with ordering. Not sure if this is intentional as part of the impl
+ * or if it's an issue
*/
@Test
public void versionTest() throws ConnectionException {