You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/17 21:18:00 UTC
[41/50] [abbrv] usergrid git commit: Fixing
https://issues.apache.org/jira/browse/USERGRID-1310. Also fixed an NPE found
during the fix. (In the abstract connection service, when the entity is null it
throws an NPE. Changed it to throw a 404.)
Fixing https://issues.apache.org/jira/browse/USERGRID-1310.
Also fixed an NPE found during the fix. (In the abstract connection service, when the entity is null it throws an NPE. Changed it to throw a 404.)
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b1157a89
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b1157a89
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b1157a89
Branch: refs/heads/datastax-cass-driver
Commit: b1157a8924686557e5c26966c5a0b14fb87eb2d6
Parents: 10e8957
Author: Ayesha Dastagiri <ay...@gmail.com>
Authored: Thu Aug 11 11:37:23 2016 -0700
Committer: Ayesha Dastagiri <ay...@gmail.com>
Committed: Thu Aug 11 11:37:23 2016 -0700
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 55 ++--
.../pipeline/builder/IdBuilder.java | 29 +-
.../pipeline/read/FilterFactory.java | 8 +
.../AbstractReadReverseGraphFilter.java | 291 +++++++++++++++++++
.../ReadGraphReverseConnectionFilter.java | 53 ++++
.../service/ConnectionSearch.java | 8 +-
.../service/ConnectionServiceImpl.java | 9 +-
.../org/apache/usergrid/persistence/Query.java | 39 ++-
.../persistence/EntityConnectionsIT.java | 67 ++++-
.../services/AbstractConnectionsService.java | 35 +--
.../usergrid/services/ConnectionsServiceIT.java | 74 ++++-
11 files changed, 559 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index b398562..57b1526 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -17,69 +17,48 @@
package org.apache.usergrid.corepersistence;
-import java.util.*;
-
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.index.CollectionSettings;
import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
-import org.apache.usergrid.corepersistence.results.IdQueryExecutor;
-import org.apache.usergrid.persistence.map.MapManager;
-import org.apache.usergrid.persistence.map.MapScope;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
+import org.apache.usergrid.corepersistence.results.IdQueryExecutor;
import org.apache.usergrid.corepersistence.service.CollectionSearch;
import org.apache.usergrid.corepersistence.service.CollectionService;
import org.apache.usergrid.corepersistence.service.ConnectionSearch;
import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.ConnectedEntityRef;
-import org.apache.usergrid.persistence.ConnectionRef;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.Query.Level;
-import org.apache.usergrid.persistence.RelationManager;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.RoleRef;
-import org.apache.usergrid.persistence.Schema;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.SimpleRoleRef;
import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.entities.Group;
import org.apache.usergrid.persistence.entities.User;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.*;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.InflectionUtils;
import org.apache.usergrid.utils.MapUtils;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
import rx.Observable;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionEdge;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchByEdge;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
+import java.util.*;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*;
import static org.apache.usergrid.persistence.Schema.*;
import static org.apache.usergrid.utils.ClassUtils.cast;
import static org.apache.usergrid.utils.InflectionUtils.singularize;
@@ -954,7 +933,7 @@ public class CpRelationManager implements RelationManager {
final Id sourceId = headEntity.asId();
final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent(): query.getQl();
-
+ final boolean isConnecting = query.isConnecting();
if ( query.getResultsLevel() == Level.REFS || query.getResultsLevel() == Level.IDS ) {
@@ -968,7 +947,7 @@ public class CpRelationManager implements RelationManager {
final ConnectionSearch search =
new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
- queryString, cursor );
+ queryString, cursor, isConnecting );
return connectionService.searchConnectionAsRefs( search );
}
}.next();
@@ -983,7 +962,7 @@ public class CpRelationManager implements RelationManager {
//we need the callback so as we get a new cursor, we execute a new search and re-initialize our builders
final ConnectionSearch search =
new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
- queryString, cursor );
+ queryString, cursor, isConnecting );
return connectionService.searchConnection( search );
}
}.next();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index 781d7d5..b7d1f86 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -20,9 +20,10 @@
package org.apache.usergrid.corepersistence.pipeline.builder;
+import com.google.common.base.Optional;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
-import org.apache.usergrid.corepersistence.pipeline.Pipeline;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter;
@@ -30,13 +31,9 @@ import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefRe
import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter;
import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.IdFilter;
import org.apache.usergrid.persistence.ConnectionRef;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-
import rx.Observable;
@@ -69,6 +66,28 @@ public class IdBuilder {
/**
+ * Traverse all connection edges to our input Id
+ * @param connectionName The name of the connection
+ * @param entityType The optional type of the entity
+ * @return A new IdBuilder whose pipeline has the selected traversal filter appended
+ */
+ public IdBuilder traverseReverseConnection( final String connectionName, final Optional<String> entityType ) {
+
+ final PipelineOperation<FilterResult<Id>, FilterResult<Id>> filter;
+
+ if(entityType.isPresent()){
+ //todo: change this too. NOTE(review): this branch calls readGraphConnectionByTypeFilter, which looks like the forward by-type filter - confirm
+ filter = filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType.get() );
+ }else{
+ filter = filterFactory.readGraphReverseConnectionFilter( connectionName );
+ }
+
+
+ return new IdBuilder( pipeline.withFilter(filter ), filterFactory );
+ }
+
+
+ /**
* Traverse all the collection edges from our input Id
* @param collectionName
* @return
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index 883fdc8..4b615d8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -62,6 +62,14 @@ public interface FilterFactory {
*/
ReadGraphConnectionFilter readGraphConnectionFilter( final String connectionName );
+
+ /**
+ * Generate a new instance of the command with the specified parameters
+ *
+ * @param connectionName The connection name to use when reverse traversing the graph
+ */
+ ReadGraphReverseConnectionFilter readGraphReverseConnectionFilter( final String connectionName );
+
/**
* Generate a new instance of the command with the specified parameters
*
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
new file mode 100644
index 0000000..dcda98f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+
+/**
+ * Command for reading graph edges in reverse order.
+ */
+public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<Id, Id, MarkedEdge> {
+
+ private static final Logger logger = LoggerFactory.getLogger( AbstractReadReverseGraphFilter.class );
+
+ private final GraphManagerFactory graphManagerFactory;
+ private final RxTaskScheduler rxTaskScheduler;
+ private final EventBuilder eventBuilder;
+ private final AsyncEventService asyncEventService;
+
+
+ /**
+ * Create a new instance of our command
+ */
+ public AbstractReadReverseGraphFilter( final GraphManagerFactory graphManagerFactory,
+ final RxTaskScheduler rxTaskScheduler,
+ final EventBuilder eventBuilder,
+ final AsyncEventService asyncEventService ) {
+ this.graphManagerFactory = graphManagerFactory;
+ this.rxTaskScheduler = rxTaskScheduler;
+ this.eventBuilder = eventBuilder;
+ this.asyncEventService = asyncEventService;
+ }
+
+
+ @Override
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
+
+
+ final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+ //get the graph manager
+ final GraphManager graphManager =
+ graphManagerFactory.createEdgeManager( applicationScope );
+
+
+ final String edgeName = getEdgeTypeName();
+ final EdgeState edgeCursorState = new EdgeState();
+
+
+ //for each incoming id, emit the source node of every edge returned by loadEdgesToTarget
+ return previousIds.flatMap( previousFilterValue -> {
+
+ //set our constant state
+ final Optional<MarkedEdge> startFromCursor = getSeekValue();
+ final Id id = previousFilterValue.getValue();
+
+
+ final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull());
+
+ /**
+ * We do not want to filter. This is intentional DO NOT REMOVE!!!
+ *
+ * We want to fire events on these edges if they exist, the delete was missed.
+ */
+ final SimpleSearchByEdgeType search =
+ new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ typeWrapper, false );
+
+ /**
+ * TODO, pass a message with pointers to our cursor values to be generated later
+ */
+ return graphManager.loadEdgesToTarget( search ).filter(markedEdge -> {
+
+ final boolean isDeleted = markedEdge.isDeleted();
+ final boolean isSourceNodeDeleted = markedEdge.isSourceNodeDelete();
+ final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted();
+
+
+ if (isDeleted) {
+
+ logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
+ final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+
+ indexMessageObservable
+ .compose(applyCollector())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ .subscribe();
+
+ }
+
+ if (isSourceNodeDeleted) {
+
+ final Id sourceNodeId = markedEdge.getSourceNode();
+ logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
+
+ final EventBuilderImpl.EntityDeleteResults
+ entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+
+ entityDeleteResults.getIndexObservable()
+ .compose(applyCollector())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ .subscribe();
+
+ Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+ entityDeleteResults.getCompactedNode())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+ subscribe();
+
+ }
+
+ if (isTargetNodeDelete) {
+
+ final Id targetNodeId = markedEdge.getTargetNode();
+ logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
+
+ final EventBuilderImpl.EntityDeleteResults
+ entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+
+ entityDeleteResults.getIndexObservable()
+ .compose(applyCollector())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ .subscribe();
+
+ Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+ entityDeleteResults.getCompactedNode())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+ subscribe();
+
+ }
+
+
+ //keep the edge only when none of the delete markers are set
+ return !isDeleted && !isSourceNodeDeleted && !isTargetNodeDelete;
+
+
+ }) // any non-deleted edges should be de-duped here so the results are unique
+ .distinct( new EdgeDistinctKey() )
+ //set the edge state for cursors
+ .doOnNext( edge -> {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Seeking over edge {}", edge);
+ }
+ edgeCursorState.update( edge );
+ } )
+
+ //map our id from the edge's source node and set our cursor on every edge we traverse
+ .map( edge -> createFilterResult( edge.getSourceNode(), edgeCursorState.getCursorEdge(),
+ previousFilterValue.getPath() ) );
+ } );
+ }
+
+
+ @Override
+ protected FilterResult<Id> createFilterResult( final Id emit, final MarkedEdge cursorValue,
+ final Optional<EdgePath> parent ) {
+
+ //if it's our first pass, there's no cursor to generate
+ if(cursorValue == null){
+ return new FilterResult<>( emit, parent );
+ }
+
+ return super.createFilterResult( emit, cursorValue, parent );
+ }
+
+
+ @Override
+ protected CursorSerializer<MarkedEdge> getCursorSerializer() {
+ return EdgeCursorSerializer.INSTANCE;
+ }
+
+
+ /**
+ * Get the edge type name we should use when traversing
+ */
+ protected abstract String getEdgeTypeName();
+
+
+ /**
+ * Wrapper class. Because edges seek > the last returned, we need to keep our n-1 value. This will be our cursor We
+ * always try to seek to the same position as we ended. Since we don't deal with a persistent read result, if we
+ * seek to a value = to our last, we may skip data.
+ */
+ private final class EdgeState {
+
+ private MarkedEdge cursorEdge = null;
+ private MarkedEdge currentEdge = null;
+
+
+ /**
+ * Update the pointers
+ */
+ private void update( final MarkedEdge newEdge ) {
+ cursorEdge = currentEdge;
+ currentEdge = newEdge;
+ }
+
+
+ /**
+ * Get the edge to use in cursors for resume
+ */
+ private MarkedEdge getCursorEdge() {
+ return cursorEdge;
+ }
+ }
+
+ private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+
+ return observable -> observable
+ .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+ .filter(msg -> !msg.isEmpty())
+ .doOnNext(indexOperation -> {
+ asyncEventService.queueIndexOperationMessage(indexOperation);
+ });
+
+ }
+
+ /**
+ * Return a key that Rx can use for determining a distinct edge. Build a string containing the UUID
+ * of the source and target nodes, with the type to ensure uniqueness rather than the int sum of the hash codes.
+ * Edge timestamp is specifically left out as edges with the same source,target,type but different timestamps
+ * are considered duplicates.
+ */
+ private class EdgeDistinctKey implements Func1<Edge,String> {
+
+ @Override
+ public String call(Edge edge) {
+
+ return buildDistinctKey(edge.getSourceNode().getUuid().toString(), edge.getTargetNode().getUuid().toString(),
+ edge.getType().toLowerCase());
+ }
+ }
+
+ protected static String buildDistinctKey(final String sourceNode, final String targetNode, final String type){
+
+ final String DISTINCT_KEY_SEPARATOR = ":";
+ StringBuilder stringBuilder = new StringBuilder();
+
+ stringBuilder
+ .append(sourceNode)
+ .append(DISTINCT_KEY_SEPARATOR)
+ .append(targetNode)
+ .append(DISTINCT_KEY_SEPARATOR)
+ .append(type);
+
+ return stringBuilder.toString();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java
new file mode 100644
index 0000000..aa369c2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
+
+/**
+ * Reverse graph filter whose edge type is derived from a connection name. Created by ayeshadastagiri on 8/9/16.
+ */
+public class ReadGraphReverseConnectionFilter extends AbstractReadReverseGraphFilter{
+ private final String connectionName;
+
+ /**
+ * Create a new instance of our command for the given (assisted-injected) connection name
+ */
+ @Inject
+ public ReadGraphReverseConnectionFilter( final GraphManagerFactory graphManagerFactory,
+ @AsyncRepair final RxTaskScheduler rxTaskScheduler,
+ final EventBuilder eventBuilder,
+ final AsyncEventService asyncEventService,
+ @Assisted final String connectionName ) {
+ super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService );
+ this.connectionName = connectionName;
+ }
+ @Override
+ protected String getEdgeTypeName() {
+ return getEdgeTypeFromConnectionType( connectionName ); }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
index 51f6768..8ad57fb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
@@ -36,11 +36,12 @@ public class ConnectionSearch {
private final int limit;
private final Optional<String> query;
private final Optional<String> cursor;
+ private final boolean isConnecting;
public ConnectionSearch( final ApplicationScope applicationScope, final Id sourceNodeId, final Optional<String> entityType,
final String connectionName, final int limit, final Optional<String> query, final
- Optional<String> cursor ) {
+ Optional<String> cursor, boolean isConnecting ) {
this.applicationScope = applicationScope;
this.sourceNodeId = sourceNodeId;
this.entityType = entityType;
@@ -48,6 +49,7 @@ public class ConnectionSearch {
this.limit = limit;
this.query = query;
this.cursor = cursor;
+ this.isConnecting = isConnecting;
}
@@ -84,4 +86,8 @@ public class ConnectionSearch {
public Optional<String> getEntityType() {
return entityType;
}
+
+ public boolean getIsConnecting(){
+ return isConnecting;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
index 4b7e66c..926c676 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
@@ -94,8 +94,13 @@ public class ConnectionServiceImpl implements ConnectionService {
if ( !query.isPresent() ) {
- results =
- pipelineBuilder.traverseConnection( search.getConnectionName(), search.getEntityType() ).loadEntities();
+ if(search.getIsConnecting()){
+ results = pipelineBuilder.traverseReverseConnection(search.getConnectionName(), search.getEntityType()).loadEntities();
+ }
+ else {
+ results =
+ pipelineBuilder.traverseConnection(search.getConnectionName(), search.getEntityType()).loadEntities();
+ }
}
else {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
index 150a1b0..d68c085 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
@@ -19,36 +19,25 @@
package org.apache.usergrid.persistence;
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.codec.binary.Base64;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
import org.apache.commons.lang.StringUtils;
-
import org.apache.usergrid.persistence.index.SelectFieldMapping;
import org.apache.usergrid.persistence.index.exceptions.QueryParseException;
import org.apache.usergrid.persistence.index.query.CounterResolution;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.tree.Operand;
import org.apache.usergrid.persistence.index.utils.ClassUtils;
-import org.apache.usergrid.persistence.index.utils.ConversionUtils;
import org.apache.usergrid.persistence.index.utils.ListUtils;
import org.apache.usergrid.persistence.index.utils.MapUtils;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.*;
+import java.util.Map.Entry;
public class Query {
@@ -82,6 +71,7 @@ public class Query {
private Long startTime;
private Long finishTime;
private boolean pad;
+ private boolean connecting = false;
private CounterResolution resolution = CounterResolution.ALL;
private List<Identifier> identifiers;
private List<CounterFilterPredicate> counterFilters;
@@ -611,6 +601,15 @@ public class Query {
this.pad = pad;
}
+ //set the flag to retrieve the edges in the reverse direction.
+ public void setConnecting( boolean connecting ) {
+ this.connecting = connecting;
+ }
+
+ public boolean isConnecting() {
+ return connecting;
+ }
+
public void setResolution( CounterResolution resolution ) {
this.resolution = resolution;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
index be2f06e..3d4e53c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
@@ -17,24 +17,17 @@
package org.apache.usergrid.persistence;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.persistence.Query.Level;
+import org.apache.usergrid.persistence.entities.User;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.persistence.entities.User;
-import org.apache.usergrid.persistence.Query.Level;
+import java.util.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class EntityConnectionsIT extends AbstractCoreIT {
private static final Logger logger = LoggerFactory.getLogger( EntityConnectionsIT.class );
@@ -335,6 +328,54 @@ public class EntityConnectionsIT extends AbstractCoreIT {
assertEquals( "user", res.getEntity().getType() );
}
+ // Not required; tests were added at the service layer.
+ @Ignore("not required; tests were added at the service layer")
+ @Test
+ public void testGetConnectingEntitiesCursor() throws Exception {
+
+ UUID applicationId = app.getId( );
+ assertNotNull( applicationId );
+
+ EntityManager em = app.getEntityManager();
+ assertNotNull( em );
+
+ User fred = new User();
+ fred.setUsername( "fred" );
+ fred.setEmail( "fred@flintstones.com" );
+ Entity fredEntity = em.create( fred );
+ assertNotNull( fredEntity );
+
+ User wilma = new User();
+ wilma.setUsername( "wilma" );
+ wilma.setEmail( "wilma@flintstones.com" );
+ Entity wilmaEntity = em.create( wilma );
+ assertNotNull( wilmaEntity );
+
+ User john = new User();
+ john.setUsername( "John" );
+ john.setEmail( "John@flintstones.com" );
+ Entity johnEntity = em.create( john );
+ assertNotNull( johnEntity );
+
+ em.createConnection( fredEntity, "likes", wilmaEntity );
+ em.createConnection( fredEntity, "likes", johnEntity );
+
+
+ app.refreshIndex();
+
+ // fred now has two "likes" connections (to wilma and John); search for them
+
+ Query query = Query.fromQLNullSafe("" );
+ query.setConnectionType( "likes" );
+// query.setConnecting(true);
+ query.setEntityType( "user" );
+
+ // goes through "traverseReverseConnection"
+ Results r = em.searchTargetEntities(fredEntity, query);
+
+ assertEquals( 2, r.size() );
+ }
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
index 83549dd..0a9f6a7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
@@ -17,21 +17,8 @@
package org.apache.usergrid.services;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.ConnectionRef;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.Query.Level;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.Schema;
-import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.services.ServiceParameter.IdParameter;
import org.apache.usergrid.services.ServiceParameter.NameParameter;
@@ -39,10 +26,15 @@ import org.apache.usergrid.services.ServiceParameter.QueryParameter;
import org.apache.usergrid.services.ServiceResults.Type;
import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
import static org.apache.usergrid.services.ServiceParameter.filter;
import static org.apache.usergrid.services.ServiceParameter.firstParameterIsName;
import static org.apache.usergrid.utils.ClassUtils.cast;
@@ -307,6 +299,7 @@ public class AbstractConnectionsService extends AbstractService {
Results r = null;
if ( connecting() ) {
+ query.setConnecting(true);
if ( query.hasQueryPredicates() ) {
if (logger.isTraceEnabled()) {
logger.trace("Attempted query of backwards connections");
@@ -314,13 +307,7 @@ public class AbstractConnectionsService extends AbstractService {
return null;
}
else {
-// r = em.getSourceEntities( context.getOwner().getUuid(), query.getConnectionType(),
-// query.getEntityType(), level );
- // usergrid-2389: User defined limit in the query is ignored. Fixed it by adding
- // the limit to the method parameter downstream.
- r = em.getSourceEntities(
- new SimpleEntityRef(context.getOwner().getType(), context.getOwner().getUuid()),
- query.getConnectionType(), query.getEntityType(), level, query.getLimit());
+ r = em.searchTargetEntities(context.getOwner(),query);
}
}
else {
@@ -381,6 +368,10 @@ public class AbstractConnectionsService extends AbstractService {
}
else {
entity = em.create( query.getEntityType(), context.getProperties() );
+ // A null entity here would lead to an NPE; throw a not-found (404) error instead.
+ if ( entity == null ) {
+ throw new ServiceResourceNotFoundException( context );
+ }
}
entity = importEntity( context, entity );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java
index a1f19d4..4e65f54 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java
@@ -17,19 +17,17 @@
package org.apache.usergrid.services;
-import java.util.Map;
-
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.Query;
import org.junit.Assert;
import org.junit.Test;
-
-import org.apache.usergrid.persistence.Entity;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
public class ConnectionsServiceIT extends AbstractServiceIT {
@@ -86,6 +84,66 @@ public class ConnectionsServiceIT extends AbstractServiceIT {
app.testRequest( ServiceAction.POST, 1, "users", "conn-user1", "manages", "user" );
}
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testUserConnectionsCursor() throws Exception {
+ app.put("username", "conn-user1");
+ app.put("email", "conn-user1@apigee.com");
+
+ Entity user1 = app.testRequest(ServiceAction.POST, 1, "users").getEntity();
+ assertNotNull(user1);
+
+ app.testRequest(ServiceAction.GET, 1, "users", "conn-user1");
+
+ app.put("username", "conn-user2");
+ app.put("email", "conn-user2@apigee.com");
+
+ Entity user2 = app.testRequest(ServiceAction.POST, 1, "users").getEntity();
+ assertNotNull(user2);
+
+
+ app.put("username", "conn-user3");
+ app.put("email", "conn-user3@apigee.com");
+
+ Entity user3 = app.testRequest(ServiceAction.POST, 1, "users").getEntity();
+ assertNotNull(user3);
+
+
+ //POST users/conn-user2/likes/users/conn-user1
+ app.testRequest(ServiceAction.POST, 1, "users", "conn-user2", "likes", "users", "conn-user1");
+ //POST users/conn-user3/likes/users/conn-user1
+ app.testRequest(ServiceAction.POST, 1, "users", "conn-user3", "likes", "users", "conn-user1");
+
+ Query query = Query.fromQLNullSafe("");
+ query.setLimit(1);
+
+ //first page: one entity is expected, and the result should carry a valid cursor.
+ ServiceResults result = app.testRequest(ServiceAction.GET, 1, "users", "conn-user1", "connecting", "likes",query);
+ assertNotNull(result.getCursor());
+ String firstEmail = result.getEntity().getProperty("email").toString();
+
+ //second page: reuse the same query (limit 1), advanced with the cursor of the previous result.
+ query.setCursor(result.getCursor());
+ result = app.testRequest(ServiceAction.GET,1,"users","conn-user1","connecting","likes",query);
+ String secondEmail = result.getEntity().getProperty("email").toString();
+
+ //ensure the two entities returned in above requests are different.
+ assertNotEquals(firstEmail,secondEmail);
+
+ //third page: advance the same query once more; no entities are expected.
+ query.setCursor(result.getCursor());
+ result = app.testRequest(ServiceAction.GET,0,"users","conn-user1","connecting","likes",query);
+ //return empty cursor when no more entities found.
+ assertNull(result.getCursor());
+
+ //DELETE users/conn-user2/likes/users/conn-user1 (qualified by collection type on second entity)
+ app.testRequest(ServiceAction.DELETE, 1, "users", "conn-user2", "likes", "users", "conn-user1");
+
+ //one connecting entity is expected after the delete (the conn-user3 connection is untouched).
+ app.testRequest(ServiceAction.GET,1,"users","conn-user1","connecting","likes");
+
+
+ }
+
@Test
public void testNonExistentEntity() throws Exception {