You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/27 22:21:29 UTC
[15/50] [abbrv] git commit: Merged hystrix into asyncqueue
Merged hystrix into asyncqueue
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/136edaba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/136edaba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/136edaba
Branch: refs/pull/77/head
Commit: 136edaba0d9132282a097ca8ae743bf0075da7c2
Parents: 51381a3
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 25 13:18:51 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 25 13:18:51 2014 -0700
----------------------------------------------------------------------
.../usergrid/persistence/graph/EdgeManager.java | 163 --
.../persistence/graph/EdgeManagerFactory.java | 40 -
.../persistence/graph/GraphManager.java | 169 ++
.../persistence/graph/GraphManagerFactory.java | 40 +
.../graph/consistency/AsyncProcessorImpl.java | 2 +-
.../persistence/graph/guice/GraphModule.java | 10 +-
.../graph/impl/CollectionIndexObserver.java | 14 +-
.../graph/impl/EdgeDeleteListener.java | 66 +-
.../persistence/graph/impl/EdgeManagerImpl.java | 410 -----
.../graph/impl/EdgeWriteListener.java | 116 +-
.../graph/impl/GraphManagerImpl.java | 393 +++++
.../graph/impl/NodeDeleteListener.java | 139 +-
.../graph/impl/stage/AbstractEdgeRepair.java | 27 +-
.../graph/serialization/EdgeSerialization.java | 12 +-
.../impl/EdgeSerializationImpl.java | 221 ++-
.../persistence/graph/EdgeManagerIT.java | 110 +-
.../graph/EdgeManagerStressTest.java | 54 +-
.../persistence/graph/EdgeManagerTimeoutIT.java | 1562 ------------------
.../graph/GraphManagerTimeoutIT.java | 1562 ++++++++++++++++++
.../graph/consistency/AsyncProcessorTest.java | 23 +-
.../consistency/LocalTimeoutQueueTest.java | 2 +-
.../graph/guice/TestGraphModule.java | 4 +-
.../graph/impl/NodeDeleteListenerTest.java | 210 ++-
.../graph/impl/stage/EdgeDeleteRepairTest.java | 6 +-
.../graph/impl/stage/EdgeMetaRepairTest.java | 83 +-
.../graph/impl/stage/EdgeWriteRepairTest.java | 40 +-
.../EdgeSerializationChopTest.java | 35 +-
.../serialization/EdgeSerializationTest.java | 163 +-
.../serialization/NodeSerializationTest.java | 13 +-
.../serialization/util/EdgeHasherTest.java | 14 +-
stack/corepersistence/pom.xml | 1 +
31 files changed, 2986 insertions(+), 2718 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
deleted file mode 100644
index 9f9c510..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph;
-
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-
-
-/**
- * Represents operations that can be performed on edges within our graph. A graph should be within an
- * OrganizationScope
- *
- * An Edge: is defined as the following.
- *
- * The edge is directed It has 2 Identifiers (Id). 1 Id is the source node, 1 Id is the target node It has an edge type
- * (a string name)
- *
- * All edges are directed edges. By definition, the direction is from Source to Target.
- *
- * I.E Source ---- type -----> Target Ex:
- *
- * Dave (user) ----"follows"---> Alex (user)
- *
- * Alex (user) ----"likes"---> Guinness (beer)
- *
- * Todd (user) ----"worksfor"-----> Apigee (company)
- *
- * Note that edges are directed. All implementations always have an implicit inverse of the directed edge. This can be
- * used to search both incoming and outgoing edges within the graph.
- *
- * @author tnine
- * @see Edge
- */
-public interface EdgeManager {
-
-
- /**
- * @param edge The edge to write
- *
- * Create or update an edge. Note that the implementation should also create incoming (reversed) edges for this
- * edge.
- */
- Observable<Edge> writeEdge( Edge edge );
-
-
- /**
- * @param edge The edge to delete
- *
- *
- * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge.
- */
- Observable<Edge> deleteEdge( Edge edge );
-
- /**
- * TODO: This needs to mark a node as deleted while consistency processing occurs, our reads would need to check this filter on read
- *
- * Remove the node from the graph.
- *
- * @param node
- * @return
- */
- Observable<Id> deleteNode(Id node);
-
- /**
- * Returns an observable that emits all edges where the specified node is the source node. The edges will match the
- * search criteria of the edge type
- *
- * @param search The search parameters
- *
- * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
- */
- Observable<Edge> loadEdgesFromSource( SearchByEdgeType search );
-
- /**
- * Returns an observable that emits all edges where the specified node is the target node. The edges will match the
- * search criteria of the edge type
- *
- * @param search The search parameters
- *
- * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
- */
- Observable<Edge> loadEdgesToTarget( SearchByEdgeType search );
-
-
- /**
- * Returns an observable that emits all edges where the specified node is the source node. The edges will match the
- * search criteria of the edge type and the target type
- *
- * @param search The search parameters
- *
- * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
- */
- Observable<Edge> loadEdgesFromSourceByType( SearchByIdType search );
-
-
- /**
- * Returns an observable that emits all edges where the specified node is the target node. The edges will match the
- * search criteria of the edge type and the target type
- *
- * @param search The search parameters
- *
- * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
- */
- Observable<Edge> loadEdgesToTargetByType( SearchByIdType search );
-
- /**
- * Get all edge types to this node. The node provided by search is the target node.
- *
- * @param search The search
- *
- * @return An observable that emits strings for edge types
- */
- Observable<String> getEdgeTypesFromSource( SearchEdgeType search );
-
-
- /**
- * Get all id types to this node. The node provided by search is the target node with the edge type to search.
- *
- * @param search The search criteria
- *
- * @return An observable of all source id types
- */
- Observable<String> getIdTypesFromSource( SearchIdType search );
-
-
- /**
- * Get all edge types from this node. The node provided by search is the source node.
- *
- * @param search The search
- *
- * @return An observable that emits strings for edge types
- */
- Observable<String> getEdgeTypesToTarget( SearchEdgeType search );
-
-
- /**
- * Get all id types from this node. The node provided by search is the source node with the edge type to search.
- *
- * @param search The search criteria
- *
- * @return An observable of all source id types
- */
- Observable<String> getIdTypesToTarget( SearchIdType search );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManagerFactory.java
deleted file mode 100644
index f01f876..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManagerFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph;
-
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-
-
-/**
- *
- * @author: tnine
- *
- */
-public interface EdgeManagerFactory
-{
-
- /**
- * Create an graph manager for the collection context
- *
- * @param collectionScope The context to use when creating the graph manager
- */
- public EdgeManager createEdgeManager( OrganizationScope collectionScope );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
new file mode 100644
index 0000000..8af77d6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph;
+
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Represents operations that can be performed on edges within our graph. A graph should be within an
+ * OrganizationScope
+ *
+ * An Edge: is defined as the following.
+ *
+ * The edge is directed It has 2 Identifiers (Id). 1 Id is the source node, 1 Id is the target node It has an edge type
+ * (a string name)
+ *
+ * All edges are directed edges. By definition, the direction is from Source to Target.
+ *
+ * I.E Source ---- type -----> Target Ex:
+ *
+ * Dave (user) ----"follows"---> Alex (user)
+ *
+ * Alex (user) ----"likes"---> Guinness (beer)
+ *
+ * Todd (user) ----"worksfor"-----> Apigee (company)
+ *
+ * Note that edges are directed. All implementations always have an implicit inverse of the directed edge. This can be
+ * used to search both incoming and outgoing edges within the graph.
+ *
+ * @author tnine
+ * @see Edge
+ */
+public interface GraphManager {
+
+
+ /**
+ * @param edge The edge to write
+ *
+ * Create or update an edge. Note that the implementation should also create incoming (reversed) edges for this
+ * edge.
+ */
+ Observable<Edge> writeEdge( Edge edge );
+
+
+ /**
+ * @param edge The edge to delete
+ *
+ *
+ * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge. Only deletes the specific version
+ */
+ Observable<Edge> deleteEdge( Edge edge );
+
+ /**
+ *
+ * Remove the node from the graph.
+ *
+ * @param node
+ * @return
+ */
+ Observable<Id> deleteNode(Id node);
+
+ /**
+ * Get all versions of this edge where versions <= max version
+ * @param edge
+ * @return
+ */
+ Observable<Edge> loadEdgeVersions( SearchByEdge edge );
+
+ /**
+ * Returns an observable that emits all edges where the specified node is the source node. The edges will match the
+ * search criteria of the edge type
+ *
+ * @param search The search parameters
+ *
+ * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
+ */
+ Observable<Edge> loadEdgesFromSource( SearchByEdgeType search );
+
+ /**
+ * Returns an observable that emits all edges where the specified node is the target node. The edges will match the
+ * search criteria of the edge type
+ *
+ * @param search The search parameters
+ *
+ * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
+ */
+ Observable<Edge> loadEdgesToTarget( SearchByEdgeType search );
+
+
+ /**
+ * Returns an observable that emits all edges where the specified node is the source node. The edges will match the
+ * search criteria of the edge type and the target type
+ *
+ * @param search The search parameters
+ *
+ * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
+ */
+ Observable<Edge> loadEdgesFromSourceByType( SearchByIdType search );
+
+
+ /**
+ * Returns an observable that emits all edges where the specified node is the target node. The edges will match the
+ * search criteria of the edge type and the target type
+ *
+ * @param search The search parameters
+ *
+ * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
+ */
+ Observable<Edge> loadEdgesToTargetByType( SearchByIdType search );
+
+ /**
+ * Get all edge types to this node. The node provided by search is the target node.
+ *
+ * @param search The search
+ *
+ * @return An observable that emits strings for edge types
+ */
+ Observable<String> getEdgeTypesFromSource( SearchEdgeType search );
+
+
+ /**
+ * Get all id types to this node. The node provided by search is the target node with the edge type to search.
+ *
+ * @param search The search criteria
+ *
+ * @return An observable of all source id types
+ */
+ Observable<String> getIdTypesFromSource( SearchIdType search );
+
+
+ /**
+ * Get all edge types from this node. The node provided by search is the source node.
+ *
+ * @param search The search
+ *
+ * @return An observable that emits strings for edge types
+ */
+ Observable<String> getEdgeTypesToTarget( SearchEdgeType search );
+
+
+ /**
+ * Get all id types from this node. The node provided by search is the source node with the edge type to search.
+ *
+ * @param search The search criteria
+ *
+ * @return An observable of all source id types
+ */
+ Observable<String> getIdTypesToTarget( SearchIdType search );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
new file mode 100644
index 0000000..499259b
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph;
+
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+
+
+/**
+ *
+ * @author: tnine
+ *
+ */
+public interface GraphManagerFactory
+{
+
+ /**
+ * Create an graph manager for the collection context
+ *
+ * @param collectionScope The context to use when creating the graph manager
+ */
+ public GraphManager createEdgeManager( OrganizationScope collectionScope );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 2b00d1a..20411e7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -45,7 +45,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@Inject
- public AsyncProcessorImpl( final TimeoutQueue<T> queue, final GraphFig graphFig ) {
+ public AsyncProcessorImpl( final TimeoutQueue<T> queue, final GraphFig graphFig ) {
this.queue = queue;
this.graphFig = graphFig;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index b499e4c..d575471 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -24,15 +24,15 @@ import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.collection.migration.Migration;
import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessObserver;
-import org.apache.usergrid.persistence.graph.EdgeManager;
-import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessorImpl;
import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
-import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.GraphManagerImpl;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepair;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepairImpl;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
@@ -74,8 +74,8 @@ public class GraphModule extends AbstractModule {
bind( CassandraConfig.class).to( CassandraConfigImpl.class );
// create a guice factory for getting our collection manager
- install( new FactoryModuleBuilder().implement( EdgeManager.class, EdgeManagerImpl.class )
- .build( EdgeManagerFactory.class ) );
+ install( new FactoryModuleBuilder().implement( GraphManager.class, GraphManagerImpl.class )
+ .build( GraphManagerFactory.class ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
index 6175b0b..a41cd4a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
@@ -24,8 +24,8 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessObserver;
import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.EdgeManager;
-import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
@@ -38,13 +38,13 @@ import com.google.inject.Singleton;
@Singleton
public class CollectionIndexObserver implements PostProcessObserver {
- private final EdgeManagerFactory edgeManagerFactory;
+ private final GraphManagerFactory graphManagerFactory;
@Inject
- public CollectionIndexObserver( final EdgeManagerFactory edgeManagerFactory ) {
- Preconditions.checkNotNull( edgeManagerFactory, "edgeManagerFactory cannot be null" );
- this.edgeManagerFactory = edgeManagerFactory;
+ public CollectionIndexObserver( final GraphManagerFactory graphManagerFactory ) {
+ Preconditions.checkNotNull( graphManagerFactory, "graphManagerFactory cannot be null" );
+ this.graphManagerFactory = graphManagerFactory;
}
@@ -53,7 +53,7 @@ public class CollectionIndexObserver implements PostProcessObserver {
public void postCommit( final CollectionScope scope, final MvccEntity entity ) {
//get the edge manager for the org scope
- EdgeManager em = edgeManagerFactory.createEdgeManager( scope );
+ GraphManager em = graphManagerFactory.createEdgeManager( scope );
/**
* create an edge from owner->entity of the type name in the scope.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index a5715c9..11c052b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -1,21 +1,18 @@
package org.apache.usergrid.persistence.graph.impl;
-import java.util.Iterator;
import java.util.UUID;
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.EdgeManager;
-import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
import org.apache.usergrid.persistence.graph.consistency.MessageListener;
import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -26,7 +23,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
-import rx.functions.Func5;
+import rx.functions.Func4;
/**
@@ -38,7 +35,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
private final EdgeSerialization edgeSerialization;
private final EdgeMetadataSerialization edgeMetadataSerialization;
- private final EdgeManagerFactory edgeManagerFactory;
+ private final GraphManagerFactory graphManagerFactory;
private final Keyspace keyspace;
private final GraphFig graphFig;
@@ -46,12 +43,11 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
@Inject
public EdgeDeleteListener( final EdgeSerialization edgeSerialization,
final EdgeMetadataSerialization edgeMetadataSerialization,
- final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace,
- final GraphFig graphFig,
- @EdgeDelete final AsyncProcessor edgeDelete ) {
+ final GraphManagerFactory graphManagerFactory, final Keyspace keyspace,
+ @EdgeDelete final AsyncProcessor edgeDelete, final GraphFig graphFig ) {
this.edgeSerialization = edgeSerialization;
this.edgeMetadataSerialization = edgeMetadataSerialization;
- this.edgeManagerFactory = edgeManagerFactory;
+ this.graphManagerFactory = graphManagerFactory;
this.keyspace = keyspace;
this.graphFig = graphFig;
@@ -65,7 +61,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
final Edge edge = delete.getData();
final OrganizationScope scope = delete.getOrganizationScope();
final UUID maxVersion = edge.getVersion();
- final EdgeManager edgeManager = edgeManagerFactory.createEdgeManager( scope );
+ final GraphManager graphManager = graphManagerFactory.createEdgeManager( scope );
return Observable.from( edge ).flatMap( new Func1<Edge, Observable<MutationBatch>>() {
@@ -75,26 +71,26 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
final MutationBatch batch = keyspace.prepareMutationBatch();
- //go through every version of this edge <= the current version and remove it
- Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>( ) {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
- return edgeSerialization.getEdgeToTarget( scope,
- new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
- edge.getVersion(), null ) );
- }
- } ).doOnNext( new Action1<MarkedEdge>() {
- @Override
- public void call( final MarkedEdge markedEdge ) {
- final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdge );
- batch.mergeShallow( delete );
- }
- } );
-
+// TODO T.N. no longer needed since each version is explicity deleted
+// //go through every version of this edge <= the current version and remove it
+// Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>() {
+// @Override
+// protected Iterator<MarkedEdge> getIterator() {
+// return edgeSerialization.getEdgeVersions( scope,
+// new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+// edge.getVersion(), null ) );
+// }
+// } ).doOnNext( new Action1<MarkedEdge>() {
+// @Override
+// public void call( final MarkedEdge markedEdge ) {
+// final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdge );
+// batch.mergeShallow( delete );
+// }
+// } );
//search by edge type and target type. If any other edges with this target type exist,
// we can't delete it
- Observable<Integer> sourceIdType = edgeManager.loadEdgesFromSourceByType(
+ Observable<Integer> sourceIdType = graphManager.loadEdgesFromSourceByType(
new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
edge.getTargetNode().getType(), null ) ).take( 2 ).count()
.doOnNext( new Action1<Integer>() {
@@ -115,7 +111,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
} );
- Observable<Integer> targetIdType = edgeManager.loadEdgesToTargetByType(
+ Observable<Integer> targetIdType = graphManager.loadEdgesToTargetByType(
new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
edge.getSourceNode().getType(), null ) ).take( 2 ).count()
.doOnNext( new Action1<Integer>() {
@@ -138,7 +134,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
//search by edge type and target type. If any other edges with this target type exist,
// we can't delete it
- Observable<Integer> sourceType = edgeManager.loadEdgesFromSource(
+ Observable<Integer> sourceType = graphManager.loadEdgesFromSource(
new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) ).take( 2 )
.count().doOnNext( new Action1<Integer>() {
@Override
@@ -156,7 +152,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
} );
- Observable<Integer> targetType = edgeManager.loadEdgesToTarget(
+ Observable<Integer> targetType = graphManager.loadEdgesToTarget(
new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) ).take( 2 )
.count().doOnNext( new Action1<Integer>() {
@Override
@@ -175,10 +171,10 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
//no op, just wait for each observable to populate the mutation before returning it
- return Observable.zip( edges, sourceIdType, targetIdType, sourceType, targetType,
- new Func5<MarkedEdge, Integer, Integer, Integer, Integer, MutationBatch>() {
+ return Observable.zip(sourceIdType, targetIdType, sourceType, targetType,
+ new Func4<Integer, Integer, Integer, Integer, MutationBatch>() {
@Override
- public MutationBatch call( final MarkedEdge markedEdge, final Integer integer,
+ public MutationBatch call( final Integer integer,
final Integer integer2, final Integer integer3,
final Integer integer4 ) {
return batch;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
deleted file mode 100644
index 4e5746a..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.EdgeManager;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.SearchByIdType;
-import org.apache.usergrid.persistence.graph.SearchEdgeType;
-import org.apache.usergrid.persistence.graph.SearchIdType;
-import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
-import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
-import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
-import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
-import org.apache.usergrid.persistence.graph.guice.NodeDelete;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
-import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.Scheduler;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
-
-
-/**
- *
- *
- */
-public class EdgeManagerImpl implements EdgeManager {
-
-
- private final OrganizationScope scope;
-
- private final EdgeMetadataSerialization edgeMetadataSerialization;
-
-
- private final EdgeSerialization edgeSerialization;
-
- private final NodeSerialization nodeSerialization;
-
- private final AsyncProcessor<Edge> edgeWriteAsyncProcessor;
- private final AsyncProcessor<Edge> edgeDeleteAsyncProcessor;
- private final AsyncProcessor<Id> nodeDeleteAsyncProcessor;
-
- private final GraphFig graphFig;
-
-
- @Inject
- public EdgeManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
- final EdgeSerialization edgeSerialization, final NodeSerialization nodeSerialization,
- final GraphFig graphFig, @EdgeWrite final AsyncProcessor edgeWrite,
- @EdgeDelete final AsyncProcessor edgeDelete, @NodeDelete final AsyncProcessor nodeDelete,
- @Assisted final OrganizationScope scope ) {
-
- ValidationUtils.validateOrganizationScope( scope );
-
-
- this.scope = scope;
- this.edgeMetadataSerialization = edgeMetadataSerialization;
- this.edgeSerialization = edgeSerialization;
- this.nodeSerialization = nodeSerialization;
- this.graphFig = graphFig;
-
-
- this.edgeWriteAsyncProcessor = edgeWrite;
-
-
- this.edgeDeleteAsyncProcessor = edgeDelete;
-
-
- this.nodeDeleteAsyncProcessor = nodeDelete;
- }
-
-
- @Override
- public Observable<Edge> writeEdge( final Edge edge ) {
- EdgeUtils.validateEdge( edge );
-
- return Observable.from( edge ).subscribeOn( Schedulers.io() ).map( new Func1<Edge, Edge>() {
- @Override
- public Edge call( final Edge edge ) {
- final MutationBatch mutation = edgeMetadataSerialization.writeEdge( scope, edge );
-
- final MutationBatch edgeMutation = edgeSerialization.writeEdge( scope, edge );
-
- mutation.mergeShallow( edgeMutation );
-
- final AsynchronousMessage<Edge> event = edgeWriteAsyncProcessor.setVerification( edge, getTimeout() );
-
- try {
- mutation.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
-
- edgeWriteAsyncProcessor.start( event );
-
- return edge;
- }
- } );
- }
-
-
- @Override
- public Observable<Edge> deleteEdge( final Edge edge ) {
- EdgeUtils.validateEdge( edge );
-
- return Observable.from( edge ).subscribeOn( Schedulers.io() ).map( new Func1<Edge, Edge>() {
- @Override
- public Edge call( final Edge edge ) {
- final MutationBatch edgeMutation = edgeSerialization.markEdge( scope, edge );
-
- final AsynchronousMessage<Edge> event = edgeDeleteAsyncProcessor.setVerification( edge, getTimeout() );
-
-
- try {
- edgeMutation.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
-
- edgeDeleteAsyncProcessor.start( event );
-
-
- return edge;
- }
- } );
- }
-
-
- @Override
- public Observable<Id> deleteNode( final Id node ) {
- return Observable.from( node ).subscribeOn( Schedulers.io() ).map( new Func1<Id, Id>() {
- @Override
- public Id call( final Id id ) {
-
- //mark the node as deleted
- final UUID deleteTime = UUIDGenerator.newTimeUUID();
-
- final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, deleteTime );
-
- final AsynchronousMessage<Id> event = nodeDeleteAsyncProcessor.setVerification( node, getTimeout() );
-
-
- try {
- nodeMutation.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
-
- nodeDeleteAsyncProcessor.start( event );
-
- return id;
- }
- } );
- }
-
-
- @Override
- public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
-
-
- return
-
- Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
- return edgeSerialization.getEdgesFromSource( scope, search );
- }
- } )//we intentionally use distinct until changed. This way we won't store all the keys since this
- //would hog far too much memory.
- .distinctUntilChanged( new Func1<Edge, Id>() {
- @Override
- public Id call( final Edge edge ) {
- return edge.getTargetNode();
- }
- } ).buffer( graphFig.getScanPageSize() )
- .flatMap( new EdgeBufferFilter( search.getMaxVersion() ) ).cast( Edge.class );
- }
-
-
- @Override
- public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
- return edgeSerialization.getEdgesToTarget( scope, search );
- }
- } )
- //we intentionally use distinct until changed. This way we won't store all the keys since this
- //would hog far too much memory.
- .distinctUntilChanged( new Func1<Edge, Id>() {
- @Override
- public Id call( final Edge edge ) {
- return edge.getSourceNode();
- }
- } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
- .cast( Edge.class );
- }
-
-
- @Override
- public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
- return edgeSerialization.getEdgesFromSourceByTargetType( scope, search );
- }
- } ).distinctUntilChanged( new Func1<Edge, Id>() {
- @Override
- public Id call( final Edge edge ) {
- return edge.getTargetNode();
- }
- } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
-
- .cast( Edge.class );
- }
-
-
- @Override
- public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
- return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
- }
- } ).distinctUntilChanged( new Func1<Edge, Id>() {
- @Override
- public Id call( final Edge edge ) {
- return edge.getSourceNode();
- }
- } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
- .cast( Edge.class );
- }
-
-
- @Override
- public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
-
- return Observable.create( new ObservableIterator<String>() {
- @Override
- protected Iterator<String> getIterator() {
- return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
- }
- } );
- }
-
-
- @Override
- public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
- return Observable.create( new ObservableIterator<String>() {
- @Override
- protected Iterator<String> getIterator() {
- return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
- }
- } );
- }
-
-
- @Override
- public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
-
- return Observable.create( new ObservableIterator<String>() {
- @Override
- protected Iterator<String> getIterator() {
- return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
- }
- } );
- }
-
-
- @Override
- public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
- return Observable.create( new ObservableIterator<String>() {
- @Override
- protected Iterator<String> getIterator() {
- return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
- }
- } );
- }
-
-
- /**
- * Get our timeout for write consistency
- */
- private long getTimeout() {
- return graphFig.getRepairTimeout() * 2;
- }
-
-
- /**
- * Helper filter to perform mapping and return an observable of pre-filtered edges
- */
- private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> {
-
- private final UUID maxVersion;
-
-
- private EdgeBufferFilter( final UUID maxVersion ) {
- this.maxVersion = maxVersion;
- }
-
-
- /**
- * Takes a buffered list of marked edges. It then does a single round trip to fetch marked ids These are then
- * used in conjunction with the max version filter to filter any edges that should not be returned
- *
- * @return An observable that emits only edges that can be consumed. There could be multiple versions of the
- * same edge so those need de-duped.
- */
- @Override
- public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
-
- final Map<Id, UUID> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
- return Observable.from( markedEdges ).subscribeOn( Schedulers.io() )
- .filter( new EdgeFilter( this.maxVersion, markedVersions ) );
- }
- }
-
-
- /**
- * Filter the returned values based on the max uuid and if it's been marked for deletion or not
- */
- private static class EdgeFilter implements Func1<MarkedEdge, Boolean> {
-
- private final UUID maxVersion;
-
- private final Map<Id, UUID> markCache;
-
-
- private EdgeFilter( final UUID maxVersion, Map<Id, UUID> markCache ) {
- this.maxVersion = maxVersion;
- this.markCache = markCache;
- }
-
-
- @Override
- public Boolean call( final MarkedEdge edge ) {
-
-
- final UUID edgeVersion = edge.getVersion();
-
- //our edge needs to not be deleted and have a version that's > max Version
- if ( edge.isDeleted() || UUIDComparator.staticCompare( edgeVersion, maxVersion ) > 0 ) {
- return false;
- }
-
-
- final UUID sourceVersion = markCache.get( edge.getSourceNode() );
-
- //the source Id has been marked for deletion. It's version is <= to the marked version for deletion,
- // so we need to discard it
- if ( sourceVersion != null && UUIDComparator.staticCompare( edgeVersion, sourceVersion ) < 1 ) {
- return false;
- }
-
- final UUID targetVersion = markCache.get( edge.getTargetNode() );
-
- //the target Id has been marked for deletion. It's version is <= to the marked version for deletion,
- // so we need to discard it
- if ( targetVersion != null && UUIDComparator.staticCompare( edgeVersion, targetVersion ) < 1 ) {
- return false;
- }
-
-
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
index e146ba9..90146dc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.usergrid.persistence.graph.impl;
@@ -52,52 +71,55 @@ public class EdgeWriteListener implements MessageListener<EdgeEvent<Edge>, EdgeE
final OrganizationScope scope = write.getOrganizationScope();
final UUID maxVersion = edge.getVersion();
- return Observable.create( new ObservableIterator<MarkedEdge>( ) {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
-
- final SimpleSearchByEdge search =
- new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), maxVersion,
- null );
-
- return edgeSerialization.getEdgeFromSource( scope, search );
- }
- } ).filter( new Func1<MarkedEdge, Boolean>() {
-
- //TODO, reuse this for delete operation
-
-
- /**
- * We only want to return edges < this version so we remove them
- * @param markedEdge
- * @return
- */
- @Override
- public Boolean call( final MarkedEdge markedEdge ) {
- return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
- }
- //buffer the deletes and issue them in a single mutation
- } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, EdgeEvent<Edge>>() {
- @Override
- public EdgeEvent<Edge> call( final List<MarkedEdge> markedEdges ) {
-
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- for ( MarkedEdge edge : markedEdges ) {
- final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
-
- batch.mergeShallow( delete );
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to issue write to cassandra", e );
- }
-
- return write;
- }
- } );
+ return Observable.empty();
+
+// TODO T.N, some async processing for balancing here
+// return Observable.create( new ObservableIterator<MarkedEdge>() {
+// @Override
+// protected Iterator<MarkedEdge> getIterator() {
+//
+// final SimpleSearchByEdge search =
+// new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), maxVersion,
+// null );
+//
+// return edgeSerialization.getEdgeVersions( scope, search );
+// }
+// } ).filter( new Func1<MarkedEdge, Boolean>() {
+//
+// //TODO, reuse this for delete operation
+//
+//
+// /**
+// * We only want to return edges < this version so we remove them
+// * @param markedEdge
+// * @return
+// */
+// @Override
+// public Boolean call( final MarkedEdge markedEdge ) {
+// return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
+// }
+// //buffer the deletes and issue them in a single mutation
+// } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, EdgeEvent<Edge>>() {
+// @Override
+// public EdgeEvent<Edge> call( final List<MarkedEdge> markedEdges ) {
+//
+// final MutationBatch batch = keyspace.prepareMutationBatch();
+//
+// for ( MarkedEdge edge : markedEdges ) {
+// final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
+//
+// batch.mergeShallow( delete );
+// }
+//
+// try {
+// batch.execute();
+// }
+// catch ( ConnectionException e ) {
+// throw new RuntimeException( "Unable to issue write to cassandra", e );
+// }
+//
+// return write;
+// }
+// } );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
new file mode 100644
index 0000000..94d4dc3
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.SearchEdgeType;
+import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
+import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
+import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
+import org.apache.usergrid.persistence.graph.guice.NodeDelete;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * Implementation of graph edges
+ */
+public class GraphManagerImpl implements GraphManager {
+
+
+ private final OrganizationScope scope;
+
+ private final EdgeMetadataSerialization edgeMetadataSerialization;
+
+
+ private final EdgeSerialization edgeSerialization;
+
+ private final NodeSerialization nodeSerialization;
+
+ private final AsyncProcessor<Edge> edgeWriteAsyncProcessor;
+ private final AsyncProcessor<Edge> edgeDeleteAsyncProcessor;
+ private final AsyncProcessor<Id> nodeDeleteAsyncProcessor;
+
+ private final GraphFig graphFig;
+
+
+ @Inject
+ public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
+ final EdgeSerialization edgeSerialization, final NodeSerialization nodeSerialization,
+ final GraphFig graphFig, @EdgeWrite final AsyncProcessor edgeWrite,
+ @EdgeDelete final AsyncProcessor edgeDelete, @NodeDelete final AsyncProcessor nodeDelete,
+ @Assisted final OrganizationScope scope ) {
+
+ ValidationUtils.validateOrganizationScope( scope );
+
+
+ this.scope = scope;
+ this.edgeMetadataSerialization = edgeMetadataSerialization;
+ this.edgeSerialization = edgeSerialization;
+ this.nodeSerialization = nodeSerialization;
+ this.graphFig = graphFig;
+
+
+ this.edgeWriteAsyncProcessor = edgeWrite;
+
+
+ this.edgeDeleteAsyncProcessor = edgeDelete;
+
+
+ this.nodeDeleteAsyncProcessor = nodeDelete;
+ }
+
+
+ @Override
+ public Observable<Edge> writeEdge( final Edge edge ) {
+ EdgeUtils.validateEdge( edge );
+
+ return Observable.from( edge ).subscribeOn( Schedulers.io() ).map( new Func1<Edge, Edge>() {
+ @Override
+ public Edge call( final Edge edge ) {
+ final MutationBatch mutation = edgeMetadataSerialization.writeEdge( scope, edge );
+
+ final MutationBatch edgeMutation = edgeSerialization.writeEdge( scope, edge );
+
+ mutation.mergeShallow( edgeMutation );
+
+ final AsynchronousMessage<Edge> event = edgeWriteAsyncProcessor.setVerification( edge, getTimeout() );
+
+ try {
+ mutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to cassandra", e );
+ }
+
+ edgeWriteAsyncProcessor.start( event );
+
+ return edge;
+ }
+ } );
+ }
+
+
+ @Override
+ public Observable<Edge> deleteEdge( final Edge edge ) {
+ EdgeUtils.validateEdge( edge );
+
+ return Observable.from( edge ).subscribeOn( Schedulers.io() ).map( new Func1<Edge, Edge>() {
+ @Override
+ public Edge call( final Edge edge ) {
+ final MutationBatch edgeMutation = edgeSerialization.markEdge( scope, edge );
+
+ final AsynchronousMessage<Edge> event = edgeDeleteAsyncProcessor.setVerification( edge, getTimeout() );
+
+
+ try {
+ edgeMutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to cassandra", e );
+ }
+
+ edgeDeleteAsyncProcessor.start( event );
+
+
+ return edge;
+ }
+ } );
+ }
+
+
+ @Override
+ public Observable<Id> deleteNode( final Id node ) {
+ return Observable.from( node ).subscribeOn( Schedulers.io() ).map( new Func1<Id, Id>() {
+ @Override
+ public Id call( final Id id ) {
+
+ //mark the node as deleted
+ final UUID deleteTime = UUIDGenerator.newTimeUUID();
+
+ final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, deleteTime );
+
+ final AsynchronousMessage<Id> event = nodeDeleteAsyncProcessor.setVerification( node, getTimeout() );
+
+
+ try {
+ nodeMutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to cassandra", e );
+ }
+
+ nodeDeleteAsyncProcessor.start( event );
+
+ return id;
+ }
+ } );
+ }
+
+
+ @Override
+ public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgeVersions( scope, searchByEdge );
+ }
+ } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( searchByEdge.getMaxVersion() ) )
+ .cast( Edge.class );
+ }
+
+
+ @Override
+ public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesFromSource( scope, search );
+ }
+ } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
+ .cast( Edge.class );
+ }
+
+
+ @Override
+ public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesToTarget( scope, search );
+ }
+ } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
+ .cast( Edge.class );
+ }
+
+
+ @Override
+ public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesFromSourceByTargetType( scope, search );
+ }
+ } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
+
+ .cast( Edge.class );
+ }
+
+
+ @Override
+ public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
+ }
+ } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
+ .cast( Edge.class );
+ }
+
+
+ @Override
+ public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
+
+ return Observable.create( new ObservableIterator<String>() {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
+ }
+ } );
+ }
+
+
+ @Override
+ public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
+ return Observable.create( new ObservableIterator<String>() {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
+ }
+ } );
+ }
+
+
+ @Override
+ public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
+
+ return Observable.create( new ObservableIterator<String>() {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+ }
+ } );
+ }
+
+
+ @Override
+ public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
+ return Observable.create( new ObservableIterator<String>() {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
+ }
+ } );
+ }
+
+
+ /**
+ * Get our timeout for write consistency
+ */
+ private long getTimeout() {
+ return graphFig.getRepairTimeout() * 2;
+ }
+
+
+ /**
+ * Helper filter to perform mapping and return an observable of pre-filtered edges
+ */
+ private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> {
+
+ private final UUID maxVersion;
+
+
+ private EdgeBufferFilter( final UUID maxVersion ) {
+ this.maxVersion = maxVersion;
+ }
+
+
+ /**
+ * Takes a buffered list of marked edges. It then does a single round trip to fetch marked ids These are then
+ * used in conjunction with the max version filter to filter any edges that should not be returned
+ *
+ * @return An observable that emits only edges that can be consumed. There could be multiple versions of the
+ * same edge so those need de-duped.
+ */
+ @Override
+ public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+
+ final Map<Id, UUID> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
+ return Observable.from( markedEdges )
+ .filter( new EdgeFilter( this.maxVersion, markedVersions ) );
+ }
+ }
+
+
+ /**
+ * Filter the returned values based on the max uuid and if it's been marked for deletion or not
+ */
+ private static class EdgeFilter implements Func1<MarkedEdge, Boolean> {
+
+ private final UUID maxVersion;
+
+ private final Map<Id, UUID> markCache;
+
+
+ private EdgeFilter( final UUID maxVersion, Map<Id, UUID> markCache ) {
+ this.maxVersion = maxVersion;
+ this.markCache = markCache;
+ }
+
+
+ @Override
+ public Boolean call( final MarkedEdge edge ) {
+
+
+ final UUID edgeVersion = edge.getVersion();
+
+ //our edge needs to not be deleted and have a version that's > max Version
+ if ( edge.isDeleted() || UUIDComparator.staticCompare( edgeVersion, maxVersion ) > 0 ) {
+ return false;
+ }
+
+
+ final UUID sourceVersion = markCache.get( edge.getSourceNode() );
+
+ //the source Id has been marked for deletion. It's version is <= to the marked version for deletion,
+ // so we need to discard it
+ if ( sourceVersion != null && UUIDComparator.staticCompare( edgeVersion, sourceVersion ) < 1 ) {
+ return false;
+ }
+
+ final UUID targetVersion = markCache.get( edge.getTargetNode() );
+
+ //the target Id has been marked for deletion. It's version is <= to the marked version for deletion,
+ // so we need to discard it
+ if ( targetVersion != null && UUIDComparator.staticCompare( edgeVersion, targetVersion ) < 1 ) {
+ return false;
+ }
+
+
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 190bbff..4ba0142 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -2,11 +2,14 @@ package org.apache.usergrid.persistence.graph.impl;
import java.util.Iterator;
+import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.thrift.Mutation;
+
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -26,11 +29,13 @@ import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.Scheduler;
import rx.functions.Action0;
+import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -48,16 +53,30 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
private final EdgeMetadataSerialization edgeMetadataSerialization;
private final EdgeDeleteRepair edgeDeleteRepair;
private final EdgeMetaRepair edgeMetaRepair;
+<<<<<<< Updated upstream
+=======
+ private final GraphFig graphFig;
+ protected final Keyspace keyspace;
+>>>>>>> Stashed changes
/**
* Wire the serialization dependencies
*/
@Inject
+<<<<<<< Updated upstream
public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization, @NodeDelete final AsyncProcessor nodeDelete,
final EdgeMetadataSerialization edgeMetadataSerialization,
final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair
) {
+=======
+ public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
+
+ final EdgeMetadataSerialization edgeMetadataSerialization,
+ final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair,
+ final GraphFig graphFig, @NodeDelete final AsyncProcessor nodeDelete,
+ final Keyspace keyspace ) {
+>>>>>>> Stashed changes
this.nodeSerialization = nodeSerialization;
@@ -65,6 +84,8 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
this.edgeMetadataSerialization = edgeMetadataSerialization;
this.edgeDeleteRepair = edgeDeleteRepair;
this.edgeMetaRepair = edgeMetaRepair;
+ this.graphFig = graphFig;
+ this.keyspace = keyspace;
nodeDelete.addListener( this );
}
@@ -72,8 +93,11 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
/**
* Removes this node from the graph.
+ *
* @param edgeEvent The edge event that was fired.
- * @return An observable that emits the total number of edges that have been removed with this node both as the target and source
+ *
+ * @return An observable that emits the total number of edges that have been removed with this node both as the
+ * target and source
*/
@Override
public Observable<Integer> receive( final EdgeEvent<Id> edgeEvent ) {
@@ -112,17 +136,9 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
@Override
public Observable<MarkedEdge> call( final String edgeType ) {
return loadEdgesToTarget( scope,
- new SimpleSearchByEdgeType( node, edgeType, version,
- null ) );
+ new SimpleSearchByEdgeType( node, edgeType, version, null ) );
}
- } )
- //filter "old" edges, since we'll be deleting them in 1 shot
- .distinctUntilChanged( new Func1<Edge, Id>() {
- @Override
- public Id call( final Edge edge ) {
- return edge.getSourceNode();
- }
- } );
+ } );
//get all edges pointing to the source node and buffer them into groups for deletion
@@ -132,60 +148,71 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
@Override
public Observable<MarkedEdge> call( final String edgeType ) {
return loadEdgesFromSource( scope,
- new SimpleSearchByEdgeType( node, edgeType, version,
- null ) );
+ new SimpleSearchByEdgeType( node, edgeType, version, null ) );
}
- } )
- //filter "old" edges, since we'll be deleting them in 1 shot
- .distinctUntilChanged( new Func1<Edge, Id>() {
- @Override
- public Id call( final Edge edge ) {
- return edge.getTargetNode();
- }
- } );
+ } );
//each time an edge is emitted, delete it via batch mutation since we'll already be buffered
- return Observable.concat( targetEdges, sourceEdges );
+ return Observable.merge( targetEdges, sourceEdges );
}
- } ).flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final MarkedEdge edge ) {
-
- //delete the newest edge <= the version on the node delete
- LOG.debug( "Deleting edge {}", edge );
- return edgeDeleteRepair.repair( scope, edge );
-
-
- }
- } ).flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final MarkedEdge edge ) {
+ } )
+ //buffer and delete marked edges in our buffer size
+ .buffer( graphFig.getScanPageSize() ).flatMap(
+ new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
+ @Override
+ public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+ for(MarkedEdge edge: markedEdges){
- //delete both the source and target meta data in parallel for the edge we deleted in the previous step
- //if nothing else is using them
- Observable<Integer> sourceMetaRepaired =
- edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), version );
+ //delete the newest edge <= the version on the node delete
+ LOG.debug( "Deleting edge {}", edge );
+ final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
- Observable<Integer> targetMetaRepaired = edgeMetaRepair.repairTargets( scope,
- edge.getTargetNode(), edge.getType(), version );
+ batch.mergeShallow( delete );
+ }
- //sum up the number of subtypes we retain
- return Observable.concat(sourceMetaRepaired, targetMetaRepaired ).last().map( new Func1
- <Integer, MarkedEdge>() {
- @Override
- public MarkedEdge call( final Integer integer ) {
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to delete edges", e );
+ }
- LOG.debug( "Retained {} subtypes for edge {}", integer, edge );
-
- return edge;
+ return Observable.from(markedEdges);
}
- } );
- }
- } ).count()
- //if nothing is ever emitted, emit 0 so that we know no operations took place. Finally remove the target node in the mark
+ } )
+ .flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
+ @Override
+ public Observable<MarkedEdge> call( final MarkedEdge edge ) {
+
+
+ return Observable.just( edge );
+// //delete both the source and target meta data in parallel for the edge we deleted in the previous step
+// //if nothing else is using them
+// Observable<Integer> sourceMetaRepaired =
+// edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), version );
+//
+// Observable<Integer> targetMetaRepaired =
+// edgeMetaRepair.repairTargets( scope, edge.getTargetNode(), edge.getType(), version );
+//
+// //sum up the number of subtypes we retain
+// return Observable.concat( sourceMetaRepaired, targetMetaRepaired ).last()
+// .map( new Func1<Integer, MarkedEdge>() {
+// @Override
+// public MarkedEdge call( final Integer integer ) {
+//
+// LOG.debug( "Retained {} subtypes for edge {}", integer, edge );
+//
+// return edge;
+// }
+// } );
+ }
+ } ).count()
+ //if nothing is ever emitted, emit 0 so that we know no operations took place. Finally remove the
+ // target node in the mark
.defaultIfEmpty( 0 ).doOnCompleted( new Action0() {
@Override
public void call() {
@@ -193,7 +220,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
nodeSerialization.delete( scope, node, version ).execute();
}
catch ( ConnectionException e ) {
- throw new RuntimeException("Unable to delete marked graph node " + node, e);
+ throw new RuntimeException( "Unable to delete marked graph node " + node, e );
}
}
} );
@@ -254,6 +281,4 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
}
} ).subscribeOn( Schedulers.io() );
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
index 3ff7178..310fa1f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -77,15 +77,13 @@ public abstract class AbstractEdgeRepair {
final UUID maxVersion = edge.getVersion();
//get source edges
- Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
+ Observable<MarkedEdge> edgeVersions = getEdgeVersions( scope, edge );
- //get target edges
- Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
//merge source and target then deal with the distinct values
- return Observable.merge( sourceEdges, targetEdges ).filter( getFilter( maxVersion ) ).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
+ return edgeVersions.filter( getFilter( maxVersion ) ).buffer( graphFig.getScanPageSize() )
.flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
@Override
public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
@@ -122,7 +120,7 @@ public abstract class AbstractEdgeRepair {
/**
* Get all edge versions <= the specified max from the source
*/
- private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
+ private Observable<MarkedEdge> getEdgeVersions( final OrganizationScope scope, final Edge edge ) {
return Observable.create( new ObservableIterator<MarkedEdge>( ) {
@Override
@@ -130,24 +128,7 @@ public abstract class AbstractEdgeRepair {
final SimpleSearchByEdge search = getSearchByEdge(edge);
- return edgeSerialization.getEdgeFromSource( scope, search );
- }
- } ).subscribeOn( Schedulers.io() );
- }
-
-
- /**
- * Get all edge versions <= the specified max from the source
- */
- private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
-
- return Observable.create( new ObservableIterator<MarkedEdge>( ) {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
-
- final SimpleSearchByEdge search = getSearchByEdge(edge);
-
- return edgeSerialization.getEdgeToTarget( scope, search );
+ return edgeSerialization.getEdgeVersions( scope, search );
}
} ).subscribeOn( Schedulers.io() );
}