You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/05/04 19:32:52 UTC
[02/12] incubator-usergrid git commit: Massive refactor. Paths for
cursor generation are now part of our I/O results. This allows the collector
to take until satisfied, then generate a serializable path.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
new file mode 100644
index 0000000..d598a2e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from a set of Ids.
+ *
+ * The incoming {@code FilterResult<Id>} stream is buffered into batches of {@code pipelineContext.getLimit()}
+ * candidates. For every batch the candidate Ids are collected into a list and handed to a single
+ * {@code EntityCollectionManager.load( ids )} call; every {@code EntitySet} emitted by that call is then checked
+ * against the batch by an {@code EntityVerifier}. One {@code FilterResult<Entity>} is emitted for each candidate
+ * whose entity is present in the set, carrying the path of the candidate it came from. Candidates with no entity
+ * are logged at warn level and dropped, so a batch can emit fewer results than it received.
+ *
+ * NOTE(review): {@code pipelineContext} is not declared in this class; presumably it is inherited from
+ * {@code AbstractFilter} and populated before {@code call} runs -- confirm.
+ *
+ * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
+ */
+public class EntityLoadFilter extends AbstractFilter<Id, Entity> implements Filter<Id, Entity> {
+
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
+ @Inject
+ public EntityLoadFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ }
+
+
+ @Override
+ public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+
+
+ final EntityCollectionManager entityCollectionManager =
+ entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
+
+ //it's more efficient to make 1 network hop to get everything, then drop our results if required
+ final Observable<FilterResult<Entity>> entityObservable =
+ filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
+
+ final Observable<EntitySet> entitySetObservable =
+ Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
+ .flatMap( ids -> entityCollectionManager.load( ids ) );
+
+
+ //now we have a collection, validate our candidate set is correct.
+
+ return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
+ .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+ entityCollector -> Observable.from( entityCollector.getResults() ) );
+ } );
+
+ return entityObservable;
+ }
+
+
+ /**
+ * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this
+ * state is mutable and difficult to represent functionally
+ *
+ * Pairs one loaded {@code EntitySet} with the candidate results that requested it. {@code merge()} fills the
+ * result list and {@code getResults()} exposes that same list instance, so the list is only populated after
+ * {@code merge()} has run.
+ *
+ * NOTE(review): {@code results} is assigned by its field initializer and then reassigned in the constructor with a
+ * list presized to {@code entitySet.size()}; the list created by the initializer is never used.
+ */
+ private static final class EntityVerifier {
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+ private List<FilterResult<Entity>> results = new ArrayList<>();
+
+ private final List<FilterResult<Id>> candidateResults;
+ private final EntitySet entitySet;
+
+
+ public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
+ this.entitySet = entitySet;
+ this.candidateResults = candidateResults;
+ this.results = new ArrayList<>( entitySet.size() );
+ }
+
+
+ /**
+ * Merge our candidates and our entity set into results. Candidates are validated in the order they appear in the
+ * candidate list; each one whose entity is present appends a result, the others are logged and skipped. Nothing
+ * clears the result list, so calling this more than once would append the same results again.
+ */
+ public void merge() {
+
+ for ( final FilterResult<Id> candidateResult : candidateResults ) {
+ validate( candidateResult );
+ }
+ }
+
+
+ public List<FilterResult<Entity>> getResults() {
+ return results;
+ }
+
+
+ private void validate( final FilterResult<Id> filterResult ) {
+
+ final Id candidateId = filterResult.getValue();
+
+
+ final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+ //doesn't exist (no entry in the set, or the entry holds no entity), warn and drop
+ if ( entity == null || !entity.getEntity().isPresent() ) {
+ logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
+ + " Ignoring since this could be a region sync issue", candidateId );
+
+
+ //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+ return;
+ }
+
+ //it exists, add it together with the path carried by the candidate that produced it
+
+ final Entity returnEntity = entity.getEntity().get();
+
+ final Optional<EdgePath> parent = filterResult.getPath();
+
+ final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+ results.add( toReturn );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
index 12306fd..7371579 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
@@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -42,7 +43,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType
/**
* Command for reading graph edges on a connection
*/
-public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> {
+public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
private final GraphManagerFactory graphManagerFactory;
private final String connectionName;
@@ -61,8 +62,9 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
}
+
@Override
- public Observable<Id> call( final Observable<Id> observable ) {
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
//get the graph manager
final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
@@ -73,20 +75,18 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
//return all ids that are emitted from this edge
- return observable.flatMap( id -> {
+ return filterResultObservable.flatMap( idFilterResult -> {
//set our our constant state
final Optional<Edge> startFromCursor = getSeekValue();
+ final Id id = idFilterResult.getValue();
final SimpleSearchByIdType search =
new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
entityType, startFromCursor );
- /**
- * TODO, pass a message with pointers to our cursor values to be generated later
- */
- return graphManager.loadEdgesFromSourceByType( search ).doOnNext( edge -> setCursor( edge ) ).map(
- edge -> edge.getTargetNode() );
+ return graphManager.loadEdgesFromSourceByType( search ).map(
+ edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
} );
}
@@ -95,4 +95,6 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
protected CursorSerializer<Edge> getCursorSerializer() {
return EdgeCursorSerializer.INSTANCE;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index f3c2a9c..0260d1d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -26,12 +26,10 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.persistence.EntityFactory;
import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -51,13 +49,17 @@ public class ObservableQueryExecutor implements QueryExecutor {
public Iterator<Results> iterator;
- public ObservableQueryExecutor( final Observable<PipelineResult<ResultsPage>> resultsObservable ) {
+ public ObservableQueryExecutor( final Observable<ResultsPage> resultsObservable) {
//map to our old results objects, return a default empty if required
this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() );
}
-
+ /**
+ *
+ * @param cpEntity
+ * @return
+ */
private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) {
@@ -72,9 +74,8 @@ public class ObservableQueryExecutor implements QueryExecutor {
return entity;
}
- private Results createResults( final PipelineResult<ResultsPage> pipelineResults ){
+ private Results createResults( final ResultsPage resultsPage ){
- final ResultsPage resultsPage = pipelineResults.getResult();
final List<Entity> entityList = resultsPage.getEntityList();
final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() );
@@ -85,10 +86,15 @@ public class ObservableQueryExecutor implements QueryExecutor {
final Results results = Results.fromEntities( resultsEntities );
- if(pipelineResults.getCursor().isPresent()) {
- results.setCursor( pipelineResults.getCursor().get() );
- }
+ //add the cursor if our limit is the same
+ if(resultsPage.hasMoreResults()) {
+ final Optional<String> cursor = resultsPage.getResponseCursor().encodeAsString();
+
+ if ( cursor.isPresent() ) {
+ results.setCursor( cursor.get() );
+ }
+ }
return results;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
index 6e15d54..fd65ebf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
@@ -24,11 +24,11 @@ package org.apache.usergrid.corepersistence.pipeline.cursor;
import org.junit.Test;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticsearchCursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.graph.EdgeCursorSerializer;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
import com.google.common.base.Optional;
@@ -41,7 +41,10 @@ public class CursorTest {
@Test
public void testCursors(){
- ResponseCursor responseCursor = new ResponseCursor();
+
+
+
+
//test encoding edge
@@ -58,13 +61,18 @@ public class CursorTest {
final Integer query2 = 20;
- responseCursor.setCursor( 0, edge1, EdgeCursorSerializer.INSTANCE );
- responseCursor.setCursor( 1, query1, ElasticsearchCursorSerializer.INSTANCE );
+ final EdgePath<Integer> filter3Path = new EdgePath<>( 3, query2, ElasticsearchCursorSerializer.INSTANCE, Optional.absent() );
+
+ final EdgePath<Edge> filter2Path = new EdgePath<Edge>(2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ));
+
+ final EdgePath<Integer> filter1Path = new EdgePath<>( 1, query1, ElasticsearchCursorSerializer.INSTANCE, Optional.of(filter2Path) );
+
+ final EdgePath<Edge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) );
+
- responseCursor.setCursor(2, edge2, EdgeCursorSerializer.INSTANCE);
- responseCursor.setCursor(3, query2, ElasticsearchCursorSerializer.INSTANCE);
+ ResponseCursor responseCursor = new ResponseCursor( Optional.of(filter0Path) );
final Optional<String> cursor = responseCursor.encodeAsString();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
index 1e63df1..a157e47 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
@@ -42,14 +42,10 @@ public class CandidateResults implements Iterable<CandidateResult> {
private final List<CandidateResult> candidates;
private final Collection<SelectFieldMapping> getFieldMappings;
- private final SearchEdge searchEdge;
-
- public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings,
- final SearchEdge searchEdge ) {
+ public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings) {
this.candidates = candidates;
this.getFieldMappings = getFieldMappings;
- this.searchEdge = searchEdge;
offset = Optional.absent();
}
@@ -91,11 +87,6 @@ public class CandidateResults implements Iterable<CandidateResult> {
}
- public SearchEdge getSearchEdge() {
- return searchEdge;
- }
-
-
/**
* Get the candidates
* @return
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 2d4696a..5b67060 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -169,7 +169,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
}
failureMonitor.success();
- return parseResults(searchResponse, parsedQuery, searchEdge, limit, offset);
+ return parseResults(searchResponse, parsedQuery, limit, offset);
}
@@ -227,7 +227,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
/**
* Parse the results and return the canddiate results
*/
- private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query, final SearchEdge searchEdge,
+ private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query,
final int limit, final int from ) {
final SearchHits searchHits = searchResponse.getHits();
@@ -244,8 +244,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
candidates.add( candidateResult );
}
- final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings(),
- searchEdge );
+ final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings());
// >= seems odd. However if we get an overflow, we need to account for it.
if ( hits.length >= limit ) {