You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/04 19:27:52 UTC
[01/11] incubator-usergrid git commit: Updated mapping to fix missing
doc_values and disable norms since we use external sorting
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev d18b8eea3 -> 9b939d15b
Updated mapping to fix missing doc_values and disable norms since we use external sorting
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6d4847a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6d4847a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6d4847a8
Branch: refs/heads/two-dot-o-dev
Commit: 6d4847a817aefa65075970a54af85faf159f9176
Parents: fd6305c
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 30 10:16:10 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 30 10:16:10 2015 -0600
----------------------------------------------------------------------
.../org/apache/usergrid/persistence/index/usergrid-mappings.json | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d4847a8/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
index f12b66b..bee84a2 100644
--- a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
+++ b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
@@ -73,6 +73,10 @@
},
"string": {
"type": "string",
+ "doc_values": true,
+ "norms": {
+ "enabled": false
+ },
"fields": {
"exact": {
"type": "string",
[07/11] incubator-usergrid git commit: Fixes graph cursor resume
state.
Posted by to...@apache.org.
Fixes graph cursor resume state.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/118711ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/118711ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/118711ae
Branch: refs/heads/two-dot-o-dev
Commit: 118711aee83875f72a0b058b784218b211748d4e
Parents: 413f023
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 09:59:00 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 09:59:00 2015 -0600
----------------------------------------------------------------------
.../read/graph/AbstractReadGraphFilter.java | 52 ++++++++++++++++++--
1 file changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/118711ae/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index 503fcf9..303bc5b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
import org.apache.usergrid.corepersistence.pipeline.read.Filter;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.graph.Edge;
@@ -62,6 +63,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
final String edgeName = getEdgeTypeName();
+ final EdgeState edgeCursorState = new EdgeState();
//return all ids that are emitted from this edge
@@ -80,15 +82,30 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
* TODO, pass a message with pointers to our cursor values to be generated later
*/
return graphManager.loadEdgesFromSource( search )
- //set our cursor every edge we traverse
+ //set the edge state for cursors
+ .doOnNext( edge -> edgeCursorState.update( edge ) )
- //map our id from the target edge
- .map( edge -> createFilterResult( edge.getTargetNode(), edge, previousFilterValue.getPath() ) );
+ //map our id from the target edge and set our cursor every edge we traverse
+ .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(),
+ previousFilterValue.getPath() ) );
} );
}
@Override
+ protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
+ final Optional<EdgePath> parent ) {
+
+ //if it's our first pass, there's no cursor to generate
+ if(cursorValue == null){
+ return new FilterResult<>( emit, parent );
+ }
+
+ return super.createFilterResult( emit, cursorValue, parent );
+ }
+
+
+ @Override
protected CursorSerializer<Edge> getCursorSerializer() {
return EdgeCursorSerializer.INSTANCE;
}
@@ -98,4 +115,33 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
* Get the edge type name we should use when traversing
*/
protected abstract String getEdgeTypeName();
+
+
+ /**
+ * Wrapper class. Because edges seek > the last returned, we need to keep our n-1 value. This will be our cursor We
+ * always try to seek to the same position as we ended. Since we don't deal with a persistent read result, if we
+ * seek to a value = to our last, we may skip data.
+ */
+ private final class EdgeState {
+
+ private Edge cursorEdge = null;
+ private Edge currentEdge = null;
+
+
+ /**
+ * Update the pointers
+ */
+ private void update( final Edge newEdge ) {
+ cursorEdge = currentEdge;
+ currentEdge = newEdge;
+ }
+
+
+ /**
+ * Get the edge to use in cursors for resume
+ */
+ private Edge getCursorEdge() {
+ return cursorEdge;
+ }
+ }
}
[11/11] incubator-usergrid git commit: Merge branch 'USERGRID-587'
into two-dot-o-dev
Posted by to...@apache.org.
Merge branch 'USERGRID-587' into two-dot-o-dev
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9b939d15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9b939d15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9b939d15
Branch: refs/heads/two-dot-o-dev
Commit: 9b939d15ba398c1aa759f17d647bd85ee232ad9e
Parents: 4107ef3 5bb7798
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 11:27:46 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 11:27:46 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 5 +-
.../corepersistence/pipeline/Pipeline.java | 12 +-
.../pipeline/PipelineContext.java | 16 +-
.../pipeline/PipelineOperation.java | 39 ++++
.../pipeline/PipelineResult.java | 57 -----
.../pipeline/cursor/ResponseCursor.java | 81 ++++---
.../pipeline/read/AbstractFilter.java | 45 ++++
.../pipeline/read/AbstractPathFilter.java | 109 +++++++++
.../read/AbstractPipelineOperation.java | 44 ----
.../pipeline/read/AbstractSeekingFilter.java | 102 --------
.../pipeline/read/CandidateResultsFilter.java | 31 ---
.../pipeline/read/Collector.java | 13 +-
.../pipeline/read/CollectorFactory.java | 12 +-
.../corepersistence/pipeline/read/EdgePath.java | 79 +++++++
.../corepersistence/pipeline/read/Filter.java | 9 +-
.../pipeline/read/FilterFactory.java | 40 +++-
.../pipeline/read/FilterResult.java | 56 +++++
.../pipeline/read/PipelineOperation.java | 38 ---
.../pipeline/read/ReadPipelineBuilder.java | 5 +-
.../pipeline/read/ReadPipelineBuilderImpl.java | 80 ++++---
.../pipeline/read/ResultsPage.java | 26 ++-
.../read/collect/AbstractCollector.java | 46 ++++
.../pipeline/read/collect/EntityFilter.java | 68 ++++++
.../read/collect/IdCursorSerializer.java | 41 ++++
.../read/collect/ResultsPageCollector.java | 80 +++++++
.../AbstractElasticSearchFilter.java | 45 ++--
.../pipeline/read/elasticsearch/Candidate.java | 55 +++++
.../elasticsearch/CandidateEntityFilter.java | 234 +++++++++++++++++++
.../read/elasticsearch/CandidateIdFilter.java | 201 ++++++++++++++++
.../CandidateResultsEntityResultsCollector.java | 217 -----------------
.../CandidateResultsIdVerifyFilter.java | 193 ---------------
.../impl/CollectionRefsVerifier.java | 44 ----
.../CollectionResultsLoaderFactoryImpl.java | 65 ------
.../impl/ConnectionRefsVerifier.java | 59 -----
.../ConnectionResultsLoaderFactoryImpl.java | 73 ------
.../impl/ElasticSearchQueryExecutor.java | 224 ------------------
.../read/elasticsearch/impl/EntityVerifier.java | 127 ----------
.../elasticsearch/impl/FilteringLoader.java | 219 -----------------
.../read/elasticsearch/impl/IdsVerifier.java | 46 ----
.../read/elasticsearch/impl/ResultsLoader.java | 43 ----
.../impl/ResultsLoaderFactory.java | 41 ----
.../elasticsearch/impl/ResultsVerifier.java | 52 -----
.../elasticsearch/impl/VersionVerifier.java | 85 -------
.../pipeline/read/entity/EntityIdFilter.java | 53 -----
.../read/entity/EntityLoadCollector.java | 94 --------
.../graph/AbstractReadGraphEdgeByIdFilter.java | 12 +-
.../read/graph/AbstractReadGraphFilter.java | 65 +++++-
.../pipeline/read/graph/EntityIdFilter.java | 54 +++++
.../pipeline/read/graph/EntityLoadFilter.java | 155 ++++++++++++
.../graph/ReadGraphConnectionByTypeFilter.java | 20 +-
.../results/ObservableQueryExecutor.java | 24 +-
.../apache/usergrid/persistence/Results.java | 2 +-
.../pipeline/cursor/CursorTest.java | 20 +-
.../persistence/index/CandidateResults.java | 11 +-
.../impl/EsApplicationEntityIndexImpl.java | 7 +-
.../persistence/index/usergrid-mappings.json | 1 -
56 files changed, 1575 insertions(+), 2100 deletions(-)
----------------------------------------------------------------------
[08/11] incubator-usergrid git commit: Merge branch 'two-dot-o-dev'
of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
USERGRID-587
Posted by to...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-587
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a673c81b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a673c81b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a673c81b
Branch: refs/heads/two-dot-o-dev
Commit: a673c81bf7c51ce48f2b8d052fcba990931e1977
Parents: 118711a d18b8ee
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 09:59:09 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 09:59:09 2015 -0600
----------------------------------------------------------------------
.../persistence/index/impl/EntityIndexTest.java | 121 ++++++++++---------
1 file changed, 61 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
[03/11] incubator-usergrid git commit: Massive refactor. Paths for
cursor generation are now part of our I/O results. This allows the collector
to take until satisfied, then generate a serializable path.
Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
new file mode 100644
index 0000000..56e1c1c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Responsible for verifying candidate result versions, then emitting the Ids of these versions
+ * Input is a batch of candidate results, output is a stream of validated Ids
+ */
+public class CandidateIdFilter extends AbstractFilter<Candidate, Id>
+ implements Filter<Candidate, Id> {
+
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final EntityIndexFactory entityIndexFactory;
+
+
+ @Inject
+ public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final EntityIndexFactory entityIndexFactory ) {
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.entityIndexFactory = entityIndexFactory;
+ }
+
+
+
+ @Override
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
+
+
+ /**
+ * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
+ * objects
+ */
+
+ final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+ final EntityCollectionManager entityCollectionManager =
+ entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+ final ApplicationEntityIndex applicationIndex =
+ entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+ final Observable<FilterResult<Id>> searchIdSetObservable = filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap(
+ candidateResults -> {
+ //flatten toa list of ids to load
+ final Observable<List<Id>> candidateIds =
+ Observable.from( candidateResults ).map( candidate -> candidate.getValue().getCandidateResult().getId() )
+ .toList();
+
+ //load the ids
+ final Observable<VersionSet> versionSetObservable =
+ candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
+
+ //now we have a collection, validate our canidate set is correct.
+
+ return versionSetObservable.map(
+ entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
+ .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+ entityCollector -> Observable.from( entityCollector.collectResults() ) );
+ } );
+
+ return searchIdSetObservable;
+ }
+
+
+
+
+
+ /**
+ * Map a new cp entity to an old entity. May be null if not present
+ */
+ private static final class EntityCollector {
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
+ private List<FilterResult<Id>> results = new ArrayList<>();
+
+ private final EntityIndexBatch batch;
+ private final List<FilterResult<Candidate>> candidateResults;
+ private final VersionSet versionSet;
+
+
+ public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
+ final List<FilterResult<Candidate>> candidateResults ) {
+ this.batch = batch;
+ this.versionSet = versionSet;
+ this.candidateResults = candidateResults;
+ this.results = new ArrayList<>( versionSet.size() );
+ }
+
+
+ /**
+ * Merge our candidates and our entity set into results
+ */
+ public void merge() {
+
+ for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+ validate( candidateResult );
+ }
+
+ batch.execute();
+ }
+
+
+ public List<FilterResult<Id>> collectResults() {
+ return results;
+ }
+
+
+ /**
+ * Validate each candidate results vs the data loaded from cass
+ * @param filterCandidate
+ */
+ private void validate( final FilterResult<Candidate> filterCandidate ) {
+
+ final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult();
+
+ final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge();
+
+ final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
+
+ final UUID candidateVersion = candidateResult.getVersion();
+
+ final UUID entityVersion = logEntry.getVersion();
+
+ final Id entityId = logEntry.getEntityId();
+
+ //entity is newer than ES version
+ if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
+
+ logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+ new Object[] { searchEdge, entityId, entityVersion } );
+ batch.deindex( searchEdge, entityId, entityVersion );
+ return;
+ }
+
+ //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+ //remove the ES record, since the read in cass should cause a read repair, just ignore
+ if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+ logger.warn(
+ "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
+ + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+ }
+
+ //they're the same add it
+
+ final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() );
+
+ results.add( result );
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
deleted file mode 100644
index 465ff22..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from an incoming CandidateResults object and return them as results
- */
-public class CandidateResultsEntityResultsCollector extends AbstractPipelineOperation<CandidateResults, ResultsPage>
- implements Collector<CandidateResults, ResultsPage> {
-
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
- private final EntityIndexFactory entityIndexFactory;
-
-
- @Inject
- public CandidateResultsEntityResultsCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EntityIndexFactory entityIndexFactory ) {
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.entityIndexFactory = entityIndexFactory;
- }
-
-
- @Override
- public Observable<ResultsPage> call( final Observable<CandidateResults> candidateResultsObservable ) {
-
-
- /**
- * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
- * objects
- */
-
- final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
- final EntityCollectionManager entityCollectionManager =
- entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
- final ApplicationEntityIndex applicationIndex =
- entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
- final Observable<ResultsPage> searchIdSetObservable = candidateResultsObservable.flatMap( candidateResults -> {
- //flatten toa list of ids to load
- final Observable<List<Id>> candidateIds =
- Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList();
-
- //load the ids
- final Observable<EntitySet> entitySetObservable =
- candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
-
- //now we have a collection, validate our canidate set is correct.
-
- return entitySetObservable
- .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
- .doOnNext( entityCollector -> entityCollector.merge() )
- .map( entityCollector -> entityCollector.getResults() );
- } );
-
- //if we filter all our results, we want to continue to try the next page
- return searchIdSetObservable;
- }
-
-
- /**
- * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
- */
- private static final class EntityCollector {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
- private List<Entity> results = new ArrayList<>();
-
- private final EntityIndexBatch batch;
- private final CandidateResults candidateResults;
- private final EntitySet entitySet;
-
-
- public EntityCollector( final EntityIndexBatch batch, final EntitySet entitySet,
- final CandidateResults candidateResults ) {
- this.batch = batch;
- this.entitySet = entitySet;
- this.candidateResults = candidateResults;
- this.results = new ArrayList<>( entitySet.size() );
- }
-
-
- /**
- * Merge our candidates and our entity set into results
- */
- public void merge() {
-
- for ( final CandidateResult candidateResult : candidateResults ) {
- validate( candidateResult );
- }
-
- batch.execute();
- }
-
-
- public ResultsPage getResults() {
- return new ResultsPage( results );
- }
-
-
- public EntityIndexBatch getBatch() {
- return batch;
- }
-
-
- private void validate( final CandidateResult candidateResult ) {
-
- final Id candidateId = candidateResult.getId();
- final UUID candidateVersion = candidateResult.getVersion();
-
-
- final MvccEntity entity = entitySet.getEntity( candidateId );
-
-
- //doesn't exist warn and drop
- if ( entity == null ) {
- logger.warn(
- "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
- + " Ignoring since this could be a region sync issue",
- candidateId, candidateVersion );
-
-
- //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
- return;
-
- }
-
-
- final UUID entityVersion = entity.getVersion();
-
-
- //entity is newer than ES version, could be an update or the entity is marked as deleted
- if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0) {
-
- final Id entityId = entity.getId();
- final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
- logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
- new Object[] { searchEdge, entityId, entityVersion } );
- batch.deindex( searchEdge, entityId, entityVersion );
- return;
- }
-
- //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
- //remove the ES record, since the read in cass should cause a read repair, just ignore
- if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
- final Id entityId = entity.getId();
- final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
- logger.warn(
- "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
- + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
-
- //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
- return;
- }
-
- //they're the same add it
-
-
- results.add( entity.getEntity().get() );
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
deleted file mode 100644
index bb9ab76..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Responsible for verifying candidate result versions, then emitting the Ids of these versions
- * Input is a batch of candidate results, output is a stream of validated Ids
- */
-public class CandidateResultsIdVerifyFilter extends AbstractPipelineOperation<CandidateResults, Id>
- implements PipelineOperation<CandidateResults, Id> {
-
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
- private final EntityIndexFactory entityIndexFactory;
-
-
- @Inject
- public CandidateResultsIdVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EntityIndexFactory entityIndexFactory ) {
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.entityIndexFactory = entityIndexFactory;
- }
-
-
-
- @Override
- public Observable<Id> call( final Observable<CandidateResults> observable ) {
-
-
- /**
- * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
- * objects
- */
-
- final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
- final EntityCollectionManager entityCollectionManager =
- entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
- final ApplicationEntityIndex applicationIndex =
- entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
- final Observable<Id> searchIdSetObservable = observable.flatMap( candidateResults -> {
- //flatten toa list of ids to load
- final Observable<List<Id>> candidateIds =
- Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList();
-
- //load the ids
- final Observable<VersionSet> versionSetObservable =
- candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
-
- //now we have a collection, validate our canidate set is correct.
-
- return versionSetObservable
- .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
- .doOnNext( entityCollector -> entityCollector.merge() )
- .flatMap( entityCollector -> Observable.from( entityCollector.collectResults() ) );
- } );
-
- return searchIdSetObservable;
- }
-
-
- /**
- * Map a new cp entity to an old entity. May be null if not present
- */
- private static final class EntityCollector {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
- private List<Id> results = new ArrayList<>();
-
- private final EntityIndexBatch batch;
- private final CandidateResults candidateResults;
- private final VersionSet versionSet;
-
-
- public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
- final CandidateResults candidateResults ) {
- this.batch = batch;
- this.versionSet = versionSet;
- this.candidateResults = candidateResults;
- this.results = new ArrayList<>( versionSet.size() );
- }
-
-
- /**
- * Merge our candidates and our entity set into results
- */
- public void merge() {
-
- for ( final CandidateResult candidateResult : candidateResults ) {
- validate( candidateResult );
- }
-
- batch.execute();
- }
-
-
- public List<Id> collectResults() {
- return results;
- }
-
-
- /**
- * Validate each candidate results vs the data loaded from cass
- * @param candidateResult
- */
- private void validate( final CandidateResult candidateResult ) {
-
- final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
-
- final UUID candidateVersion = candidateResult.getVersion();
-
- final UUID entityVersion = logEntry.getVersion();
-
- final Id entityId = logEntry.getEntityId();
-
- //entity is newer than ES version
- if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
-
- final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
- logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
- new Object[] { searchEdge, entityId, entityVersion } );
- batch.deindex( searchEdge, entityId, entityVersion );
- return;
- }
-
- //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
- //remove the ES record, since the read in cass should cause a read repair, just ignore
- if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
- final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
- logger.warn(
- "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
- + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
- }
-
- //they're the same add it
-
- results.add( entityId );
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
deleted file mode 100644
index cc96633..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class CollectionRefsVerifier extends VersionVerifier {
-
-
-
- @Override
- public Results getResults( final Collection<Id> ids ) {
- List<EntityRef> refs = new ArrayList<EntityRef>(ids.size());
- for ( Id id : ids ) {
- refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
- }
- return Results.fromRefList( refs );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
deleted file mode 100644
index 94c91d9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public class CollectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
- private final EntityCollectionManager entityCollectionManager;
- private final ApplicationEntityIndex applicationEntityIndex;
-
-
- public CollectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager,
- final ApplicationEntityIndex applicationEntityIndex ) {
- this.entityCollectionManager = entityCollectionManager;
- this.applicationEntityIndex = applicationEntityIndex;
- }
-
-
- @Override
- public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope,
- final Query.Level resultsLevel ) {
-
- ResultsVerifier verifier;
-
- if ( resultsLevel == Query.Level.REFS ) {
- verifier = new CollectionRefsVerifier();
- }
- else if ( resultsLevel == Query.Level.IDS ) {
- verifier = new IdsVerifier();
- }
- else {
- verifier = new EntityVerifier( Query.MAX_LIMIT );
- }
-
- return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope,
- scope );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
deleted file mode 100644
index 6b7bdde..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.ConnectionRef;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
-
-
-/**
- * Verifier for creating connections
- */
-public class ConnectionRefsVerifier extends VersionVerifier {
-
-
- private final EntityRef ownerId;
- private final String connectionType;
-
-
- public ConnectionRefsVerifier( final EntityRef ownerId, final String connectionType ) {
- this.ownerId = ownerId;
- this.connectionType = connectionType;
- }
-
- @Override
- public Results getResults( final Collection<Id> ids ) {
- List<ConnectionRef> refs = new ArrayList<>();
- for ( Id id : ids ) {
- refs.add( new ConnectionRefImpl( ownerId, connectionType, ref(id.getType(), id.getUuid()) ));
- }
-
- return Results.fromConnections( refs );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
deleted file mode 100644
index 55b95a9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public class ConnectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
- private final EntityCollectionManager entityCollectionManager;
- private final ApplicationEntityIndex applicationEntityIndex;
- private final EntityRef ownerId;
- private final String connectionType;
-
-
- public ConnectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager,
- final ApplicationEntityIndex applicationEntityIndex, final EntityRef ownerId,
- final String connectionType ) {
- this.entityCollectionManager = entityCollectionManager;
- this.applicationEntityIndex = applicationEntityIndex;
- this.ownerId = ownerId;
- this.connectionType = connectionType;
- }
-
-
- @Override
- public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope,
- final Query.Level resultsLevel ) {
-
- ResultsVerifier verifier;
-
- if ( resultsLevel == Query.Level.REFS ) {
- verifier = new ConnectionRefsVerifier( ownerId, connectionType );
- }
- else if ( resultsLevel == Query.Level.IDS ) {
- verifier = new ConnectionRefsVerifier( ownerId, connectionType );
- ;
- }
- else {
- verifier = new EntityVerifier( Query.MAX_LIMIT );
- }
-
- return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope, scope );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
deleted file mode 100644
index 6e170f8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.usergrid.persistence.index.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-
-public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<Results> {
-
- private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
-
- private final ResultsLoaderFactory resultsLoaderFactory;
-
- private final ApplicationScope applicationScope;
-
- private final ApplicationEntityIndex entityIndex;
-
- private final SearchEdge indexScope;
-
- private final SearchTypes types;
-
- private final String query;
-
- private final Optional<Integer> setOffsetFromCursor;
-
- private final int limit;
-
- private int offset;
-
-
- private Results currentResults;
-
- private boolean moreToLoad = true;
-
-
-
-
- public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex,
- final ApplicationScope applicationScope, final SearchEdge indexScope,
- final SearchTypes types, final String query, final Optional<Integer> setOffsetFromCursor, final int limit ) {
- this.resultsLoaderFactory = resultsLoaderFactory;
- this.applicationScope = applicationScope;
- this.entityIndex = entityIndex;
- this.indexScope = indexScope;
- this.types = types;
- this.setOffsetFromCursor = setOffsetFromCursor;
-
- //we must deep copy the query passed. Otherwise we will modify it's state with cursors. Won't fix, not relevant
- //once we start subscribing to streams.
- this.query = query;
- this.limit = limit;
- }
-
-
- @Override
- public Iterator<Results> iterator() {
- return this;
- }
-
-
- private void loadNextPage() {
- // Because of possible stale entities, which are filtered out by buildResults(),
- // we loop until the we've got enough results to satisfy the query limit.
-
- final int maxQueries = 10; // max re-queries to satisfy query limit
-
-
- Results results = null;
- int queryCount = 0;
-
-
- CandidateResults crs = null;
-
- int newLimit = limit;
-
- while ( queryCount++ < maxQueries ) {
-
- crs = entityIndex.search( indexScope, types, query, newLimit , offset);
-
-
- logger.debug( "Calling build results with crs {}", crs );
- results = buildResults( indexScope, crs );
-
- /**
- * In an edge case where we delete stale entities, we could potentially get less results than expected.
- * This will only occur once during the repair phase.
- * We need to ensure that we short circuit before we overflow the requested limit during a repair.
- */
- if ( crs.isEmpty() || !crs.hasOffset() || results.size() > 0 ) { // no results, no cursor, can't get more
- break;
- }
-
-
- //we didn't load anything, but there was a cursor, this means a read repair occured. We have to short
- //circuit to avoid over returning the result set
-
-
- // need to query for more
- // ask for just what we need to satisfy, don't want to exceed limit
- newLimit = newLimit - results.size();
-
- logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
- limit, newLimit, queryCount
- } );
- }
-
- //now set our cursor if we have one for the next iteration
- if ( results.hasCursor() ) {
- moreToLoad = true;
- }
-
- else {
- moreToLoad = false;
- }
-
-//
-// //set our select subjects into our query if provided
-// if(crs != null){
-// query.setSelectSubjects( crs.getGetFieldMappings() );
-// }
-//
-
- //set our current results and the flag
- this.currentResults = results;
- }
-
-
-
- /**
- * Build results from a set of candidates, and discard those that represent stale indexes.
- *
- * @param indexScope The index scope to execute the search on
- * @param crs Candidates to be considered for results
- */
- private Results buildResults( final SearchEdge indexScope, final CandidateResults crs ) {
-
- logger.debug( "buildResults() from {} candidates", crs.size() );
-
- //get an instance of our results loader
- final ResultsLoader resultsLoader =
- this.resultsLoaderFactory.getLoader( applicationScope, indexScope, Query.Level.ALL_PROPERTIES );
-
- //load the results
- final Results results = resultsLoader.loadResults(crs);
-
- //signal for post processing
- resultsLoader.postProcess();
-
- //set offset into query
-
- logger.debug( "Returning results size {}", results.size() );
-
- return results;
- }
-
-
- @Override
- public boolean hasNext() {
-
- //we've tried to load and it's empty and we have more to load, load the next page
- if ( currentResults == null ) {
- //there's nothing left to load, nothing to do
- if ( !moreToLoad ) {
- return false;
- }
-
- //load the page
-
- loadNextPage();
- }
-
-
- //see if our current results are not null
- return currentResults != null;
- }
-
-
- @Override
- public Results next() {
- if ( !hasNext() ) {
- throw new NoSuchElementException( "No more results present" );
- }
-
- final Results toReturn = currentResults;
-
- currentResults = null;
-
- return toReturn;
- }
-
- @Override
- public void remove() {
- throw new RuntimeException("Remove not implemented!!");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
deleted file mode 100644
index d73c731..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityFactory;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Optional;
-
-
-/**
- * A loader that verifies versions are correct in cassandra and match elasticsearch
- */
-public class EntityVerifier implements ResultsVerifier {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
-
- private EntitySet ids;
-
- private Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping;
-
-
- public EntityVerifier( final int maxSize ) {
- this.entityMapping = new HashMap<>( maxSize );
- }
-
-
- @Override
- public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
- ids = ecm.load( idsToLoad ).toBlocking().last();
- logger.debug("loadResults() asked for {} ids and got {}", idsToLoad.size(), ids.size());
- }
-
-
- @Override
- public boolean isValid( final CandidateResult candidateResult ) {
- final Id entityId = candidateResult.getId();
-
- final MvccEntity savedEntity = ids.getEntity( entityId );
-
- //version wasn't found deindex
- if ( savedEntity == null ) {
- logger.warn( "Version for Entity {}:{} not found", entityId.getType(), entityId.getUuid() );
- return false;
- }
-
- final UUID candidateVersion = candidateResult.getVersion();
- final UUID savedVersion = savedEntity.getVersion();
-
- if ( UUIDComparator.staticCompare( savedVersion, candidateVersion ) > 0 ) {
- logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
- entityId.getUuid(), entityId.getType(), candidateVersion, savedEntity
- } );
-
- return false;
- }
-
-
- final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = savedEntity.getEntity();
-
- if ( !entity.isPresent() ) {
- logger.warn( "Entity uuid:{} version v:{} is deleted but indexed, this is a bug ",
- entityId.getUuid(), savedEntity.getEntity() );
- return false;
- }
-
- entityMapping.put( entityId, entity.get() );
-
- return true;
- }
-
-
- @Override
- public Results getResults( final Collection<Id> ids ) {
-
- final List<Entity> ugEntities = new ArrayList<>( ids.size() );
-
- for ( final Id id : ids ) {
- final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id );
-
- Entity entity = EntityFactory.newEntity( id.getUuid(), id.getType() );
-
- Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
- entity.addProperties( entityMap );
- ugEntities.add( entity );
- }
-
- return Results.fromEntities( ugEntities );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
deleted file mode 100644
index ade64a2..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-
-
-public class FilteringLoader implements ResultsLoader {
-
- private static final Logger logger = LoggerFactory.getLogger( FilteringLoader.class );
-
- private final EntityCollectionManager entityCollectionManager;
- private final ResultsVerifier resultsVerifier;
- private final ApplicationScope applicationScope;
- private final SearchEdge indexScope;
- private final EntityIndexBatch indexBatch;
-
-
- /**
- * Create an instance of a filter loader
- *
- * @param entityCollectionManager The entityCollectionManagerFactory
- * @param resultsVerifier The verifier to verify the candidate results
- * @param applicationScope The application scope to perform the load
- * @param indexScope The index scope used in the search
- */
- protected FilteringLoader( final EntityCollectionManager entityCollectionManager, final ApplicationEntityIndex applicationEntityIndex, final ResultsVerifier resultsVerifier,
- final ApplicationScope applicationScope, final SearchEdge indexScope ) {
-
- this.entityCollectionManager = entityCollectionManager;
- this.resultsVerifier = resultsVerifier;
- this.applicationScope = applicationScope;
- this.indexScope = indexScope;
-
- indexBatch = applicationEntityIndex.createBatch();
- }
-
-
- @Override
- public Results loadResults( final CandidateResults crs ) {
-
-
- if ( crs.size() == 0 ) {
- return new Results();
- }
-
-
- // For each entity, holds the index it appears in our candidates for keeping ordering correct
- final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() );
-
- // Maps the entity ids to our candidates
- final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() );
-
-
- final Iterator<CandidateResult> iter = crs.iterator();
-
-
- // TODO, in this case we're "optimizing" due to the limitations of collection scope.
- // Perhaps we should change the API to just be an application, then an "owner" scope?
-
- // Go through the candidates and group them by scope for more efficient retrieval.
- // Also remove duplicates before we even make a network call
- for ( int i = 0; iter.hasNext(); i++ ) {
-
- final CandidateResult currentCandidate = iter.next();
-
- final Id entityId = currentCandidate.getId();
-
- //check if we've seen this candidate by id
- final CandidateResult previousMax = maxCandidateMapping.get( entityId );
-
- //its not been seen, save it
- if ( previousMax == null ) {
- maxCandidateMapping.put( entityId, currentCandidate );
- orderIndex.put( entityId, i );
- continue;
- }
-
- //we have seen it, compare them
-
- final UUID previousMaxVersion = previousMax.getVersion();
-
- final UUID currentVersion = currentCandidate.getVersion();
-
-
- final CandidateResult toRemove;
- final CandidateResult toKeep;
-
- //current is newer than previous. Remove previous and keep current
- if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) {
- toRemove = previousMax;
- toKeep = currentCandidate;
- }
- //previously seen value is newer than current. Remove the current and keep the previously seen value
- else {
- toRemove = currentCandidate;
- toKeep = previousMax;
- }
-
- //this is a newer version, we know we already have a stale entity, add it to be cleaned up
-
-
- //de-index it
- logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
- entityId.getUuid(), entityId.getType(), toRemove.getVersion(), toKeep.getVersion()
- } );
-
- //deindex this document, and remove the previous maxVersion
- //we have to deindex this from our ownerId, since this is what gave us the reference
- indexBatch.deindex( indexScope, toRemove );
-
-
- //TODO, fire the entity repair cleanup task here instead of de-indexing
-
- //replace the value with a more current version
- maxCandidateMapping.put( entityId, toKeep );
- orderIndex.put( entityId, i );
- }
-
-
- //now everything is ordered, and older versions are removed. Batch fetch versions to verify
- // existence and correct versions
-
- final TreeMap<Integer, Id> sortedResults = new TreeMap<>();
-
-
- final Collection<Id> idsToLoad =
- Collections2.transform( maxCandidateMapping.values(), new Function<CandidateResult, Id>() {
- @Nullable
- @Override
- public Id apply( @Nullable final CandidateResult input ) {
- //NOTE this is never null, we won't need to check
- return input.getId();
- }
- } );
-
-
- //now using the scope, load the collection
-
-
-
- //load the results into the loader for this scope for validation
- resultsVerifier.loadResults( idsToLoad, entityCollectionManager );
-
- //now let the loader validate each candidate. For instance, the "max" in this candidate
- //could still be a stale result, so it needs validated
- for ( final Id requestedId : idsToLoad ) {
-
- final CandidateResult cr = maxCandidateMapping.get( requestedId );
-
- //ask the loader if this is valid, if not discard it and de-index it
- if ( !resultsVerifier.isValid( cr ) ) {
- indexBatch.deindex( indexScope, cr );
- continue;
- }
-
- //if we get here we're good, we need to add this to our results
- final int candidateIndex = orderIndex.get( requestedId );
-
- sortedResults.put( candidateIndex, requestedId );
- }
-
-
- // NOTE DO NOT execute the batch here.
- // It changes the results and we need consistent paging until we aggregate all results
- return resultsVerifier.getResults( sortedResults.values() );
- }
-
-
- @Override
- public void postProcess() {
- this.indexBatch.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
deleted file mode 100644
index 4a3bfcd..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class IdsVerifier extends VersionVerifier {
-
- @Override
- public Results getResults( final Collection<Id> ids ) {
-
- final List<UUID> returnIds = new ArrayList<>( ids.size() );
-
- for ( final Id id : ids ) {
- returnIds.add( id.getUuid() );
- }
-
-
- return Results.fromIdList( returnIds );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
deleted file mode 100644
index c2a3e9a..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.index.CandidateResults;
-
-
-/**
- * Interface for loading results
- */
-public interface ResultsLoader {
-
- /**
- * Using the candidate results, load our results. Should filter stale results
- * @param crs The candidate result set
- * @return Results. Null safe, but may be empty
- */
- public Results loadResults( final CandidateResults crs);
-
- /**
- * Post process the load operation
- */
- public void postProcess();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
deleted file mode 100644
index 3ccca1b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public interface ResultsLoaderFactory {
-
- /**
- * Get the loader for results
- * @param applicationScope The application scope used to load results
- * @param indexScope The index scope used in the search
- * @param
- */
- ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge indexScope,
- final Query.Level resultsLevel );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
deleted file mode 100644
index fe72ca2..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public interface ResultsVerifier {
-
- /**
- * Load all the candidate ides for verification
- * @param ids The Id's to load
- * @param ecm The entity collection manager
- */
- public void loadResults(Collection<Id> ids, EntityCollectionManager ecm);
-
- /**
- * Return true if the candidate result is a valid result that should be retained. If it should
- * not it should also be removed from the list of possible return values in this loader
- * @param candidateResult
- */
- public boolean isValid(CandidateResult candidateResult);
-
-
- /**
- * Load the result set with the given ids
- * @return
- */
- public Results getResults(Collection<Id> ids);
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
deleted file mode 100644
index c49fb28..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-
-
-/**
- * A loader that verifies versions are correct in Cassandra and match ElasticSearch
- */
-public abstract class VersionVerifier implements ResultsVerifier {
-
- private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class );
-
- private VersionSet ids;
-
-
- @Override
- public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
- ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last();
- }
-
-
- @Override
- public boolean isValid( final CandidateResult candidateResult ) {
- final Id entityId = candidateResult.getId();
-
- final MvccLogEntry version = ids.getMaxVersion( entityId );
-
- //version wasn't found ,deindex
- if ( version == null ) {
- logger.warn( "Version for Entity {}:{} not found",
- entityId.getUuid(), entityId.getUuid() );
-
- return false;
- }
-
- final UUID savedVersion = version.getVersion();
-
- if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
- logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
- new Object[] {
- entityId.getUuid(),
- entityId.getType(),
- candidateResult.getVersion(),
- savedVersion
- } );
-
- return false;
- }
-
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
deleted file mode 100644
index 6230147..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.entity;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-
-/**
- * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should
- * be removed
- */
-public class EntityIdFilter extends AbstractPipelineOperation<Id, Id> implements Filter<Id, Id> {
-
- private final Id entityId;
-
-
- @Inject
- public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
-
-
-
-
- @Override
- public Observable<Id> call( final Observable<Id> idObservable ) {
- return Observable.just( entityId );
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
deleted file mode 100644
index dd6b9b8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.entity;
-
-
-import java.util.List;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from a set of Ids.
- *
- * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
- */
-public class EntityLoadCollector extends AbstractPipelineOperation<Id, ResultsPage>
- implements Collector<Id, ResultsPage> {
-
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-
-
- @Inject
- public EntityLoadCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- }
-
-
- @Override
- public Observable<ResultsPage> call( final Observable<Id> observable ) {
-
-
- final EntityCollectionManager entityCollectionManager =
- entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
-
- final Observable<EntitySet> entitySetObservable = observable.buffer( pipelineContext.getLimit() ).flatMap(
- bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) );
-
-
- final Observable<ResultsPage> resultsObservable = entitySetObservable
-
- .flatMap( entitySet -> {
-
- //get our entites and filter missing ones, then collect them into a results object
- final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() );
-
-
- //convert them to our old entity model, then filter abscent, meaning they weren't found
- final Observable<List<Entity>> entitiesPageObservable =
- mvccEntityObservable.filter( mvccEntity -> mvccEntity.getEntity().isPresent() )
- .map( mvccEntity -> mvccEntity.getEntity().get() ).toList();
-
- //convert them to a list, then map them into results
- return entitiesPageObservable.map( entities -> new ResultsPage( entities ) );
- } );
-
-
- return resultsObservable;
- }
-
- /**
- * Map a new cp entity to an old entity. May be null if not present
- */
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
index e0f69cf..42b352b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
@@ -20,8 +20,9 @@
package org.apache.usergrid.corepersistence.pipeline.read.graph;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.SearchByEdge;
@@ -39,7 +40,7 @@ import rx.Observable;
/**
* Filter should take and Id and a graph edge, and ensure the connection between the two exists
*/
-public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOperation<Id, Id> implements
+public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements
Filter<Id, Id> {
private final GraphManagerFactory graphManagerFactory;
@@ -55,12 +56,13 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp
@Override
- public Observable<Id> call( final Observable<Id> idObservable ) {
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
- return idObservable.flatMap( id -> {
+ return filterValueObservable.flatMap( filterValue -> {
final String edgeTypeName = getEdgeName();
+ final Id id = filterValue.getValue();
//create our search
final SearchByEdge searchByEdge =
@@ -68,7 +70,7 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp
Optional.absent() );
//load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node
- return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() );
+ return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath()));
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index 4dd34fc..503fcf9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -38,7 +39,7 @@ import rx.Observable;
/**
* Command for reading graph edges
*/
-public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> {
+public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
private final GraphManagerFactory graphManagerFactory;
@@ -52,7 +53,8 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
@Override
- public Observable<Id> call( final Observable<Id> observable ) {
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
+
//get the graph manager
final GraphManager graphManager =
@@ -63,10 +65,11 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
//return all ids that are emitted from this edge
- return observable.flatMap( id -> {
+ return previousIds.flatMap( previousFilterValue -> {
//set our our constant state
final Optional<Edge> startFromCursor = getSeekValue();
+ final Id id = previousFilterValue.getValue();
final SimpleSearchByEdgeType search =
@@ -78,9 +81,9 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
*/
return graphManager.loadEdgesFromSource( search )
//set our cursor every edge we traverse
- .doOnNext( edge -> setCursor( edge ) )
+
//map our id from the target edge
- .map( edge -> edge.getTargetNode() );
+ .map( edge -> createFilterResult( edge.getTargetNode(), edge, previousFilterValue.getPath() ) );
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
new file mode 100644
index 0000000..5a0e026
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should
+ * be removed
+ */
+public class EntityIdFilter extends AbstractFilter<Id, Id> implements Filter<Id, Id> {
+
+ private final Id entityId;
+
+
+ @Inject
+ public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
+
+
+
+ @Override
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
+ //ignore what our input was, and simply emit the id specified
+ return filterValueObservable.map( idFilterResult -> new FilterResult( entityId, idFilterResult.getPath() ));
+
+ }
+}
[06/11] incubator-usergrid git commit: Fixes resume logic by loading
then filtering first id
Posted by to...@apache.org.
Fixes resume logic by loading then filtering first id
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/413f023e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/413f023e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/413f023e
Branch: refs/heads/two-dot-o-dev
Commit: 413f023e243d8e398db6295b05a9c1f6c8c8feed
Parents: 294a7d9
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 09:11:01 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 09:11:01 2015 -0600
----------------------------------------------------------------------
.../corepersistence/pipeline/Pipeline.java | 3 +-
.../pipeline/read/FilterFactory.java | 9 +++
.../pipeline/read/ReadPipelineBuilderImpl.java | 11 +++-
.../pipeline/read/collect/EntityFilter.java | 68 ++++++++++++++++++++
.../read/collect/IdCursorSerializer.java | 41 ++++++++++++
.../persistence/index/usergrid-mappings.json | 1 -
6 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index df6a218..26cf346 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import com.google.common.base.Optional;
@@ -77,7 +78,7 @@ public class Pipeline<R> {
public Observable<R> execute(){
- Observable traverseObservable = Observable.just( applicationScope.getApplication() );
+ Observable traverseObservable = Observable.just( new FilterResult<>( applicationScope.getApplication(), Optional.absent() ));
//build our traversal commands
for ( PipelineOperation pipelineOperation : idPipelineOperationList ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index c465516..a2f1605 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.pipeline.read;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.IdCursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
@@ -131,4 +133,11 @@ public interface FilterFactory {
* @param entityId The entity id to emit
*/
EntityIdFilter getEntityIdFilter( final Id entityId );
+
+
+ /**
+ * Create a new instance of our entity filter
+ * @return
+ */
+ EntityFilter entityFilter();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
index ffb9f7d..28446ad 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
@@ -26,9 +26,9 @@ import java.util.List;
import org.apache.usergrid.corepersistence.pipeline.Pipeline;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
-import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
@@ -211,9 +211,14 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
//add our last filter that will generate entities
- final Filter<?, Entity> finalFilter = collectorState.getFinalFilter();
+ final Filter<?, Entity> entityLoadFilter = collectorState.getFinalFilter();
- filters.add( finalFilter );
+ filters.add( entityLoadFilter );
+
+ //add the filter that skips the first result on resume
+ final Filter<Entity, Entity> cursorEntityFilter = filterFactory.entityFilter();
+
+ filters.add( cursorEntityFilter );
//execute our collector
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
new file mode 100644
index 0000000..daf2e7f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if
+ * it matches the Id that was set
+ */
+public class EntityFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> {
+
+
+ @Override
+ public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
+
+ //filter only the first id, then map into our path for our next pass
+
+
+ return filterResultObservable.skipWhile( filterResult -> {
+
+ final Optional<Id> startFromCursor = getSeekValue();
+
+ return startFromCursor.isPresent() && startFromCursor.get().equals( filterResult.getValue().getId() );
+ } ).map( filterResult -> {
+
+
+ final Entity entity = filterResult.getValue();
+ final Id entityId = entity.getId();
+
+ return createFilterResult( entity, entityId, filterResult.getPath() );
+ } );
+ }
+
+
+ @Override
+ protected CursorSerializer<Id> getCursorSerializer() {
+ return IdCursorSerializer.INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java
new file mode 100644
index 0000000..d96b9f2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * cursor serializer for Ids
+ */
+public class IdCursorSerializer extends AbstractCursorSerializer<Id> {
+
+
+ public static final IdCursorSerializer INSTANCE = new IdCursorSerializer();
+
+ @Override
+ protected Class<Id> getType() {
+ return Id.class;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
index bee84a2..c22a4ec 100644
--- a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
+++ b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
@@ -73,7 +73,6 @@
},
"string": {
"type": "string",
- "doc_values": true,
"norms": {
"enabled": false
},
[10/11] incubator-usergrid git commit: Merge branch 'two-dot-o-dev'
of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
two-dot-o-dev
Posted by to...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4107ef38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4107ef38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4107ef38
Branch: refs/heads/two-dot-o-dev
Commit: 4107ef38961dceecaf90c03edfed0d43dc67486b
Parents: 6d4847a d18b8ee
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 11:27:38 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 11:27:38 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 34 ++---
.../usergrid/persistence/EntityManagerIT.java | 4 +-
.../graph/impl/SimpleSearchByEdge.java | 5 +-
.../usergrid/persistence/index/IndexFig.java | 11 +-
.../persistence/index/IndexRefreshCommand.java | 2 +-
.../index/impl/EsEntityIndexImpl.java | 2 +-
.../index/impl/IndexRefreshCommandImpl.java | 136 +++++++++++--------
.../persistence/index/impl/EntityIndexTest.java | 123 ++++++++---------
8 files changed, 173 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
[04/11] incubator-usergrid git commit: Massive refactor. Paths for
cursor generation are now part of our I/O results. This allows the collector
to take until satisfied, then generate a serializable path.
Posted by to...@apache.org.
Massive refactor. Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cd983d66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cd983d66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cd983d66
Branch: refs/heads/two-dot-o-dev
Commit: cd983d66260222985431a775454183c2ed2305ea
Parents: 6d4847a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 30 17:40:52 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 30 17:40:52 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 5 +-
.../corepersistence/pipeline/Pipeline.java | 9 +-
.../pipeline/PipelineContext.java | 16 +-
.../pipeline/PipelineOperation.java | 39 ++++
.../pipeline/PipelineResult.java | 57 -----
.../pipeline/cursor/ResponseCursor.java | 81 ++++---
.../pipeline/read/AbstractFilter.java | 45 ++++
.../pipeline/read/AbstractPathFilter.java | 109 +++++++++
.../read/AbstractPipelineOperation.java | 44 ----
.../pipeline/read/AbstractSeekingFilter.java | 102 --------
.../pipeline/read/CandidateResultsFilter.java | 31 ---
.../pipeline/read/Collector.java | 13 +-
.../pipeline/read/CollectorFactory.java | 12 +-
.../corepersistence/pipeline/read/EdgePath.java | 79 +++++++
.../corepersistence/pipeline/read/Filter.java | 9 +-
.../pipeline/read/FilterFactory.java | 31 ++-
.../pipeline/read/FilterResult.java | 56 +++++
.../pipeline/read/PipelineOperation.java | 38 ---
.../pipeline/read/ReadPipelineBuilder.java | 5 +-
.../pipeline/read/ReadPipelineBuilderImpl.java | 75 +++---
.../pipeline/read/ResultsPage.java | 26 ++-
.../read/collect/AbstractCollector.java | 46 ++++
.../read/collect/ResultsPageCollector.java | 80 +++++++
.../AbstractElasticSearchFilter.java | 47 ++--
.../pipeline/read/elasticsearch/Candidate.java | 55 +++++
.../elasticsearch/CandidateEntityFilter.java | 234 +++++++++++++++++++
.../read/elasticsearch/CandidateIdFilter.java | 201 ++++++++++++++++
.../CandidateResultsEntityResultsCollector.java | 217 -----------------
.../CandidateResultsIdVerifyFilter.java | 193 ---------------
.../impl/CollectionRefsVerifier.java | 44 ----
.../CollectionResultsLoaderFactoryImpl.java | 65 ------
.../impl/ConnectionRefsVerifier.java | 59 -----
.../ConnectionResultsLoaderFactoryImpl.java | 73 ------
.../impl/ElasticSearchQueryExecutor.java | 224 ------------------
.../read/elasticsearch/impl/EntityVerifier.java | 127 ----------
.../elasticsearch/impl/FilteringLoader.java | 219 -----------------
.../read/elasticsearch/impl/IdsVerifier.java | 46 ----
.../read/elasticsearch/impl/ResultsLoader.java | 43 ----
.../impl/ResultsLoaderFactory.java | 41 ----
.../elasticsearch/impl/ResultsVerifier.java | 52 -----
.../elasticsearch/impl/VersionVerifier.java | 85 -------
.../pipeline/read/entity/EntityIdFilter.java | 53 -----
.../read/entity/EntityLoadCollector.java | 94 --------
.../graph/AbstractReadGraphEdgeByIdFilter.java | 12 +-
.../read/graph/AbstractReadGraphFilter.java | 15 +-
.../pipeline/read/graph/EntityIdFilter.java | 54 +++++
.../pipeline/read/graph/EntityLoadFilter.java | 155 ++++++++++++
.../graph/ReadGraphConnectionByTypeFilter.java | 20 +-
.../results/ObservableQueryExecutor.java | 24 +-
.../pipeline/cursor/CursorTest.java | 20 +-
.../persistence/index/CandidateResults.java | 11 +-
.../impl/EsApplicationEntityIndexImpl.java | 7 +-
52 files changed, 1402 insertions(+), 2096 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3119934..2790ee1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -32,7 +32,6 @@ import org.springframework.util.Assert;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
@@ -648,7 +647,7 @@ public class CpRelationManager implements RelationManager {
}
- final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute();
+ final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
return new ObservableQueryExecutor( resultsObservable ).next();
}
@@ -917,7 +916,7 @@ public class CpRelationManager implements RelationManager {
}
- final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute();
+ final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
return new ObservableQueryExecutor( resultsObservable ).next();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index bc93b6c..df6a218 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -25,7 +25,6 @@ import java.util.List;
import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import com.google.common.base.Optional;
@@ -47,7 +46,6 @@ public class Pipeline<R> {
private final List<PipelineOperation> idPipelineOperationList;
private final Collector<?, R> collector;
private final RequestCursor requestCursor;
- private final ResponseCursor responseCursor;
private final int limit;
@@ -69,7 +67,6 @@ public class Pipeline<R> {
this.limit = limit;
this.requestCursor = new RequestCursor( cursor );
- this.responseCursor = new ResponseCursor();
}
@@ -77,7 +74,7 @@ public class Pipeline<R> {
* Execute the pipline construction, returning an observable of results
* @return
*/
- public Observable<PipelineResult<R>> execute(){
+ public Observable<R> execute(){
Observable traverseObservable = Observable.just( applicationScope.getApplication() );
@@ -99,7 +96,7 @@ public class Pipeline<R> {
//append the optional cursor into the response for the caller to use
- return response.map( result -> new PipelineResult<>( result, responseCursor ) );
+ return response;
}
@@ -111,7 +108,7 @@ public class Pipeline<R> {
private void setState( final PipelineOperation pipelineOperation ) {
- final PipelineContext context = new PipelineContext( applicationScope, requestCursor, responseCursor,
+ final PipelineContext context = new PipelineContext( applicationScope, requestCursor,
limit, idCount );
pipelineOperation.setContext( context );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
index 325f876..018abb7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
@@ -38,16 +38,13 @@ public class PipelineContext {
private final int id;
private final ApplicationScope applicationScope;
private final RequestCursor requestCursor;
- private final ResponseCursor responseCursor;
private final int limit;
- public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor,
- final ResponseCursor responseCursor, final int limit, final int id ) {
+ public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id ) {
this.applicationScope = applicationScope;
this.requestCursor = requestCursor;
- this.responseCursor = responseCursor;
this.limit = limit;
this.id = id;
}
@@ -64,7 +61,7 @@ public class PipelineContext {
/**
- * Get our cursor value if present
+ * Get our cursor value if present from our pipline
* @param serializer
*/
public <T extends Serializable> Optional<T> getCursor( final CursorSerializer<T> serializer ) {
@@ -73,15 +70,6 @@ public class PipelineContext {
return Optional.fromNullable( value );
}
-
- /**
- * Set the cursor value into our resposne
- */
- public <T extends Serializable> void setCursorValue( final T value, final CursorSerializer<T> serializer ) {
- responseCursor.setCursor( id, value, serializer );
- }
-
-
/**
* Get the limit for this execution
* @return
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
new file mode 100644
index 0000000..d2fa16c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+
+import rx.Observable;
+
+
+/**
+ * Interface for filtering commands. All filters must take an observable of Id's as an input. Output is then determined by subclasses.
+ * This takes an input of Id, performs some operation, and emits values for further processing in the Observable
+ * pipeline
+ * @param <T> The input type of the filter value
+ * @param <R> The output type of the filter value
+ */
+public interface PipelineOperation<T, R> extends Observable.Transformer<FilterResult<T>, R> {
+
+ void setContext(final PipelineContext pipelineContext);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
deleted file mode 100644
index fe8604e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Intermediate observable that will return results, as well as an optional cursor
- * @param <R>
- */
-public class PipelineResult<R> {
-
-
- private final R result;
-
- private final ResponseCursor responseCursor;
-
-
- public PipelineResult( final R result, final ResponseCursor responseCursor ) {
- this.result = result;
- this.responseCursor = responseCursor;
- }
-
-
- /**
- * If the user requests our cursor, return the cursor
- * @return
- */
- public Optional<String> getCursor(){
- return this.responseCursor.encodeAsString();
- }
-
- public R getResult(){
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
index f1c8c24..dbd8b88 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
@@ -20,12 +20,10 @@
package org.apache.usergrid.corepersistence.pipeline.cursor;
-import java.io.Serializable;
import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-import com.fasterxml.jackson.core.Base64Variant;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,71 +39,72 @@ public class ResponseCursor {
private static final ObjectMapper MAPPER = CursorSerializerUtil.getMapper();
+
/**
- * We use a map b/c some indexes might be skipped
+ * The pointer to the first edge path. Evaluation is lazily performed in the case the caller does not care about
+ * the cursor.
*/
- private Map<Integer, CursorEntry<?>> cursors = new HashMap<>();
+ private final Optional<EdgePath> edgePath;
+ private Optional<String> encodedValue = null;
- /**
- * Set the possible cursor value into the index. DOES NOT parse the cursor. This is intentional for performance
- */
- public <T extends Serializable> void setCursor( final int id, final T cursor,
- final CursorSerializer<T> serializer ) {
- final CursorEntry<T> newEntry = new CursorEntry<>( cursor, serializer );
- cursors.put( id, newEntry );
- }
+ public ResponseCursor( final Optional<EdgePath> edgePath ) {this.edgePath = edgePath;}
/**
- * now we're done, encode as a string
+ * Lazyily encoded deliberately. If the user doesn't care about a cursor and is using streams, we dont' want to take the
+ * time to calculate it
*/
public Optional<String> encodeAsString() {
- try {
- if(cursors.isEmpty()){
- return Optional.absent();
- }
+ //always return cached if we are called 2x
+ if ( encodedValue != null ) {
+ return encodedValue;
+ }
+
+ if ( !edgePath.isPresent() ) {
+ encodedValue = Optional.absent();
+ return encodedValue;
+ }
+
+
+ try {
+ //no edge path, short circuit
final ObjectNode map = MAPPER.createObjectNode();
- for ( Map.Entry<Integer, CursorEntry<?>> entry : cursors.entrySet() ) {
- final CursorEntry cursorEntry = entry.getValue();
+ Optional<EdgePath> current = edgePath;
- final JsonNode serialized = cursorEntry.serializer.toNode( MAPPER, cursorEntry.cursor );
- map.put( entry.getKey().toString(), serialized );
- }
+ //traverse each edge and add them to our json
+ do {
+
+ final EdgePath edgePath = current.get();
+ final Object cursorValue = edgePath.getCursorValue();
+ final CursorSerializer serializer = edgePath.getSerializer();
+ final int filterId = edgePath.getFilterId();
+
+ final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
+ map.put( String.valueOf( filterId ), serialized );
+ current = current.get().getPrevious();
+ }
+ while ( current.isPresent() );
- final byte[] output = MAPPER.writeValueAsBytes(map);
+ final byte[] output = MAPPER.writeValueAsBytes( map );
//generate a base64 url save string
final String value = Base64.getUrlEncoder().encodeToString( output );
- return Optional.of( value );
-
+ encodedValue = Optional.of( value );
}
catch ( JsonProcessingException e ) {
throw new CursorParseException( "Unable to serialize cursor", e );
}
- }
-
- /**
- * Interal pointer to the cursor and it's serialzed value
- */
- private static final class CursorEntry<T> {
- private final T cursor;
- private final CursorSerializer<T> serializer;
-
-
- private CursorEntry( final T cursor, final CursorSerializer<T> serializer ) {
- this.cursor = cursor;
- this.serializer = serializer;
- }
+ return encodedValue;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
new file mode 100644
index 0000000..e4d5d44
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+
+
+/**
+ * Basic functionality for our commands to handle cursor IO
+ * @param <T> the input type
+ * @param <R> The output Type
+ */
+public abstract class AbstractFilter<T, R> implements Filter<T, R> {
+
+
+ protected PipelineContext pipelineContext;
+
+
+ @Override
+ public void setContext( final PipelineContext pipelineContext ) {
+ this.pipelineContext = pipelineContext;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
new file mode 100644
index 0000000..c68dc4a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import java.io.Serializable;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Abstract class for filters to extend that require a cursor
+ * @param <T> The input type
+ * @param <R> The response type
+ * @param <C> The cursor type
+ */
+public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<T, R> implements Filter<T, R> {
+
+
+
+ //TODO not a big fan of this, but not sure how to build resume otherwise
+ private CursorSeek<C> cursorSeek;
+
+
+ /**
+ * Return the parsed value of the cursor from the last request, if it exists
+ */
+ protected Optional<C> getSeekValue() {
+
+ if(cursorSeek == null) {
+ final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() );
+ cursorSeek = new CursorSeek<>( cursor );
+ }
+
+ return cursorSeek.getSeekValue();
+
+ }
+
+
+ /**
+ * Sets the cursor into our pipeline context
+ */
+ protected FilterResult<R> createFilterResult( final R emit, final C cursorValue, final Optional<EdgePath> parent ){
+
+
+ //create a current path, and append our parent path to it
+ final EdgePath<C> newEdgePath =
+ new EdgePath<>( pipelineContext.getId(), cursorValue, getCursorSerializer(), parent );
+
+ //emit our value with the parent path
+ return new FilterResult<>( emit, Optional.of( newEdgePath ) );
+
+ }
+
+
+ /**
+ * Return the class to be used when parsing the cursor
+ */
+ protected abstract CursorSerializer<C> getCursorSerializer();
+
+
+ /**
+ * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values.
+ * Calling get will return the first value to seek, or absent if not specified. Subsequent calls will return absent. Callers should treat the results as seek values for each operation
+ */
+ protected static class CursorSeek<C> {
+
+ private Optional<C> seek;
+
+ private CursorSeek(final Optional<C> cursorValue){
+ seek = cursorValue;
+ }
+
+
+ /**
+ * Get the seek value to use when searching
+ * @return
+ */
+ public Optional<C> getSeekValue(){
+ final Optional<C> toReturn = seek;
+
+ seek = Optional.absent();
+
+ return toReturn;
+ }
+
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
deleted file mode 100644
index 8d7f106..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-
-
-/**
- * Basic functionality for our commands to handle cursor IO
- * @param <T> the input type
- * @param <R> The output Type
- */
-public abstract class AbstractPipelineOperation<T, R> implements PipelineOperation<T, R> {
-
-
- protected PipelineContext pipelineContext;
-
-
- @Override
- public void setContext( final PipelineContext pipelineContext ) {
- this.pipelineContext = pipelineContext;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
deleted file mode 100644
index c23a1b7..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import java.io.Serializable;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Abstract class for filters to extend that require a cursor
- * @param <T> The input type
- * @param <R> The response type
- * @param <C> The cursor type
- */
-public abstract class AbstractSeekingFilter<T, R, C extends Serializable> extends AbstractPipelineOperation<T, R> implements Filter<T, R> {
-
-
-
- //TODO not a big fan of this, but not sure how to build resume otherwise
- private CursorSeek<C> cursorSeek;
-
-
- /**
- * Return the parsed value of the cursor from the last request, if it exists
- */
- protected Optional<C> getSeekValue() {
-
- if(cursorSeek == null) {
- final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() );
- cursorSeek = new CursorSeek<>( cursor );
- }
-
- return cursorSeek.getSeekValue();
-
- }
-
-
- /**
- * Sets the cursor into our pipeline context
- * @param newValue
- */
- protected void setCursor(final C newValue){
- pipelineContext.setCursorValue( newValue, getCursorSerializer() );
- }
-
-
- /**
- * Return the class to be used when parsing the cursor
- */
- protected abstract CursorSerializer<C> getCursorSerializer();
-
-
- /**
- * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values.
- * Calling get will return the first value to seek, or absent if not specified. Subsequent calls will return absent. Callers should treat the results as seek values for each operation
- */
- protected static class CursorSeek<C> {
-
- private Optional<C> seek;
-
- private CursorSeek(final Optional<C> cursorValue){
- seek = cursorValue;
- }
-
-
- /**
- * Get the seek value to use when searching
- * @return
- */
- public Optional<C> getSeekValue(){
- final Optional<C> toReturn = seek;
-
- seek = Optional.absent();
-
- return toReturn;
- }
-
-
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
deleted file mode 100644
index 4e6d06e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Traverses edges in the graph. Either by query or graph traversal. Take an observable of ids, and emits
- * an observable of ids
- */
-public interface CandidateResultsFilter extends PipelineOperation<Id, CandidateResults> {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
index 69d929c..e28ce44 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
@@ -20,11 +20,18 @@
package org.apache.usergrid.corepersistence.pipeline.read;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+
+
/**
- * A command that is used to reduce our stream of results into a final output
- * @param <T>
+ * A command that is used to reduce our stream of results into a stream of final batch outputs. When used
+ * no further transformation or encoding should occur. Otherwise EdgePath data will be lost, and serialization cannot occur
+ * across requests
+ *
+ * @param <T> The input type
+ * @param <R> The output type
*/
-public interface Collector<T, R> extends PipelineOperation<T, R> {
+public interface Collector<T, R> extends PipelineOperation<T,R> {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
index 6893b34..dd200b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
@@ -20,8 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
/**
@@ -29,16 +28,11 @@ import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollec
*/
public interface CollectorFactory {
- /**
- * Generate a new instance of the command with the specified parameters
- */
- EntityLoadCollector entityLoadCollector();
/**
- * Get the collector for collection candidate results to entities
+ * Get the results page collector
* @return
*/
- CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector();
-
+ ResultsPageCollector getResultsPageCollector();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
new file mode 100644
index 0000000..c560fad
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A path from our input element to our emitted element. A list of EdgePaths comprise a path through the graph. The chains of edge paths will result
+ * in a cursor when aggregated. If a graph traversal is the following
+ *
+ * applicationId(1) - "users" -> userId(2) - "devices" -> deviceId(3). There would be 2 EdgePath
+ *
+ * EdgePath("users"->userId(2)) <- parent - EdgePath("devices" -> deviceId(3))
+ */
+public class EdgePath<C> {
+
+
+ private final int filterId;
+ private final C cursorValue;
+ private final CursorSerializer<C> serializer;
+ private final Optional<EdgePath> previous;
+
+
+ /**
+ *
+ * @param filterId The id of the filter that generated this path
+ * @param cursorValue The value to resume seeking on the path
+ * @param serializer The serializer to serialize the value
+ * @param parent The parent graph path edge to reach this path
+ */
+ public EdgePath( final int filterId, final C cursorValue, final CursorSerializer<C> serializer,
+ final Optional<EdgePath> parent ) {
+ this.filterId = filterId;
+ this.cursorValue = cursorValue;
+ this.serializer = serializer;
+ this.previous = parent;
+ }
+
+
+ public C getCursorValue() {
+ return cursorValue;
+ }
+
+
+ public int getFilterId() {
+ return filterId;
+ }
+
+
+ public Optional<EdgePath> getPrevious() {
+ return previous;
+ }
+
+
+ public CursorSerializer<C> getSerializer() {
+ return serializer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
index ace62db..054a85a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
@@ -20,11 +20,12 @@
package org.apache.usergrid.corepersistence.pipeline.read;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
/**
- * Traverses edges in the graph. Either by query or graph traversal. Take an observable of ids, and emits
- * an observable of ids
+ * Traverses edges in the graph. Either by query or graph traversal. Take an observable of FilterResult, and emits
+ * an observable of FilterResults. Filters should never emit groups or objects that represent collections. Items should
+ * always be emitted 1 at a time. It is the responsibility of the collector to aggregate results.
*/
-public interface Filter<T, R> extends PipelineOperation<T, R> {}
+public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index 078d981..c465516 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -20,10 +20,12 @@
package org.apache.usergrid.corepersistence.pipeline.read;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsIdVerifyFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchConnectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
@@ -43,6 +45,7 @@ public interface FilterFactory {
/**
* Generate a new instance of the command with the specified parameters
+ *
* @param collectionName The collection name to use when reading the graph
*/
ReadGraphCollectionFilter readGraphCollectionFilter( final String collectionName );
@@ -57,12 +60,14 @@ public interface FilterFactory {
/**
* Generate a new instance of the command with the specified parameters
+ *
* @param connectionName The connection name to use when traversing the graph
*/
ReadGraphConnectionFilter readGraphConnectionFilter( final String connectionName );
/**
* Generate a new instance of the command with the specified parameters
+ *
* @param connectionName The connection name to use when traversing the graph
* @param entityType The entity type to use when traversing the graph
*/
@@ -72,13 +77,15 @@ public interface FilterFactory {
/**
* Read a connection directly between two identifiers
+ *
* @param connectionName The connection name to use when traversing the graph
- * @param targetId The target Id to use when traversing the graph
+ * @param targetId The target Id to use when traversing the graph
*/
ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName, final Id targetId );
/**
* Generate a new instance of the command with the specified parameters
+ *
* @param query The query to use when querying the entities in the collection
* @param collectionName The collection name to use when querying
*/
@@ -90,6 +97,7 @@ public interface FilterFactory {
/**
* Generate a new instance of the command with the specified parameters
+ *
* @param query The query to use when querying the entities in the connection
* @param connectionName The type of connection to query
* @param connectedEntityType The type of entity in the connection. Leave absent to query all entity types
@@ -102,13 +110,24 @@ public interface FilterFactory {
/**
- * Get a candidate ids verifier for collection results. Should be inserted into pipelines where a query filter is an intermediate step,
- * not a final filter before collectors
+ * Generate a new instance of the command with the specified parameters
+ */
+ EntityLoadFilter entityLoadFilter();
+
+ /**
+ * Get the collector for collection candidate results to entities
*/
- CandidateResultsIdVerifyFilter candidateResultsIdVerifyFilter();
+ CandidateEntityFilter candidateEntityFilter();
+
+ /**
+ * Get a candidate ids verifier for collection results. Should be inserted into pipelines where a query filter is
+ * an intermediate step, not a final filter before collectors
+ */
+ CandidateIdFilter candidateResultsIdVerifyFilter();
/**
* Get an entity id filter. Used as a 1.0->2.0 bridge since we're not doing full traversals
+ *
* @param entityId The entity id to emit
*/
EntityIdFilter getEntityIdFilter( final Id entityId );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
new file mode 100644
index 0000000..3c41a2b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A bean that is passed between filters with immutable cursor state
+ * @param <T>
+ */
+public class FilterResult<T> {
+ private final T value;
+ private final Optional<EdgePath> path;
+
+
+ /**
+ * Create a new immutable filtervalue
+ * @param value The value the filter emits
+ * @param path The path to this value, if created
+ */
+ public FilterResult( final T value, final Optional<EdgePath> path ) {
+ this.value = value;
+ this.path = path;
+ }
+
+
+ public T getValue() {
+ return value;
+ }
+
+
+ public Optional<EdgePath> getPath() {
+ return path;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
deleted file mode 100644
index 28bba36..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-
-import rx.Observable;
-
-
-/**
- * Interface for filtering commands. All filters must take an observable of Id's as an input. Output is then determined by subclasses.
- * This takes an input of Id, performs some operation, and emits values for further processing in the Observable
- * pipeline
- * @param <T> The input type
- * @param <R>
- */
-public interface PipelineOperation< T, R> extends Observable.Transformer<T, R> {
-
- void setContext(final PipelineContext pipelineContext);
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
index 25ab03e..d0e87b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
@@ -20,9 +20,6 @@
package org.apache.usergrid.corepersistence.pipeline.read;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
@@ -103,5 +100,5 @@ public interface ReadPipelineBuilder {
* Load our entity results when our previous filter calls graph
* @return
*/
- Observable<PipelineResult<ResultsPage>> execute();
+ Observable<ResultsPage> execute();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
index 4ecfb47..ffb9f7d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
@@ -24,9 +24,9 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.usergrid.corepersistence.pipeline.Pipeline;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
+import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -52,6 +52,8 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
private final ApplicationScope applicationScope;
+ private final CollectorFactory collectorFactory;
+
/**
* Our pointer to our collect filter. Set or cleared with each operation that's performed so the correct results are
@@ -70,6 +72,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
this.filterFactory = filterFactory;
this.applicationScope = applicationScope;
+ this.collectorFactory = collectorFactory;
//init our cursor to empty
this.cursor = Optional.absent();
@@ -78,7 +81,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
this.limit = DEFAULT_LIMIT;
- this.collectorState = new CollectorState( collectorFactory );
+ this.collectorState = new CollectorState( );
this.filters = new ArrayList<>();
}
@@ -120,7 +123,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
filters.add( filterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) );
- this.collectorState.setEntityLoaderCollector();
+ this.collectorState.setIdEntityLoaderFilter();
return this;
}
@@ -132,7 +135,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
filters.add( filterFactory.readGraphCollectionFilter( collectionName ) );
- this.collectorState.setEntityLoaderCollector();
+ this.collectorState.setIdEntityLoaderFilter();
return this;
}
@@ -147,7 +150,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
filters.add( filterFactory.elasticSearchCollectionFilter( query, collectionName, entityType ) );
- this.collectorState.setCandidateResultsEntityResultsCollector();
+ this.collectorState.setCandidateEntityFilter();
return this;
}
@@ -159,7 +162,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
ValidationUtils.verifyIdentity( entityId );
filters.add( filterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) );
- collectorState.setEntityLoaderCollector();
+ collectorState.setIdEntityLoaderFilter();
return this;
}
@@ -169,7 +172,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
public ReadPipelineBuilder getConnection( final String connectionName ) {
Preconditions.checkNotNull( connectionName, "connectionName must not be null" );
filters.add( filterFactory.readGraphConnectionFilter( connectionName ) );
- collectorState.setEntityLoaderCollector();
+ collectorState.setIdEntityLoaderFilter();
return this;
}
@@ -182,7 +185,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
filters.add( filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType ) );
- collectorState.setEntityLoaderCollector();
+ collectorState.setIdEntityLoaderFilter();
return this;
}
@@ -196,17 +199,25 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
Preconditions.checkNotNull( query, "query must not be null" );
filters.add( filterFactory.elasticSearchConnectionFilter( query, connectionName, entityType ) );
- collectorState.setCandidateResultsEntityResultsCollector();
+ collectorState.setCandidateEntityFilter();
return this;
}
@Override
- public Observable<PipelineResult<ResultsPage>> execute() {
+ public Observable<ResultsPage> execute() {
ValidationUtils.validateApplicationScope( applicationScope );
- final Collector<?, ResultsPage> collector = collectorState.getCollector();
+
+ //add our last filter that will generate entities
+ final Filter<?, Entity> finalFilter = collectorState.getFinalFilter();
+
+ filters.add( finalFilter );
+
+
+ //execute our collector
+ final Collector<?, ResultsPage> collector = collectorFactory.getResultsPageCollector();
Preconditions.checkNotNull( collector,
"You have not specified an operation that creates a collection filter. This is required for loading "
@@ -229,46 +240,52 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
* A mutable state for our collectors. Rather than create a new instance each time, we create a singleton
* collector
*/
- private static final class CollectorState {
- private final CollectorFactory collectorFactory;
+ private final class CollectorState {
+
- private EntityLoadCollector entityLoadCollector;
+ private EntityLoadFilter entityLoadCollector;
- private CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector;
+ private CandidateEntityFilter candidateEntityFilter;
+ private Filter entityLoadFilter;
- private Collector<?, ResultsPage> collector = null;
- private CollectorState( final CollectorFactory collectorFactory ) {this.collectorFactory = collectorFactory;}
+ private CollectorState( ){}
- public void setEntityLoaderCollector() {
+ /**
+ * Set our final filter to be a load entity by Id filter
+ */
+ public void setIdEntityLoaderFilter() {
if ( entityLoadCollector == null ) {
- entityLoadCollector = collectorFactory.entityLoadCollector();
+ entityLoadCollector = filterFactory.entityLoadFilter();
}
- collector = entityLoadCollector;
+ entityLoadFilter = entityLoadCollector;
}
- public void setCandidateResultsEntityResultsCollector() {
- if ( candidateResultsEntityResultsCollector == null ) {
- candidateResultsEntityResultsCollector = collectorFactory.candidateResultsEntityResultsCollector();
+ /**
+ * Set our final filter to be a load entity by candidate filter
+ */
+ public void setCandidateEntityFilter() {
+ if ( candidateEntityFilter == null ) {
+ candidateEntityFilter = filterFactory.candidateEntityFilter();
}
- collector = candidateResultsEntityResultsCollector;
+ entityLoadFilter = candidateEntityFilter;
}
public void clear() {
- collector = null;
+ entityLoadFilter = null;
}
- public Collector<?, ResultsPage> getCollector() {
- return collector;
+ public Filter<?, Entity> getFinalFilter() {
+ return entityLoadFilter;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
index 198ac67..1810d65 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
@@ -22,18 +22,28 @@ package org.apache.usergrid.corepersistence.pipeline.read;
import java.util.List;
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
import org.apache.usergrid.persistence.model.entity.Entity;
/**
- * An encapsulation of entities as a group of responses. Ordered by the requesting filters. Each set should be considered a "page" of results.
+ * An encapsulation of entities as a group of responses. Ordered by the requesting filters. Each set should be
+ * considered a "page" of results. A hold over from 1.0. We shouldn't need this when we fully move away from the EM/RM
*/
public class ResultsPage {
private final List<Entity> entityList;
+ private final int limit;
- public ResultsPage( final List<Entity> entityList ) {this.entityList = entityList;}
+ private final ResponseCursor responseCursor;
+
+
+ public ResultsPage( final List<Entity> entityList, final ResponseCursor responseCursor, final int limit ) {
+ this.entityList = entityList;
+ this.responseCursor = responseCursor;
+ this.limit = limit;
+ }
public List<Entity> getEntityList() {
@@ -43,9 +53,15 @@ public class ResultsPage {
/**
* Return true if the results page is empty
- * @return
*/
- public boolean isEmpty(){
- return entityList == null || entityList.isEmpty();
+ public boolean hasMoreResults() {
+ return entityList != null && entityList.size() == limit;
+ }
+
+
+
+
+ public ResponseCursor getResponseCursor() {
+ return responseCursor;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
new file mode 100644
index 0000000..1c5175d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+
+
+/**
+ * Basic functionality for our commands to handle cursor IO
+ * @param <T> the input type
+ * @param <R> The output Type
+ */
+public abstract class AbstractCollector<T, R> implements Collector<T, R> {
+
+
+ protected PipelineContext pipelineContext;
+
+
+ @Override
+ public void setContext( final PipelineContext pipelineContext ) {
+ this.pipelineContext = pipelineContext;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
new file mode 100644
index 0000000..84654aa
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Takes entities and collects them into results. This mostly exists for 1.0 compatibility. Eventually this will
+ * become the only collector in our pipline and be used when rendering results, both on GET, PUT and POST.
+ */
+public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage>
+ implements Collector<Entity, ResultsPage> {
+
+
+ @Override
+ public Observable<ResultsPage> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
+
+ final int limit = pipelineContext.getLimit();
+
+ return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from( buffer ).collect(
+ () -> new ResultsPageWithCursorCollector( limit ), ( collector, entity ) -> {
+ collector.add( entity );
+ } ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
+ new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit() ) );
+ }
+
+
+ /**
+ * A collector that will aggregate our results together
+ */
+ private static class ResultsPageWithCursorCollector {
+
+
+ private final List<Entity> results;
+
+ private Optional<EdgePath> lastPath;
+
+
+ private ResultsPageWithCursorCollector( final int limit ) {
+ this.results = new ArrayList<>( limit );
+ }
+
+
+ public void add( final FilterResult<Entity> result ) {
+ this.results.add( result.getValue() );
+ this.lastPath = result.getPath();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
index eac8a65..004a696 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
@@ -24,11 +24,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.CandidateResultsFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
import org.apache.usergrid.persistence.index.CandidateResults;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.SearchEdge;
@@ -44,8 +46,8 @@ import rx.Observable;
/**
* Command for reading graph edges
*/
-public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<Id, CandidateResults, Integer>
- implements CandidateResultsFilter {
+public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer>
+ implements Filter<Id, Candidate> {
private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class );
@@ -66,7 +68,7 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
@Override
- public Observable<CandidateResults> call( final Observable<Id> observable ) {
+ public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> observable ) {
//get the graph manager
final ApplicationEntityIndex applicationEntityIndex =
@@ -80,12 +82,12 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
//return all ids that are emitted from this edge
- return observable.flatMap( id -> {
+ return observable.flatMap( idFilterResult -> {
- final SearchEdge searchEdge = getSearchEdge( id );
+ final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() );
- final Observable<CandidateResults> candidates = Observable.create( subscriber -> {
+ final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> {
//our offset to our start value. This will be set the first time we emit
//after we receive new ids, we want to reset this to 0
@@ -98,19 +100,14 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
subscriber.onStart();
- //emit while we have values from ES
- while ( true ) {
+ //emit while we have values from ES and someone is subscribed
+ while ( !subscriber.isUnsubscribed() ) {
try {
final CandidateResults candidateResults =
applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
- currentOffSet += candidateResults.size();
-
- //set the cursor for the next value
- setCursor( currentOffSet );
-
/**
* No candidates, we're done
*/
@@ -119,7 +116,25 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
return;
}
- subscriber.onNext( candidateResults );
+
+ for( CandidateResult candidateResult: candidateResults){
+
+ //our subscriber unsubscribed, break out
+ if(subscriber.isUnsubscribed()){
+ return;
+ }
+
+ final Candidate candidate = new Candidate( candidateResult, searchEdge );
+
+ final FilterResult<Candidate>
+ result = createFilterResult( candidate, currentOffSet, idFilterResult.getPath() );
+
+ subscriber.onNext( result );
+
+ currentOffSet++;
+ }
+
+
}
catch ( Throwable t ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
new file mode 100644
index 0000000..ab9d5d9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.SearchEdge;
+
+
+/**
+ * Create a candidate. This holds the original candidate, as well as the search scope it was found it
+ */
+public class Candidate {
+
+ private final CandidateResult candidateResult;
+ private final SearchEdge searchEdge;
+
+
+ /**
+ * Create a new Candidate for further processing
+ * @param candidateResult The candidate result
+ * @param searchEdge The search edge this was searched on
+ */
+ public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) {
+ this.candidateResult = candidateResult;
+ this.searchEdge = searchEdge;
+ }
+
+
+ public CandidateResult getCandidateResult() {
+ return candidateResult;
+ }
+
+
+ public SearchEdge getSearchEdge() {
+ return searchEdge;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
new file mode 100644
index 0000000..d30917c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from an incoming CandidateResult emissions into entities, then streams them on
+ * performs internal buffering for efficiency. Note that all entities may not be emitted if our load crosses page boundaries. It is up to the
+ * collector to determine when to stop streaming entities.
+ */
+public class CandidateEntityFilter extends AbstractFilter<Candidate, Entity>
+ implements Filter<Candidate, Entity> {
+
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final EntityIndexFactory entityIndexFactory;
+
+
+ @Inject
+ public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final EntityIndexFactory entityIndexFactory ) {
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.entityIndexFactory = entityIndexFactory;
+ }
+
+
+ @Override
+ public Observable<FilterResult<Entity>> call(
+ final Observable<FilterResult<Candidate>> candidateResultsObservable ) {
+
+
+ /**
+ * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
+ * objects
+ */
+
+ final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+ final EntityCollectionManager entityCollectionManager =
+ entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+ final ApplicationEntityIndex applicationIndex =
+ entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+ //buffer them to get a page size we can make 1 network hop
+ final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() )
+
+ //load them
+ .flatMap( candidateResults -> {
+ //flatten toa list of ids to load
+ final Observable<List<Id>> candidateIds =
+ Observable.from( candidateResults ).map( filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList();
+
+ //load the ids
+ final Observable<EntitySet> entitySetObservable =
+ candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
+
+ //now we have a collection, validate our canidate set is correct.
+
+ return entitySetObservable.map(
+ entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet,
+ candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() )
+ .flatMap(
+ entityCollector -> Observable.from( entityCollector.getResults() ) );
+ } );
+
+ //if we filter all our results, we want to continue to try the next page
+ return searchIdSetObservable;
+ }
+
+
+
+
+ /**
+ * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
+ */
+ private static final class EntityVerifier {
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+ private List<FilterResult<Entity>> results = new ArrayList<>();
+
+ private final EntityIndexBatch batch;
+ private final List<FilterResult<Candidate>> candidateResults;
+ private final EntitySet entitySet;
+
+
+ public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
+ final List<FilterResult<Candidate>> candidateResults ) {
+ this.batch = batch;
+ this.entitySet = entitySet;
+ this.candidateResults = candidateResults;
+ this.results = new ArrayList<>( entitySet.size() );
+ }
+
+
+ /**
+ * Merge our candidates and our entity set into results
+ */
+ public void merge() {
+
+ for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+ validate( candidateResult );
+ }
+
+ batch.execute();
+ }
+
+
+ public List<FilterResult<Entity>> getResults() {
+ return results;
+ }
+
+
+ public EntityIndexBatch getBatch() {
+ return batch;
+ }
+
+
+ private void validate( final FilterResult<Candidate> filterResult ) {
+
+ final Candidate candidate = filterResult.getValue();
+ final CandidateResult candidateResult = candidate.getCandidateResult();
+ final SearchEdge searchEdge = candidate.getSearchEdge();
+ final Id candidateId = candidateResult.getId();
+ final UUID candidateVersion = candidateResult.getVersion();
+
+
+ final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+ //doesn't exist warn and drop
+ if ( entity == null ) {
+ logger.warn(
+ "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
+ + " Ignoring since this could be a region sync issue",
+ candidateId, candidateVersion );
+
+
+ //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+ return;
+
+ }
+
+
+ final UUID entityVersion = entity.getVersion();
+ final Id entityId = entity.getId();
+
+
+
+
+
+ //entity is newer than ES version, could be an update or the entity is marked as deleted
+ if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
+
+ logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+ new Object[] { searchEdge, entityId, entityVersion } );
+ batch.deindex( searchEdge, entityId, entityVersion );
+ return;
+ }
+
+ //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+ //remove the ES record, since the read in cass should cause a read repair, just ignore
+ if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+ logger.warn(
+ "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
+ + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+
+ //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+ return;
+ }
+
+ //they're the same add it
+
+ final Entity returnEntity = entity.getEntity().get();
+
+ final Optional<EdgePath> parent = filterResult.getPath();
+
+ final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+ results.add( toReturn );
+ }
+ }
+}
[02/11] incubator-usergrid git commit: Massive refactor. Paths for
cursor generation are now part of our I/O results. This allows the collector
to take until satisfied, then generate a serializable path.
Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
new file mode 100644
index 0000000..d598a2e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from a set of Ids.
+ *
+ * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
+ */
+public class EntityLoadFilter extends AbstractFilter<Id, Entity> implements Filter<Id, Entity> {
+
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
+ @Inject
+ public EntityLoadFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ }
+
+
+ @Override
+ public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+
+
+ final EntityCollectionManager entityCollectionManager =
+ entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
+
+ //it's more efficient to make 1 network hop to get everything, then drop our results if required
+ final Observable<FilterResult<Entity>> entityObservable =
+ filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
+
+ final Observable<EntitySet> entitySetObservable =
+ Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
+ .flatMap( ids -> entityCollectionManager.load( ids ) );
+
+
+ //now we have a collection, validate our canidate set is correct.
+
+ return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
+ .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+ entityCollector -> Observable.from( entityCollector.getResults() ) );
+ } );
+
+ return entityObservable;
+ }
+
+
+ /**
+ * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this
+ * state is mutable and difficult to represent functionally
+ */
+ private static final class EntityVerifier {
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+ private List<FilterResult<Entity>> results = new ArrayList<>();
+
+ private final List<FilterResult<Id>> candidateResults;
+ private final EntitySet entitySet;
+
+
+ public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
+ this.entitySet = entitySet;
+ this.candidateResults = candidateResults;
+ this.results = new ArrayList<>( entitySet.size() );
+ }
+
+
+ /**
+ * Merge our candidates and our entity set into results
+ */
+ public void merge() {
+
+ for ( final FilterResult<Id> candidateResult : candidateResults ) {
+ validate( candidateResult );
+ }
+ }
+
+
+ public List<FilterResult<Entity>> getResults() {
+ return results;
+ }
+
+
+ private void validate( final FilterResult<Id> filterResult ) {
+
+ final Id candidateId = filterResult.getValue();
+
+
+ final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+ //doesn't exist warn and drop
+ if ( entity == null || !entity.getEntity().isPresent() ) {
+ logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
+ + " Ignoring since this could be a region sync issue", candidateId );
+
+
+ //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+ return;
+ }
+
+ //it exists, add it
+
+ final Entity returnEntity = entity.getEntity().get();
+
+ final Optional<EdgePath> parent = filterResult.getPath();
+
+ final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+ results.add( toReturn );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
index 12306fd..7371579 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
@@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -42,7 +43,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType
/**
* Command for reading graph edges on a connection
*/
-public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> {
+public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
private final GraphManagerFactory graphManagerFactory;
private final String connectionName;
@@ -61,8 +62,9 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
}
+
@Override
- public Observable<Id> call( final Observable<Id> observable ) {
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
//get the graph manager
final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
@@ -73,20 +75,18 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
//return all ids that are emitted from this edge
- return observable.flatMap( id -> {
+ return filterResultObservable.flatMap( idFilterResult -> {
//set our our constant state
final Optional<Edge> startFromCursor = getSeekValue();
+ final Id id = idFilterResult.getValue();
final SimpleSearchByIdType search =
new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
entityType, startFromCursor );
- /**
- * TODO, pass a message with pointers to our cursor values to be generated later
- */
- return graphManager.loadEdgesFromSourceByType( search ).doOnNext( edge -> setCursor( edge ) ).map(
- edge -> edge.getTargetNode() );
+ return graphManager.loadEdgesFromSourceByType( search ).map(
+ edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
} );
}
@@ -95,4 +95,6 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
protected CursorSerializer<Edge> getCursorSerializer() {
return EdgeCursorSerializer.INSTANCE;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index f3c2a9c..0260d1d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -26,12 +26,10 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.persistence.EntityFactory;
import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -51,13 +49,17 @@ public class ObservableQueryExecutor implements QueryExecutor {
public Iterator<Results> iterator;
- public ObservableQueryExecutor( final Observable<PipelineResult<ResultsPage>> resultsObservable ) {
+ public ObservableQueryExecutor( final Observable<ResultsPage> resultsObservable) {
//map to our old results objects, return a default empty if required
this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() );
}
-
+ /**
+ *
+ * @param cpEntity
+ * @return
+ */
private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) {
@@ -72,9 +74,8 @@ public class ObservableQueryExecutor implements QueryExecutor {
return entity;
}
- private Results createResults( final PipelineResult<ResultsPage> pipelineResults ){
+ private Results createResults( final ResultsPage resultsPage ){
- final ResultsPage resultsPage = pipelineResults.getResult();
final List<Entity> entityList = resultsPage.getEntityList();
final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() );
@@ -85,10 +86,15 @@ public class ObservableQueryExecutor implements QueryExecutor {
final Results results = Results.fromEntities( resultsEntities );
- if(pipelineResults.getCursor().isPresent()) {
- results.setCursor( pipelineResults.getCursor().get() );
- }
+ //add the cursor if our limit is the same
+ if(resultsPage.hasMoreResults()) {
+ final Optional<String> cursor = resultsPage.getResponseCursor().encodeAsString();
+
+ if ( cursor.isPresent() ) {
+ results.setCursor( cursor.get() );
+ }
+ }
return results;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
index 6e15d54..fd65ebf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
@@ -24,11 +24,11 @@ package org.apache.usergrid.corepersistence.pipeline.cursor;
import org.junit.Test;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticsearchCursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.graph.EdgeCursorSerializer;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
import com.google.common.base.Optional;
@@ -41,7 +41,10 @@ public class CursorTest {
@Test
public void testCursors(){
- ResponseCursor responseCursor = new ResponseCursor();
+
+
+
+
//test encoding edge
@@ -58,13 +61,18 @@ public class CursorTest {
final Integer query2 = 20;
- responseCursor.setCursor( 0, edge1, EdgeCursorSerializer.INSTANCE );
- responseCursor.setCursor( 1, query1, ElasticsearchCursorSerializer.INSTANCE );
+ final EdgePath<Integer> filter3Path = new EdgePath<>( 3, query2, ElasticsearchCursorSerializer.INSTANCE, Optional.absent() );
+
+ final EdgePath<Edge> filter2Path = new EdgePath<Edge>(2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ));
+
+ final EdgePath<Integer> filter1Path = new EdgePath<>( 1, query1, ElasticsearchCursorSerializer.INSTANCE, Optional.of(filter2Path) );
+
+ final EdgePath<Edge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) );
+
- responseCursor.setCursor(2, edge2, EdgeCursorSerializer.INSTANCE);
- responseCursor.setCursor(3, query2, ElasticsearchCursorSerializer.INSTANCE);
+ ResponseCursor responseCursor = new ResponseCursor( Optional.of(filter0Path) );
final Optional<String> cursor = responseCursor.encodeAsString();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
index 1e63df1..a157e47 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
@@ -42,14 +42,10 @@ public class CandidateResults implements Iterable<CandidateResult> {
private final List<CandidateResult> candidates;
private final Collection<SelectFieldMapping> getFieldMappings;
- private final SearchEdge searchEdge;
-
- public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings,
- final SearchEdge searchEdge ) {
+ public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings) {
this.candidates = candidates;
this.getFieldMappings = getFieldMappings;
- this.searchEdge = searchEdge;
offset = Optional.absent();
}
@@ -91,11 +87,6 @@ public class CandidateResults implements Iterable<CandidateResult> {
}
- public SearchEdge getSearchEdge() {
- return searchEdge;
- }
-
-
/**
* Get the candidates
* @return
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 2d4696a..5b67060 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -169,7 +169,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
}
failureMonitor.success();
- return parseResults(searchResponse, parsedQuery, searchEdge, limit, offset);
+ return parseResults(searchResponse, parsedQuery, limit, offset);
}
@@ -227,7 +227,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
/**
* Parse the results and return the canddiate results
*/
- private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query, final SearchEdge searchEdge,
+ private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query,
final int limit, final int from ) {
final SearchHits searchHits = searchResponse.getHits();
@@ -244,8 +244,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
candidates.add( candidateResult );
}
- final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings(),
- searchEdge );
+ final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings());
// >= seems odd. However if we get an overflow, we need to account for it.
if ( hits.length >= limit ) {
[09/11] incubator-usergrid git commit: Updates observable short
circuit and fixes NPE on empty results
Posted by to...@apache.org.
Updates observable short circuit and fixes NPE on empty results
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5bb77985
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5bb77985
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5bb77985
Branch: refs/heads/two-dot-o-dev
Commit: 5bb77985ba386b72d66a5584057a5d98b3f3e111
Parents: a673c81
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 11:27:25 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 11:27:25 2015 -0600
----------------------------------------------------------------------
.../elasticsearch/AbstractElasticSearchFilter.java | 16 ++++++++--------
.../org/apache/usergrid/persistence/Results.java | 2 +-
2 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5bb77985/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
index 004a696..f403e21 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
@@ -108,20 +108,13 @@ public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id,
final CandidateResults candidateResults =
applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
- /**
- * No candidates, we're done
- */
- if ( candidateResults.size() == 0 ) {
- subscriber.onCompleted();
- return;
- }
for( CandidateResult candidateResult: candidateResults){
//our subscriber unsubscribed, break out
if(subscriber.isUnsubscribed()){
- return;
+ return;
}
final Candidate candidate = new Candidate( candidateResult, searchEdge );
@@ -134,6 +127,13 @@ public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id,
currentOffSet++;
}
+ /**
+ * No candidates, we're done
+ */
+ if (candidateResults.size() < limit) {
+ subscriber.onCompleted();
+ return;
+ }
}
catch ( Throwable t ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5bb77985/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index e572070..fa221f5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -1293,7 +1293,7 @@ public class Results implements Iterable<Entity> {
/** uses cursor to get next batch of Results (returns null if no cursor) */
public Results getNextPageResults() throws Exception {
if ( queryExecutor == null || !queryExecutor.hasNext() ) {
- return null;
+ return new Results();
}
[05/11] incubator-usergrid git commit: Merge branch 'two-dot-o-dev'
of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
USERGRID-587
Posted by to...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-587
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/294a7d90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/294a7d90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/294a7d90
Branch: refs/heads/two-dot-o-dev
Commit: 294a7d900784283ff7c038585728418d799b7010
Parents: cd983d6 faafa37
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 30 17:41:05 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 30 17:41:05 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 34 ++---
.../usergrid/persistence/EntityManagerIT.java | 4 +-
.../graph/impl/SimpleSearchByEdge.java | 5 +-
.../usergrid/persistence/index/IndexFig.java | 11 +-
.../persistence/index/IndexRefreshCommand.java | 2 +-
.../index/impl/EsEntityIndexImpl.java | 2 +-
.../index/impl/IndexRefreshCommandImpl.java | 136 +++++++++++--------
.../persistence/index/impl/EntityIndexTest.java | 2 +-
8 files changed, 112 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/294a7d90/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------