You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/05/04 19:32:54 UTC
[04/12] incubator-usergrid git commit: Massive refactor. Paths for
cursor generation are now part of our I/O results. This allows the collector
to take until satisfied, then generate a serializable path.
Massive refactor. Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cd983d66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cd983d66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cd983d66
Branch: refs/heads/USERGRID-609
Commit: cd983d66260222985431a775454183c2ed2305ea
Parents: 6d4847a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 30 17:40:52 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 30 17:40:52 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 5 +-
.../corepersistence/pipeline/Pipeline.java | 9 +-
.../pipeline/PipelineContext.java | 16 +-
.../pipeline/PipelineOperation.java | 39 ++++
.../pipeline/PipelineResult.java | 57 -----
.../pipeline/cursor/ResponseCursor.java | 81 ++++---
.../pipeline/read/AbstractFilter.java | 45 ++++
.../pipeline/read/AbstractPathFilter.java | 109 +++++++++
.../read/AbstractPipelineOperation.java | 44 ----
.../pipeline/read/AbstractSeekingFilter.java | 102 --------
.../pipeline/read/CandidateResultsFilter.java | 31 ---
.../pipeline/read/Collector.java | 13 +-
.../pipeline/read/CollectorFactory.java | 12 +-
.../corepersistence/pipeline/read/EdgePath.java | 79 +++++++
.../corepersistence/pipeline/read/Filter.java | 9 +-
.../pipeline/read/FilterFactory.java | 31 ++-
.../pipeline/read/FilterResult.java | 56 +++++
.../pipeline/read/PipelineOperation.java | 38 ---
.../pipeline/read/ReadPipelineBuilder.java | 5 +-
.../pipeline/read/ReadPipelineBuilderImpl.java | 75 +++---
.../pipeline/read/ResultsPage.java | 26 ++-
.../read/collect/AbstractCollector.java | 46 ++++
.../read/collect/ResultsPageCollector.java | 80 +++++++
.../AbstractElasticSearchFilter.java | 47 ++--
.../pipeline/read/elasticsearch/Candidate.java | 55 +++++
.../elasticsearch/CandidateEntityFilter.java | 234 +++++++++++++++++++
.../read/elasticsearch/CandidateIdFilter.java | 201 ++++++++++++++++
.../CandidateResultsEntityResultsCollector.java | 217 -----------------
.../CandidateResultsIdVerifyFilter.java | 193 ---------------
.../impl/CollectionRefsVerifier.java | 44 ----
.../CollectionResultsLoaderFactoryImpl.java | 65 ------
.../impl/ConnectionRefsVerifier.java | 59 -----
.../ConnectionResultsLoaderFactoryImpl.java | 73 ------
.../impl/ElasticSearchQueryExecutor.java | 224 ------------------
.../read/elasticsearch/impl/EntityVerifier.java | 127 ----------
.../elasticsearch/impl/FilteringLoader.java | 219 -----------------
.../read/elasticsearch/impl/IdsVerifier.java | 46 ----
.../read/elasticsearch/impl/ResultsLoader.java | 43 ----
.../impl/ResultsLoaderFactory.java | 41 ----
.../elasticsearch/impl/ResultsVerifier.java | 52 -----
.../elasticsearch/impl/VersionVerifier.java | 85 -------
.../pipeline/read/entity/EntityIdFilter.java | 53 -----
.../read/entity/EntityLoadCollector.java | 94 --------
.../graph/AbstractReadGraphEdgeByIdFilter.java | 12 +-
.../read/graph/AbstractReadGraphFilter.java | 15 +-
.../pipeline/read/graph/EntityIdFilter.java | 54 +++++
.../pipeline/read/graph/EntityLoadFilter.java | 155 ++++++++++++
.../graph/ReadGraphConnectionByTypeFilter.java | 20 +-
.../results/ObservableQueryExecutor.java | 24 +-
.../pipeline/cursor/CursorTest.java | 20 +-
.../persistence/index/CandidateResults.java | 11 +-
.../impl/EsApplicationEntityIndexImpl.java | 7 +-
52 files changed, 1402 insertions(+), 2096 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3119934..2790ee1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -32,7 +32,6 @@ import org.springframework.util.Assert;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
@@ -648,7 +647,7 @@ public class CpRelationManager implements RelationManager {
}
- final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute();
+ final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
return new ObservableQueryExecutor( resultsObservable ).next();
}
@@ -917,7 +916,7 @@ public class CpRelationManager implements RelationManager {
}
- final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute();
+ final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
return new ObservableQueryExecutor( resultsObservable ).next();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index bc93b6c..df6a218 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -25,7 +25,6 @@ import java.util.List;
import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import com.google.common.base.Optional;
@@ -47,7 +46,6 @@ public class Pipeline<R> {
private final List<PipelineOperation> idPipelineOperationList;
private final Collector<?, R> collector;
private final RequestCursor requestCursor;
- private final ResponseCursor responseCursor;
private final int limit;
@@ -69,7 +67,6 @@ public class Pipeline<R> {
this.limit = limit;
this.requestCursor = new RequestCursor( cursor );
- this.responseCursor = new ResponseCursor();
}
@@ -77,7 +74,7 @@ public class Pipeline<R> {
* Execute the pipline construction, returning an observable of results
* @return
*/
- public Observable<PipelineResult<R>> execute(){
+ public Observable<R> execute(){
Observable traverseObservable = Observable.just( applicationScope.getApplication() );
@@ -99,7 +96,7 @@ public class Pipeline<R> {
//append the optional cursor into the response for the caller to use
- return response.map( result -> new PipelineResult<>( result, responseCursor ) );
+ return response;
}
@@ -111,7 +108,7 @@ public class Pipeline<R> {
private void setState( final PipelineOperation pipelineOperation ) {
- final PipelineContext context = new PipelineContext( applicationScope, requestCursor, responseCursor,
+ final PipelineContext context = new PipelineContext( applicationScope, requestCursor,
limit, idCount );
pipelineOperation.setContext( context );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
index 325f876..018abb7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
@@ -38,16 +38,13 @@ public class PipelineContext {
private final int id;
private final ApplicationScope applicationScope;
private final RequestCursor requestCursor;
- private final ResponseCursor responseCursor;
private final int limit;
- public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor,
- final ResponseCursor responseCursor, final int limit, final int id ) {
+ public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id ) {
this.applicationScope = applicationScope;
this.requestCursor = requestCursor;
- this.responseCursor = responseCursor;
this.limit = limit;
this.id = id;
}
@@ -64,7 +61,7 @@ public class PipelineContext {
/**
- * Get our cursor value if present
+ * Get our cursor value if present from our pipeline
* @param serializer
*/
public <T extends Serializable> Optional<T> getCursor( final CursorSerializer<T> serializer ) {
@@ -73,15 +70,6 @@ public class PipelineContext {
return Optional.fromNullable( value );
}
-
- /**
- * Set the cursor value into our resposne
- */
- public <T extends Serializable> void setCursorValue( final T value, final CursorSerializer<T> serializer ) {
- responseCursor.setCursor( id, value, serializer );
- }
-
-
/**
* Get the limit for this execution
* @return
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
new file mode 100644
index 0000000..d2fa16c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+
+import rx.Observable;
+
+
+/**
+ * Interface for filtering commands. All operations take an observable of FilterResult values of type T as input. Output is then determined by subclasses.
+ * Each operation receives its input, performs some work, and emits values for further processing in the Observable
+ * pipeline
+ * @param <T> The input type of the filter value
+ * @param <R> The output type of the filter value
+ */
+public interface PipelineOperation<T, R> extends Observable.Transformer<FilterResult<T>, R> {
+
+ void setContext(final PipelineContext pipelineContext);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
deleted file mode 100644
index fe8604e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Intermediate observable that will return results, as well as an optional cursor
- * @param <R>
- */
-public class PipelineResult<R> {
-
-
- private final R result;
-
- private final ResponseCursor responseCursor;
-
-
- public PipelineResult( final R result, final ResponseCursor responseCursor ) {
- this.result = result;
- this.responseCursor = responseCursor;
- }
-
-
- /**
- * If the user requests our cursor, return the cursor
- * @return
- */
- public Optional<String> getCursor(){
- return this.responseCursor.encodeAsString();
- }
-
- public R getResult(){
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
index f1c8c24..dbd8b88 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
@@ -20,12 +20,10 @@
package org.apache.usergrid.corepersistence.pipeline.cursor;
-import java.io.Serializable;
import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-import com.fasterxml.jackson.core.Base64Variant;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,71 +39,72 @@ public class ResponseCursor {
private static final ObjectMapper MAPPER = CursorSerializerUtil.getMapper();
+
/**
- * We use a map b/c some indexes might be skipped
+ * The pointer to the first edge path. Evaluation is lazily performed in the case the caller does not care about
+ * the cursor.
*/
- private Map<Integer, CursorEntry<?>> cursors = new HashMap<>();
+ private final Optional<EdgePath> edgePath;
+ private Optional<String> encodedValue = null;
- /**
- * Set the possible cursor value into the index. DOES NOT parse the cursor. This is intentional for performance
- */
- public <T extends Serializable> void setCursor( final int id, final T cursor,
- final CursorSerializer<T> serializer ) {
- final CursorEntry<T> newEntry = new CursorEntry<>( cursor, serializer );
- cursors.put( id, newEntry );
- }
+ public ResponseCursor( final Optional<EdgePath> edgePath ) {this.edgePath = edgePath;}
/**
- * now we're done, encode as a string
+ * Lazily encoded deliberately. If the user doesn't care about a cursor and is using streams, we don't want to take the
+ * time to calculate it
*/
public Optional<String> encodeAsString() {
- try {
- if(cursors.isEmpty()){
- return Optional.absent();
- }
+ //always return cached if we are called 2x
+ if ( encodedValue != null ) {
+ return encodedValue;
+ }
+
+ if ( !edgePath.isPresent() ) {
+ encodedValue = Optional.absent();
+ return encodedValue;
+ }
+
+
+ try {
+ //no edge path, short circuit
final ObjectNode map = MAPPER.createObjectNode();
- for ( Map.Entry<Integer, CursorEntry<?>> entry : cursors.entrySet() ) {
- final CursorEntry cursorEntry = entry.getValue();
+ Optional<EdgePath> current = edgePath;
- final JsonNode serialized = cursorEntry.serializer.toNode( MAPPER, cursorEntry.cursor );
- map.put( entry.getKey().toString(), serialized );
- }
+ //traverse each edge and add them to our json
+ do {
+
+ final EdgePath edgePath = current.get();
+ final Object cursorValue = edgePath.getCursorValue();
+ final CursorSerializer serializer = edgePath.getSerializer();
+ final int filterId = edgePath.getFilterId();
+
+ final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
+ map.put( String.valueOf( filterId ), serialized );
+ current = current.get().getPrevious();
+ }
+ while ( current.isPresent() );
- final byte[] output = MAPPER.writeValueAsBytes(map);
+ final byte[] output = MAPPER.writeValueAsBytes( map );
//generate a base64 url save string
final String value = Base64.getUrlEncoder().encodeToString( output );
- return Optional.of( value );
-
+ encodedValue = Optional.of( value );
}
catch ( JsonProcessingException e ) {
throw new CursorParseException( "Unable to serialize cursor", e );
}
- }
-
- /**
- * Interal pointer to the cursor and it's serialzed value
- */
- private static final class CursorEntry<T> {
- private final T cursor;
- private final CursorSerializer<T> serializer;
-
-
- private CursorEntry( final T cursor, final CursorSerializer<T> serializer ) {
- this.cursor = cursor;
- this.serializer = serializer;
- }
+ return encodedValue;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
new file mode 100644
index 0000000..e4d5d44
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+
+
+/**
+ * Basic functionality for our commands to handle cursor IO
+ * @param <T> the input type
+ * @param <R> The output Type
+ */
+public abstract class AbstractFilter<T, R> implements Filter<T, R> {
+
+
+ protected PipelineContext pipelineContext;
+
+
+ @Override
+ public void setContext( final PipelineContext pipelineContext ) {
+ this.pipelineContext = pipelineContext;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
new file mode 100644
index 0000000..c68dc4a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import java.io.Serializable;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Abstract class for filters to extend that require a cursor
+ * @param <T> The input type
+ * @param <R> The response type
+ * @param <C> The cursor type
+ */
+public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<T, R> implements Filter<T, R> {
+
+
+
+ //TODO not a big fan of this, but not sure how to build resume otherwise
+ private CursorSeek<C> cursorSeek;
+
+
+ /**
+ * Return the parsed value of the cursor from the last request, if it exists
+ */
+ protected Optional<C> getSeekValue() {
+
+ if(cursorSeek == null) {
+ final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() );
+ cursorSeek = new CursorSeek<>( cursor );
+ }
+
+ return cursorSeek.getSeekValue();
+
+ }
+
+
+ /**
+ * Wraps the emitted value in a FilterResult whose EdgePath carries this filter's id, cursor value and serializer, linked to the parent path
+ */
+ protected FilterResult<R> createFilterResult( final R emit, final C cursorValue, final Optional<EdgePath> parent ){
+
+
+ //create a current path, and append our parent path to it
+ final EdgePath<C> newEdgePath =
+ new EdgePath<>( pipelineContext.getId(), cursorValue, getCursorSerializer(), parent );
+
+ //emit our value with the parent path
+ return new FilterResult<>( emit, Optional.of( newEdgePath ) );
+
+ }
+
+
+ /**
+ * Return the class to be used when parsing the cursor
+ */
+ protected abstract CursorSerializer<C> getCursorSerializer();
+
+
+ /**
+ * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values.
+ * Calling get will return the first value to seek, or absent if not specified. Subsequent calls will return absent. Callers should treat the results as seek values for each operation
+ */
+ protected static class CursorSeek<C> {
+
+ private Optional<C> seek;
+
+ private CursorSeek(final Optional<C> cursorValue){
+ seek = cursorValue;
+ }
+
+
+ /**
+ * Get the seek value to use when searching
+ * @return
+ */
+ public Optional<C> getSeekValue(){
+ final Optional<C> toReturn = seek;
+
+ seek = Optional.absent();
+
+ return toReturn;
+ }
+
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
deleted file mode 100644
index 8d7f106..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-
-
-/**
- * Basic functionality for our commands to handle cursor IO
- * @param <T> the input type
- * @param <R> The output Type
- */
-public abstract class AbstractPipelineOperation<T, R> implements PipelineOperation<T, R> {
-
-
- protected PipelineContext pipelineContext;
-
-
- @Override
- public void setContext( final PipelineContext pipelineContext ) {
- this.pipelineContext = pipelineContext;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
deleted file mode 100644
index c23a1b7..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import java.io.Serializable;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Abstract class for filters to extend that require a cursor
- * @param <T> The input type
- * @param <R> The response type
- * @param <C> The cursor type
- */
-public abstract class AbstractSeekingFilter<T, R, C extends Serializable> extends AbstractPipelineOperation<T, R> implements Filter<T, R> {
-
-
-
- //TODO not a big fan of this, but not sure how to build resume otherwise
- private CursorSeek<C> cursorSeek;
-
-
- /**
- * Return the parsed value of the cursor from the last request, if it exists
- */
- protected Optional<C> getSeekValue() {
-
- if(cursorSeek == null) {
- final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() );
- cursorSeek = new CursorSeek<>( cursor );
- }
-
- return cursorSeek.getSeekValue();
-
- }
-
-
- /**
- * Sets the cursor into our pipeline context
- * @param newValue
- */
- protected void setCursor(final C newValue){
- pipelineContext.setCursorValue( newValue, getCursorSerializer() );
- }
-
-
- /**
- * Return the class to be used when parsing the cursor
- */
- protected abstract CursorSerializer<C> getCursorSerializer();
-
-
- /**
- * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values.
- * Calling get will return the first value to seek, or absent if not specified. Subsequent calls will return absent. Callers should treat the results as seek values for each operation
- */
- protected static class CursorSeek<C> {
-
- private Optional<C> seek;
-
- private CursorSeek(final Optional<C> cursorValue){
- seek = cursorValue;
- }
-
-
- /**
- * Get the seek value to use when searching
- * @return
- */
- public Optional<C> getSeekValue(){
- final Optional<C> toReturn = seek;
-
- seek = Optional.absent();
-
- return toReturn;
- }
-
-
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
deleted file mode 100644
index 4e6d06e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Traverses edges in the graph. Either by query or graph traversal. Take an observable of ids, and emits
- * an observable of ids
- */
-public interface CandidateResultsFilter extends PipelineOperation<Id, CandidateResults> {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
index 69d929c..e28ce44 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
@@ -20,11 +20,18 @@
package org.apache.usergrid.corepersistence.pipeline.read;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+
+
/**
- * A command that is used to reduce our stream of results into a final output
- * @param <T>
+ * A command that is used to reduce our stream of results into a stream of final batch outputs. When used
+ * no further transformation or encoding should occur. Otherwise EdgePath data will be lost, and serialization cannot occur
+ * across requests
+ *
+ * @param <T> The input type
+ * @param <R> The output type
*/
-public interface Collector<T, R> extends PipelineOperation<T, R> {
+public interface Collector<T, R> extends PipelineOperation<T,R> {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
index 6893b34..dd200b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
@@ -20,8 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
/**
@@ -29,16 +28,11 @@ import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollec
*/
public interface CollectorFactory {
- /**
- * Generate a new instance of the command with the specified parameters
- */
- EntityLoadCollector entityLoadCollector();
/**
- * Get the collector for collection candidate results to entities
+ * Get the results page collector
* @return
*/
- CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector();
-
+ ResultsPageCollector getResultsPageCollector();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
new file mode 100644
index 0000000..c560fad
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A path from our input element to our emitted element. A list of EdgePaths comprises a path through the graph. The chain of edge paths will result
+ * in a cursor when aggregated. If a graph traversal is the following
+ *
+ * applicationId(1) - "users" -> userId(2) - "devices" -> deviceId(3). There would be 2 EdgePaths:
+ *
+ * EdgePath("users"->userId(2)) <- parent - EdgePath("devices" -> deviceId(3))
+ */
+public class EdgePath<C> {
+
+
+ private final int filterId;
+ private final C cursorValue;
+ private final CursorSerializer<C> serializer;
+ private final Optional<EdgePath> previous;
+
+
+ /**
+ *
+ * @param filterId The id of the filter that generated this path
+ * @param cursorValue The value to resume seeking on the path
+ * @param serializer The serializer to serialize the value
+ * @param parent The parent graph path edge to reach this path
+ */
+ public EdgePath( final int filterId, final C cursorValue, final CursorSerializer<C> serializer,
+ final Optional<EdgePath> parent ) {
+ this.filterId = filterId;
+ this.cursorValue = cursorValue;
+ this.serializer = serializer;
+ this.previous = parent;
+ }
+
+
+ public C getCursorValue() {
+ return cursorValue;
+ }
+
+
+ public int getFilterId() {
+ return filterId;
+ }
+
+
+ public Optional<EdgePath> getPrevious() {
+ return previous;
+ }
+
+
+ public CursorSerializer<C> getSerializer() {
+ return serializer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
index ace62db..054a85a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
@@ -20,11 +20,12 @@
package org.apache.usergrid.corepersistence.pipeline.read;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
/**
- * Traverses edges in the graph. Either by query or graph traversal. Take an observable of ids, and emits
- * an observable of ids
+ * Traverses edges in the graph, either by query or graph traversal. Takes an observable of FilterResults, and emits
+ * an observable of FilterResults. Filters should never emit groups or objects that represent collections. Items should
+ * always be emitted 1 at a time. It is the responsibility of the collector to aggregate results.
*/
-public interface Filter<T, R> extends PipelineOperation<T, R> {}
+public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index 078d981..c465516 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -20,10 +20,12 @@
package org.apache.usergrid.corepersistence.pipeline.read;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsIdVerifyFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchConnectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
@@ -43,6 +45,7 @@ public interface FilterFactory {
/**
* Generate a new instance of the command with the specified parameters
+ *
* @param collectionName The collection name to use when reading the graph
*/
ReadGraphCollectionFilter readGraphCollectionFilter( final String collectionName );
@@ -57,12 +60,14 @@ public interface FilterFactory {
/**
* Generate a new instance of the command with the specified parameters
+ *
* @param connectionName The connection name to use when traversing the graph
*/
ReadGraphConnectionFilter readGraphConnectionFilter( final String connectionName );
/**
* Generate a new instance of the command with the specified parameters
+ *
* @param connectionName The connection name to use when traversing the graph
* @param entityType The entity type to use when traversing the graph
*/
@@ -72,13 +77,15 @@ public interface FilterFactory {
/**
* Read a connection directly between two identifiers
+ *
* @param connectionName The connection name to use when traversing the graph
- * @param targetId The target Id to use when traversing the graph
+ * @param targetId The target Id to use when traversing the graph
*/
ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName, final Id targetId );
/**
* Generate a new instance of the command with the specified parameters
+ *
* @param query The query to use when querying the entities in the collection
* @param collectionName The collection name to use when querying
*/
@@ -90,6 +97,7 @@ public interface FilterFactory {
/**
* Generate a new instance of the command with the specified parameters
+ *
* @param query The query to use when querying the entities in the connection
* @param connectionName The type of connection to query
* @param connectedEntityType The type of entity in the connection. Leave absent to query all entity types
@@ -102,13 +110,24 @@ public interface FilterFactory {
/**
- * Get a candidate ids verifier for collection results. Should be inserted into pipelines where a query filter is an intermediate step,
- * not a final filter before collectors
+ * Generate a new instance of the command with the specified parameters
+ */
+ EntityLoadFilter entityLoadFilter();
+
+ /**
+ * Get the filter that loads entities from candidate results
*/
- CandidateResultsIdVerifyFilter candidateResultsIdVerifyFilter();
+ CandidateEntityFilter candidateEntityFilter();
+
+ /**
+ * Get a candidate ids verifier for collection results. Should be inserted into pipelines where a query filter is
+ * an intermediate step, not a final filter before collectors
+ */
+ CandidateIdFilter candidateResultsIdVerifyFilter();
/**
* Get an entity id filter. Used as a 1.0->2.0 bridge since we're not doing full traversals
+ *
* @param entityId The entity id to emit
*/
EntityIdFilter getEntityIdFilter( final Id entityId );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
new file mode 100644
index 0000000..3c41a2b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A bean that is passed between filters with immutable cursor state
+ * @param <T>
+ */
+public class FilterResult<T> {
+ private final T value;
+ private final Optional<EdgePath> path;
+
+
+ /**
+ * Create a new immutable filter result
+ * @param value The value the filter emits
+ * @param path The path to this value, if created
+ */
+ public FilterResult( final T value, final Optional<EdgePath> path ) {
+ this.value = value;
+ this.path = path;
+ }
+
+
+ public T getValue() {
+ return value;
+ }
+
+
+ public Optional<EdgePath> getPath() {
+ return path;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
deleted file mode 100644
index 28bba36..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-
-import rx.Observable;
-
-
-/**
- * Interface for filtering commands. All filters must take an observable of Id's as an input. Output is then determined by subclasses.
- * This takes an input of Id, performs some operation, and emits values for further processing in the Observable
- * pipeline
- * @param <T> The input type
- * @param <R>
- */
-public interface PipelineOperation< T, R> extends Observable.Transformer<T, R> {
-
- void setContext(final PipelineContext pipelineContext);
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
index 25ab03e..d0e87b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
@@ -20,9 +20,6 @@
package org.apache.usergrid.corepersistence.pipeline.read;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
@@ -103,5 +100,5 @@ public interface ReadPipelineBuilder {
* Load our entity results when our previous filter calls graph
* @return
*/
- Observable<PipelineResult<ResultsPage>> execute();
+ Observable<ResultsPage> execute();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
index 4ecfb47..ffb9f7d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
@@ -24,9 +24,9 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.usergrid.corepersistence.pipeline.Pipeline;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
+import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -52,6 +52,8 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
private final ApplicationScope applicationScope;
+ private final CollectorFactory collectorFactory;
+
/**
* Our pointer to our collect filter. Set or cleared with each operation that's performed so the correct results are
@@ -70,6 +72,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
this.filterFactory = filterFactory;
this.applicationScope = applicationScope;
+ this.collectorFactory = collectorFactory;
//init our cursor to empty
this.cursor = Optional.absent();
@@ -78,7 +81,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
this.limit = DEFAULT_LIMIT;
- this.collectorState = new CollectorState( collectorFactory );
+ this.collectorState = new CollectorState( );
this.filters = new ArrayList<>();
}
@@ -120,7 +123,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
filters.add( filterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) );
- this.collectorState.setEntityLoaderCollector();
+ this.collectorState.setIdEntityLoaderFilter();
return this;
}
@@ -132,7 +135,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
filters.add( filterFactory.readGraphCollectionFilter( collectionName ) );
- this.collectorState.setEntityLoaderCollector();
+ this.collectorState.setIdEntityLoaderFilter();
return this;
}
@@ -147,7 +150,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
filters.add( filterFactory.elasticSearchCollectionFilter( query, collectionName, entityType ) );
- this.collectorState.setCandidateResultsEntityResultsCollector();
+ this.collectorState.setCandidateEntityFilter();
return this;
}
@@ -159,7 +162,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
ValidationUtils.verifyIdentity( entityId );
filters.add( filterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) );
- collectorState.setEntityLoaderCollector();
+ collectorState.setIdEntityLoaderFilter();
return this;
}
@@ -169,7 +172,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
public ReadPipelineBuilder getConnection( final String connectionName ) {
Preconditions.checkNotNull( connectionName, "connectionName must not be null" );
filters.add( filterFactory.readGraphConnectionFilter( connectionName ) );
- collectorState.setEntityLoaderCollector();
+ collectorState.setIdEntityLoaderFilter();
return this;
}
@@ -182,7 +185,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
filters.add( filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType ) );
- collectorState.setEntityLoaderCollector();
+ collectorState.setIdEntityLoaderFilter();
return this;
}
@@ -196,17 +199,25 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
Preconditions.checkNotNull( query, "query must not be null" );
filters.add( filterFactory.elasticSearchConnectionFilter( query, connectionName, entityType ) );
- collectorState.setCandidateResultsEntityResultsCollector();
+ collectorState.setCandidateEntityFilter();
return this;
}
@Override
- public Observable<PipelineResult<ResultsPage>> execute() {
+ public Observable<ResultsPage> execute() {
ValidationUtils.validateApplicationScope( applicationScope );
- final Collector<?, ResultsPage> collector = collectorState.getCollector();
+
+ //add our last filter that will generate entities
+ final Filter<?, Entity> finalFilter = collectorState.getFinalFilter();
+
+ filters.add( finalFilter );
+
+
+ //execute our collector
+ final Collector<?, ResultsPage> collector = collectorFactory.getResultsPageCollector();
Preconditions.checkNotNull( collector,
"You have not specified an operation that creates a collection filter. This is required for loading "
@@ -229,46 +240,52 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
* A mutable state for our collectors. Rather than create a new instance each time, we create a singleton
* collector
*/
- private static final class CollectorState {
- private final CollectorFactory collectorFactory;
+ private final class CollectorState {
+
- private EntityLoadCollector entityLoadCollector;
+ private EntityLoadFilter entityLoadCollector;
- private CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector;
+ private CandidateEntityFilter candidateEntityFilter;
+ private Filter entityLoadFilter;
- private Collector<?, ResultsPage> collector = null;
- private CollectorState( final CollectorFactory collectorFactory ) {this.collectorFactory = collectorFactory;}
+ private CollectorState( ){}
- public void setEntityLoaderCollector() {
+ /**
+ * Set our final filter to be a load entity by Id filter
+ */
+ public void setIdEntityLoaderFilter() {
if ( entityLoadCollector == null ) {
- entityLoadCollector = collectorFactory.entityLoadCollector();
+ entityLoadCollector = filterFactory.entityLoadFilter();
}
- collector = entityLoadCollector;
+ entityLoadFilter = entityLoadCollector;
}
- public void setCandidateResultsEntityResultsCollector() {
- if ( candidateResultsEntityResultsCollector == null ) {
- candidateResultsEntityResultsCollector = collectorFactory.candidateResultsEntityResultsCollector();
+ /**
+ * Set our final filter to be a load entity by candidate filter
+ */
+ public void setCandidateEntityFilter() {
+ if ( candidateEntityFilter == null ) {
+ candidateEntityFilter = filterFactory.candidateEntityFilter();
}
- collector = candidateResultsEntityResultsCollector;
+ entityLoadFilter = candidateEntityFilter;
}
public void clear() {
- collector = null;
+ entityLoadFilter = null;
}
- public Collector<?, ResultsPage> getCollector() {
- return collector;
+ public Filter<?, Entity> getFinalFilter() {
+ return entityLoadFilter;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
index 198ac67..1810d65 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
@@ -22,18 +22,28 @@ package org.apache.usergrid.corepersistence.pipeline.read;
import java.util.List;
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
import org.apache.usergrid.persistence.model.entity.Entity;
/**
- * An encapsulation of entities as a group of responses. Ordered by the requesting filters. Each set should be considered a "page" of results.
+ * An encapsulation of entities as a group of responses. Ordered by the requesting filters. Each set should be
+ * considered a "page" of results. A holdover from 1.0. We shouldn't need this when we fully move away from the EM/RM
*/
public class ResultsPage {
private final List<Entity> entityList;
+ private final int limit;
- public ResultsPage( final List<Entity> entityList ) {this.entityList = entityList;}
+ private final ResponseCursor responseCursor;
+
+
+ public ResultsPage( final List<Entity> entityList, final ResponseCursor responseCursor, final int limit ) {
+ this.entityList = entityList;
+ this.responseCursor = responseCursor;
+ this.limit = limit;
+ }
public List<Entity> getEntityList() {
@@ -43,9 +53,15 @@ public class ResultsPage {
/**
* Return true if the results page is empty
- * @return
*/
- public boolean isEmpty(){
- return entityList == null || entityList.isEmpty();
+ public boolean hasMoreResults() {
+ return entityList != null && entityList.size() == limit;
+ }
+
+
+
+
+ public ResponseCursor getResponseCursor() {
+ return responseCursor;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
new file mode 100644
index 0000000..1c5175d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+
+
+/**
+ * Basic functionality for our collectors: stores the pipeline context for use by subclasses
+ * @param <T> the input type
+ * @param <R> The output Type
+ */
+public abstract class AbstractCollector<T, R> implements Collector<T, R> {
+
+
+ protected PipelineContext pipelineContext;
+
+
+ @Override
+ public void setContext( final PipelineContext pipelineContext ) {
+ this.pipelineContext = pipelineContext;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
new file mode 100644
index 0000000..84654aa
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Takes entities and collects them into results. This mostly exists for 1.0 compatibility. Eventually this will
+ * become the only collector in our pipeline and be used when rendering results on GET, PUT and POST.
+ */
+public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage>
+ implements Collector<Entity, ResultsPage> {
+
+
+ @Override
+ public Observable<ResultsPage> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
+
+ final int limit = pipelineContext.getLimit();
+
+ return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from( buffer ).collect(
+ () -> new ResultsPageWithCursorCollector( limit ), ( collector, entity ) -> {
+ collector.add( entity );
+ } ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
+ new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit() ) );
+ }
+
+
+ /**
+ * A collector that will aggregate our results together
+ */
+ private static class ResultsPageWithCursorCollector {
+
+
+ private final List<Entity> results;
+
+ private Optional<EdgePath> lastPath;
+
+
+ private ResultsPageWithCursorCollector( final int limit ) {
+ this.results = new ArrayList<>( limit );
+ }
+
+
+ public void add( final FilterResult<Entity> result ) {
+ this.results.add( result.getValue() );
+ this.lastPath = result.getPath();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
index eac8a65..004a696 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
@@ -24,11 +24,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.CandidateResultsFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
import org.apache.usergrid.persistence.index.CandidateResults;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.SearchEdge;
@@ -44,8 +46,8 @@ import rx.Observable;
/**
* Command for reading graph edges
*/
-public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<Id, CandidateResults, Integer>
- implements CandidateResultsFilter {
+public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer>
+ implements Filter<Id, Candidate> {
private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class );
@@ -66,7 +68,7 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
@Override
- public Observable<CandidateResults> call( final Observable<Id> observable ) {
+ public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> observable ) {
//get the graph manager
final ApplicationEntityIndex applicationEntityIndex =
@@ -80,12 +82,12 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
//return all ids that are emitted from this edge
- return observable.flatMap( id -> {
+ return observable.flatMap( idFilterResult -> {
- final SearchEdge searchEdge = getSearchEdge( id );
+ final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() );
- final Observable<CandidateResults> candidates = Observable.create( subscriber -> {
+ final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> {
//our offset to our start value. This will be set the first time we emit
//after we receive new ids, we want to reset this to 0
@@ -98,19 +100,14 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
subscriber.onStart();
- //emit while we have values from ES
- while ( true ) {
+ //emit while we have values from ES and someone is subscribed
+ while ( !subscriber.isUnsubscribed() ) {
try {
final CandidateResults candidateResults =
applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
- currentOffSet += candidateResults.size();
-
- //set the cursor for the next value
- setCursor( currentOffSet );
-
/**
* No candidates, we're done
*/
@@ -119,7 +116,25 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
return;
}
- subscriber.onNext( candidateResults );
+
+ for( CandidateResult candidateResult: candidateResults){
+
+ //our subscriber unsubscribed, break out
+ if(subscriber.isUnsubscribed()){
+ return;
+ }
+
+ final Candidate candidate = new Candidate( candidateResult, searchEdge );
+
+ final FilterResult<Candidate>
+ result = createFilterResult( candidate, currentOffSet, idFilterResult.getPath() );
+
+ subscriber.onNext( result );
+
+ currentOffSet++;
+ }
+
+
}
catch ( Throwable t ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
new file mode 100644
index 0000000..ab9d5d9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.SearchEdge;
+
+
+/**
+ * A candidate. This holds the original candidate result, as well as the search edge it was found in
+ */
+public class Candidate {
+
+ private final CandidateResult candidateResult;
+ private final SearchEdge searchEdge;
+
+
+ /**
+ * Create a new Candidate for further processing
+ * @param candidateResult The candidate result
+ * @param searchEdge The search edge this was searched on
+ */
+ public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) {
+ this.candidateResult = candidateResult;
+ this.searchEdge = searchEdge;
+ }
+
+
+ public CandidateResult getCandidateResult() {
+ return candidateResult;
+ }
+
+
+ public SearchEdge getSearchEdge() {
+ return searchEdge;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
new file mode 100644
index 0000000..d30917c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from incoming Candidate emissions, then streams them on. This
+ * performs internal buffering for efficiency. Note that not all entities may be emitted if our load crosses page boundaries. It is up to the
+ * collector to determine when to stop streaming entities.
+ */
+public class CandidateEntityFilter extends AbstractFilter<Candidate, Entity>
+ implements Filter<Candidate, Entity> {
+
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final EntityIndexFactory entityIndexFactory;
+
+
+ @Inject
+ public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final EntityIndexFactory entityIndexFactory ) {
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.entityIndexFactory = entityIndexFactory;
+ }
+
+
+ @Override
+ public Observable<FilterResult<Entity>> call(
+ final Observable<FilterResult<Candidate>> candidateResultsObservable ) {
+
+
+ /**
+ * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
+ * objects
+ */
+
+ final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+ final EntityCollectionManager entityCollectionManager =
+ entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+ final ApplicationEntityIndex applicationIndex =
+ entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+ //buffer them into a page size so we can load them in 1 network hop
+ final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() )
+
+ //load them
+ .flatMap( candidateResults -> {
+ //flatten to a list of ids to load
+ final Observable<List<Id>> candidateIds =
+ Observable.from( candidateResults ).map( filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList();
+
+ //load the ids
+ final Observable<EntitySet> entitySetObservable =
+ candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
+
+ //now we have a collection, validate our candidate set is correct.
+
+ return entitySetObservable.map(
+ entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet,
+ candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() )
+ .flatMap(
+ entityCollector -> Observable.from( entityCollector.getResults() ) );
+ } );
+
+ //if we filter all our results, we want to continue to try the next page
+ return searchIdSetObservable;
+ }
+
+
+
+
+ /**
+ * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
+ */
+ private static final class EntityVerifier {
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+ private List<FilterResult<Entity>> results = new ArrayList<>();
+
+ private final EntityIndexBatch batch;
+ private final List<FilterResult<Candidate>> candidateResults;
+ private final EntitySet entitySet;
+
+
+ public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
+ final List<FilterResult<Candidate>> candidateResults ) {
+ this.batch = batch;
+ this.entitySet = entitySet;
+ this.candidateResults = candidateResults;
+ this.results = new ArrayList<>( entitySet.size() );
+ }
+
+
+ /**
+ * Merge our candidates and our entity set into results
+ */
+ public void merge() {
+
+ for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+ validate( candidateResult );
+ }
+
+ batch.execute();
+ }
+
+
+ public List<FilterResult<Entity>> getResults() {
+ return results;
+ }
+
+
+ public EntityIndexBatch getBatch() {
+ return batch;
+ }
+
+
+ private void validate( final FilterResult<Candidate> filterResult ) {
+
+ final Candidate candidate = filterResult.getValue();
+ final CandidateResult candidateResult = candidate.getCandidateResult();
+ final SearchEdge searchEdge = candidate.getSearchEdge();
+ final Id candidateId = candidateResult.getId();
+ final UUID candidateVersion = candidateResult.getVersion();
+
+
+ final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+ //doesn't exist, warn and drop
+ if ( entity == null ) {
+ logger.warn(
+ "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
+ + " Ignoring since this could be a region sync issue",
+ candidateId, candidateVersion );
+
+
+ //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+ return;
+
+ }
+
+
+ final UUID entityVersion = entity.getVersion();
+ final Id entityId = entity.getId();
+
+
+
+
+
+ //entity is newer than ES version, could be an update or the entity is marked as deleted
+ if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
+
+ logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+ new Object[] { searchEdge, entityId, entityVersion } );
+ batch.deindex( searchEdge, entityId, entityVersion );
+ return;
+ }
+
+ //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+ //remove the ES record, since the read in cass should cause a read repair, just ignore
+ if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+ logger.warn(
+ "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
+ + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+
+ //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+ return;
+ }
+
+ //they're the same, add it
+
+ final Entity returnEntity = entity.getEntity().get();
+
+ final Optional<EdgePath> parent = filterResult.getPath();
+
+ final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+ results.add( toReturn );
+ }
+ }
+}