You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/26 23:17:06 UTC
[02/11] incubator-usergrid git commit: Refactors operations into
easier build pattern. Pipeline still need some work.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
new file mode 100644
index 0000000..d0b6af9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from an incoming CandidateResult emissions into entities, then streams them on
+ * performs internal buffering for efficiency. Note that all entities may not be emitted if our load crosses page boundaries. It is up to the
+ * collector to determine when to stop streaming entities.
+ */
+public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate>, FilterResult<Entity>> {
+
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final EntityIndexFactory entityIndexFactory;
+
+
+ @Inject
+ public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final EntityIndexFactory entityIndexFactory ) {
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.entityIndexFactory = entityIndexFactory;
+ }
+
+
+ @Override
+ public Observable<FilterResult<Entity>> call(
+ final Observable<FilterResult<Candidate>> candidateResultsObservable ) {
+
+
+ /**
+ * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
+ * objects
+ */
+
+ final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+ final EntityCollectionManager entityCollectionManager =
+ entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+ final ApplicationEntityIndex applicationIndex =
+ entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+ //buffer them to get a page size we can make 1 network hop
+ final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() )
+
+ //load them
+ .flatMap( candidateResults -> {
+ //flatten toa list of ids to load
+ final Observable<List<Id>> candidateIds =
+ Observable.from( candidateResults ).map( filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList();
+
+ //load the ids
+ final Observable<EntitySet> entitySetObservable =
+ candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
+
+ //now we have a collection, validate our canidate set is correct.
+
+ return entitySetObservable.map(
+ entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet,
+ candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() )
+ .flatMap(
+ entityCollector -> Observable.from( entityCollector.getResults() ) );
+ } );
+
+ //if we filter all our results, we want to continue to try the next page
+ return searchIdSetObservable;
+ }
+
+
+
+
+ /**
+ * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
+ */
+ private static final class EntityVerifier {
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+ private List<FilterResult<Entity>> results = new ArrayList<>();
+
+ private final EntityIndexBatch batch;
+ private final List<FilterResult<Candidate>> candidateResults;
+ private final EntitySet entitySet;
+
+
+ public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
+ final List<FilterResult<Candidate>> candidateResults ) {
+ this.batch = batch;
+ this.entitySet = entitySet;
+ this.candidateResults = candidateResults;
+ this.results = new ArrayList<>( entitySet.size() );
+ }
+
+
+ /**
+ * Merge our candidates and our entity set into results
+ */
+ public void merge() {
+
+ for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+ validate( candidateResult );
+ }
+
+ batch.execute();
+ }
+
+
+ public List<FilterResult<Entity>> getResults() {
+ return results;
+ }
+
+
+ public EntityIndexBatch getBatch() {
+ return batch;
+ }
+
+
+ private void validate( final FilterResult<Candidate> filterResult ) {
+
+ final Candidate candidate = filterResult.getValue();
+ final CandidateResult candidateResult = candidate.getCandidateResult();
+ final SearchEdge searchEdge = candidate.getSearchEdge();
+ final Id candidateId = candidateResult.getId();
+ final UUID candidateVersion = candidateResult.getVersion();
+
+
+ final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+ //doesn't exist warn and drop
+ if ( entity == null ) {
+ logger.warn(
+ "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
+ + " Ignoring since this could be a region sync issue",
+ candidateId, candidateVersion );
+
+
+ //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+ return;
+
+ }
+
+
+ final UUID entityVersion = entity.getVersion();
+ final Id entityId = entity.getId();
+
+
+
+
+
+ //entity is newer than ES version, could be an update or the entity is marked as deleted
+ if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
+
+ logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+ new Object[] { searchEdge, entityId, entityVersion } );
+ batch.deindex( searchEdge, entityId, candidateVersion );
+ return;
+ }
+
+ //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+ //remove the ES record, since the read in cass should cause a read repair, just ignore
+ if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+ logger.warn(
+ "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
+ + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+
+ //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+ return;
+ }
+
+ //they're the same add it
+
+ final Entity returnEntity = entity.getEntity().get();
+
+ final Optional<EdgePath> parent = filterResult.getPath();
+
+ final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+ results.add( toReturn );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
new file mode 100644
index 0000000..1ef358a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Responsible for verifying candidate result versions, then emitting the Ids of these versions Input is a batch of
+ * candidate results, output is a stream of validated Ids
+ */
+public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, FilterResult<Id>> {
+
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final EntityIndexFactory entityIndexFactory;
+
+
+ @Inject
+ public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final EntityIndexFactory entityIndexFactory ) {
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.entityIndexFactory = entityIndexFactory;
+ }
+
+
+ @Override
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
+
+
+ /**
+ * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
+ * objects
+ */
+
+ final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+ final EntityCollectionManager entityCollectionManager =
+ entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+ final ApplicationEntityIndex applicationIndex =
+ entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+ final Observable<FilterResult<Id>> searchIdSetObservable =
+ filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( candidateResults -> {
+ //flatten toa list of ids to load
+ final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map(
+ candidate -> candidate.getValue().getCandidateResult().getId() ).toList();
+
+ //load the ids
+ final Observable<VersionSet> versionSetObservable =
+ candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
+
+ //now we have a collection, validate our canidate set is correct.
+
+ return versionSetObservable.map(
+ entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet,
+ candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+ entityCollector -> Observable.from( entityCollector.collectResults() ) );
+ } );
+
+ return searchIdSetObservable;
+ }
+
+
+ /**
+ * Map a new cp entity to an old entity. May be null if not present
+ */
+ private static final class EntityCollector {
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
+ private List<FilterResult<Id>> results = new ArrayList<>();
+
+ private final EntityIndexBatch batch;
+ private final List<FilterResult<Candidate>> candidateResults;
+ private final VersionSet versionSet;
+
+
+ public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
+ final List<FilterResult<Candidate>> candidateResults ) {
+ this.batch = batch;
+ this.versionSet = versionSet;
+ this.candidateResults = candidateResults;
+ this.results = new ArrayList<>( versionSet.size() );
+ }
+
+
+ /**
+ * Merge our candidates and our entity set into results
+ */
+ public void merge() {
+
+ for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+ validate( candidateResult );
+ }
+
+ batch.execute();
+ }
+
+
+ public List<FilterResult<Id>> collectResults() {
+ return results;
+ }
+
+
+ /**
+ * Validate each candidate results vs the data loaded from cass
+ */
+ private void validate( final FilterResult<Candidate> filterCandidate ) {
+
+ final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult();
+
+ final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge();
+
+ final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
+
+ final UUID candidateVersion = candidateResult.getVersion();
+
+ final UUID entityVersion = logEntry.getVersion();
+
+ final Id entityId = logEntry.getEntityId();
+
+ //entity is newer than ES version
+ if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
+
+ logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+ new Object[] { searchEdge, entityId, entityVersion } );
+ batch.deindex( searchEdge, entityId, entityVersion );
+ return;
+ }
+
+ //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+ //remove the ES record, since the read in cass should cause a read repair, just ignore
+ if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+ logger.warn(
+ "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
+ + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+ }
+
+ //they're the same add it
+
+ final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() );
+
+ results.add( result );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java
new file mode 100644
index 0000000..7cf5a78
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+
+
+/**
+ * ElasticSearch cursor serializer
+ */
+public class ElasticsearchCursorSerializer extends AbstractCursorSerializer<Integer> {
+
+
+ public static final ElasticsearchCursorSerializer INSTANCE = new ElasticsearchCursorSerializer();
+
+ @Override
+ protected Class<Integer> getType() {
+ return Integer.class;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg
new file mode 100644
index 0000000..08970e3
Binary files /dev/null and b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg differ
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java
new file mode 100644
index 0000000..a6edd56
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge;
+
+
+public class SearchCollectionFilter extends AbstractElasticSearchFilter {
+
+ private final String collectionName;
+ private final String entityType;
+
+ /**
+ * Create a new instance of our command
+ *
+ * @param entityIndexFactory The entity index factory used to search
+ * @param metricsFactory The metrics factory for metrics
+ * @param collectionName The name of the collection
+ * @param entityType The entity type
+ */
+ @Inject
+ public SearchCollectionFilter( final EntityIndexFactory entityIndexFactory, final MetricsFactory metricsFactory,
+ @Assisted( "query" ) final String query,
+ @Assisted( "collectionName" ) final String collectionName,
+ @Assisted( "entityType" ) final String entityType ) {
+ super( entityIndexFactory, metricsFactory, query );
+ this.collectionName = collectionName;
+ this.entityType = entityType;
+ }
+
+
+
+ @Override
+ protected SearchTypes getSearchTypes() {
+ final SearchTypes types = SearchTypes.fromTypes( entityType );
+
+ return types;
+ }
+
+
+ @Override
+ protected SearchEdge getSearchEdge( final Id incomingId ) {
+ final SearchEdge searchEdge = createCollectionSearchEdge( incomingId, collectionName );
+
+ return searchEdge;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java
new file mode 100644
index 0000000..82d7377
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge;
+
+
+public class SearchConnectionFilter extends AbstractElasticSearchFilter {
+
+
+ private final String connectionName;
+ private final Optional<String> connectedEntityType;
+
+
+ /**
+ * Create a new instance of our command
+ */
+ @Inject
+ public SearchConnectionFilter( final EntityIndexFactory entityIndexFactory, final MetricsFactory metricsFactory,
+ @Assisted( "query" ) final String query,
+ @Assisted( "connectionName" ) final String connectionName,
+ @Assisted( "connectedEntityType" ) final Optional<String> connectedEntityType ) {
+ super( entityIndexFactory, metricsFactory, query );
+
+ this.connectionName = connectionName;
+ this.connectedEntityType = connectedEntityType;
+ }
+
+
+ @Override
+ protected SearchTypes getSearchTypes() {
+ final SearchTypes searchTypes = SearchTypes.fromNullableTypes( connectedEntityType.orNull() );
+
+ return searchTypes;
+ }
+
+
+ @Override
+ protected SearchEdge getSearchEdge( final Id id ) {
+ final SearchEdge searchEdge = createConnectionSearchEdge( id, connectionName );
+
+ return searchEdge;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java
new file mode 100644
index 0000000..5b3a42e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * Filter should take and Id and a graph edge, and ensure the connection between the two exists
+ */
+public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Id>> implements
+ PipelineOperation<FilterResult<Id>, FilterResult<Id>> {
+
+ private final GraphManagerFactory graphManagerFactory;
+ private final Id targetId;
+
+
+ @Inject
+ public AbstractReadGraphEdgeByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final Id
+ targetId ) {
+ this.graphManagerFactory = graphManagerFactory;
+ this.targetId = targetId;
+ }
+
+
+ @Override
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
+
+ final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
+
+ return filterValueObservable.flatMap( filterValue -> {
+ final String edgeTypeName = getEdgeName();
+ final Id id = filterValue.getValue();
+
+ //create our search
+ final SearchByEdge searchByEdge =
+ new SimpleSearchByEdge( id, edgeTypeName, targetId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ Optional.absent() );
+
+ //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node
+ return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath()));
+ } );
+ }
+
+
+ /**
+ * Get the name of the edge to be used in the seek
+ */
+ protected abstract String getEdgeName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
new file mode 100644
index 0000000..f477092
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Command for reading graph edges
+ */
+public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> {
+
+ private final GraphManagerFactory graphManagerFactory;
+
+
+ /**
+ * Create a new instance of our command
+ */
+ public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) {
+ this.graphManagerFactory = graphManagerFactory;
+ }
+
+
+ @Override
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
+
+
+ //get the graph manager
+ final GraphManager graphManager =
+ graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
+
+
+ final String edgeName = getEdgeTypeName();
+ final EdgeState edgeCursorState = new EdgeState();
+
+
+ //return all ids that are emitted from this edge
+ return previousIds.flatMap( previousFilterValue -> {
+
+ //set our our constant state
+ final Optional<Edge> startFromCursor = getSeekValue();
+ final Id id = previousFilterValue.getValue();
+
+
+ final SimpleSearchByEdgeType search =
+ new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ startFromCursor );
+
+ /**
+ * TODO, pass a message with pointers to our cursor values to be generated later
+ */
+ return graphManager.loadEdgesFromSource( search )
+ //set the edge state for cursors
+ .doOnNext( edge -> edgeCursorState.update( edge ) )
+
+ //map our id from the target edge and set our cursor every edge we traverse
+ .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(),
+ previousFilterValue.getPath() ) );
+ } );
+ }
+
+
+ @Override
+ protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
+ final Optional<EdgePath> parent ) {
+
+ //if it's our first pass, there's no cursor to generate
+ if(cursorValue == null){
+ return new FilterResult<>( emit, parent );
+ }
+
+ return super.createFilterResult( emit, cursorValue, parent );
+ }
+
+
+ @Override
+ protected CursorSerializer<Edge> getCursorSerializer() {
+ return EdgeCursorSerializer.INSTANCE;
+ }
+
+
+ /**
+ * Get the edge type name we should use when traversing
+ */
+ protected abstract String getEdgeTypeName();
+
+
+ /**
+ * Wrapper class. Because edges seek > the last returned, we need to keep our n-1 value. This will be our cursor We
+ * always try to seek to the same position as we ended. Since we don't deal with a persistent read result, if we
+ * seek to a value = to our last, we may skip data.
+ */
+ private final class EdgeState {
+
+ private Edge cursorEdge = null;
+ private Edge currentEdge = null;
+
+
+ /**
+ * Update the pointers
+ */
+ private void update( final Edge newEdge ) {
+ cursorEdge = currentEdge;
+ currentEdge = newEdge;
+ }
+
+
+ /**
+ * Get the edge to use in cursors for resume
+ */
+ private Edge getCursorEdge() {
+ return cursorEdge;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
new file mode 100644
index 0000000..8d9bf6f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+
+
+/**
+ * Edge cursor serializer
+ */
+public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> {
+
+
+ public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer();
+
+ @Override
+ protected Class<SimpleEdge> getType() {
+ return SimpleEdge.class;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java
new file mode 100644
index 0000000..003038a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should
+ * be removed
+ */
+public class EntityIdFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Id>>{
+
+ private final Id entityId;
+
+
+ @Inject
+ public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
+
+
+
+ @Override
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
+ //ignore what our input was, and simply emit the id specified
+ return filterValueObservable.map( idFilterResult -> new FilterResult( entityId, idFilterResult.getPath() ));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
new file mode 100644
index 0000000..41507e9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from a set of Ids. and verify they are valid
+ *
+ * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
+ */
+public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Entity>>{
+
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
+ @Inject
+ public EntityLoadVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ }
+
+
+ @Override
+ public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+
+
+ final EntityCollectionManager entityCollectionManager =
+ entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
+
+ //it's more efficient to make 1 network hop to get everything, then drop our results if required
+ final Observable<FilterResult<Entity>> entityObservable =
+ filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
+
+ final Observable<EntitySet> entitySetObservable =
+ Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
+ .flatMap( ids -> entityCollectionManager.load( ids ) );
+
+
+ //now we have a collection, validate our canidate set is correct.
+
+ return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
+ .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+ entityCollector -> Observable.from( entityCollector.getResults() ) );
+ } );
+
+ return entityObservable;
+ }
+
+
+ /**
+ * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this
+ * state is mutable and difficult to represent functionally
+ */
+ private static final class EntityVerifier {
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+ private List<FilterResult<Entity>> results = new ArrayList<>();
+
+ private final List<FilterResult<Id>> candidateResults;
+ private final EntitySet entitySet;
+
+
+ public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
+ this.entitySet = entitySet;
+ this.candidateResults = candidateResults;
+ this.results = new ArrayList<>( entitySet.size() );
+ }
+
+
+ /**
+ * Merge our candidates and our entity set into results
+ */
+ public void merge() {
+
+ for ( final FilterResult<Id> candidateResult : candidateResults ) {
+ validate( candidateResult );
+ }
+ }
+
+
+ public List<FilterResult<Entity>> getResults() {
+ return results;
+ }
+
+
+ private void validate( final FilterResult<Id> filterResult ) {
+
+ final Id candidateId = filterResult.getValue();
+
+
+ final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+ //doesn't exist warn and drop
+ if ( entity == null || !entity.getEntity().isPresent() ) {
+ logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
+ + " Ignoring since this could be a region sync issue", candidateId );
+
+
+ //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+ return;
+ }
+
+ //it exists, add it
+
+ final Entity returnEntity = entity.getEntity().get();
+
+ final Optional<EdgePath> parent = filterResult.getPath();
+
+ final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+ results.add( toReturn );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg
new file mode 100644
index 0000000..c0308bd
Binary files /dev/null and b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg differ
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java
new file mode 100644
index 0000000..71d606c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+
+/**
+ * Read an edge in the graph to verify it's existence by id
+ */
+public class ReadGraphCollectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{
+
+ private final String collectionName;
+
+ @Inject
+ public ReadGraphCollectionByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName, @Assisted final Id targetId ) {
+ super( graphManagerFactory, targetId );
+ this.collectionName = collectionName;
+ }
+
+
+ @Override
+ protected String getEdgeName() {
+ return CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
new file mode 100644
index 0000000..dc39f5c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromCollectionName;
+
+
+/**
+ * Command for reading graph edges on a collection
+ */
+public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
+
+ private final String collectionName;
+
+
+ /**
+ * Create a new instance of our command
+ */
+ @Inject
+ public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) {
+ super( graphManagerFactory );
+ this.collectionName = collectionName;
+ }
+
+
+ @Override
+ protected String getEdgeTypeName() {
+ return getEdgeTypeFromCollectionName( collectionName );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java
new file mode 100644
index 0000000..4f71202
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+
+/**
+ * Read an edge in the graph to verify it's existence by id
+ */
+public class ReadGraphConnectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{
+
+ private final String connectionName;
+
+ @Inject
+ public ReadGraphConnectionByIdFilter( final GraphManagerFactory graphManagerFactory,
+ @Assisted final String connectionName, @Assisted final Id targetId ) {
+ super( graphManagerFactory, targetId );
+ this.connectionName = connectionName;
+ }
+
+
+ @Override
+ protected String getEdgeName() {
+ return CpNamingUtils.getEdgeTypeFromConnectionType( connectionName );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
new file mode 100644
index 0000000..61ba4ad
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
+
+
+/**
+ * Command for reading graph edges on a connection
+ */
+public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge>{
+
+ private final GraphManagerFactory graphManagerFactory;
+ private final String connectionName;
+ private final String entityType;
+
+
+ /**
+ * Create a new instance of our command
+ */
+ @Inject
+ public ReadGraphConnectionByTypeFilter( final GraphManagerFactory graphManagerFactory,
+ @Assisted("connectionName") final String connectionName, @Assisted("entityType") final String entityType ) {
+ this.graphManagerFactory = graphManagerFactory;
+ this.connectionName = connectionName;
+ this.entityType = entityType;
+ }
+
+
+
+ @Override
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+
+ //get the graph manager
+ final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
+
+
+
+ final String edgeName = getEdgeTypeFromConnectionType( connectionName );
+
+
+ //return all ids that are emitted from this edge
+ return filterResultObservable.flatMap( idFilterResult -> {
+
+ //set our our constant state
+ final Optional<Edge> startFromCursor = getSeekValue();
+ final Id id = idFilterResult.getValue();
+
+ final SimpleSearchByIdType search =
+ new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ entityType, startFromCursor );
+
+ return graphManager.loadEdgesFromSourceByType( search ).map(
+ edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
+ } );
+ }
+
+
+ @Override
+ protected CursorSerializer<Edge> getCursorSerializer() {
+ return EdgeCursorSerializer.INSTANCE;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
new file mode 100644
index 0000000..11ec5f8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
+
+
+/**
+ * Command for reading graph edges on a connection
+ */
+public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
+
+ private final String connectionName;
+
+
+ /**
+ * Create a new instance of our command
+ */
+ @Inject
+ public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) {
+ super( graphManagerFactory );
+ this.connectionName = connectionName;
+ }
+
+
+ @Override
+ protected String getEdgeTypeName() {
+ return getEdgeTypeFromConnectionType( connectionName );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index 0260d1d..c779bb7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -49,7 +49,7 @@ public class ObservableQueryExecutor implements QueryExecutor {
public Iterator<Results> iterator;
- public ObservableQueryExecutor( final Observable<ResultsPage> resultsObservable) {
+ public ObservableQueryExecutor( final Observable<ResultsPage<Entity>> resultsObservable) {
//map to our old results objects, return a default empty if required
this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
index fd65ebf..7128dcf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
@@ -25,8 +25,8 @@ package org.apache.usergrid.corepersistence.pipeline.cursor;
import org.junit.Test;
import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticsearchCursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.EdgeCursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.search.ElasticsearchCursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.EdgeCursorSerializer;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;