You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/05/04 19:32:52 UTC

[02/12] incubator-usergrid git commit: Massive refactor. Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
new file mode 100644
index 0000000..d598a2e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from a set of Ids.
+ *
+ * Candidate ids are buffered into batches of up to {@code pipelineContext.getLimit()} so that each batch is loaded
+ * with a single call to {@link EntityCollectionManager#load}.  Candidates whose entity cannot be loaded are logged
+ * and dropped; every loaded entity is emitted with the path of the candidate that produced it.
+ *
+ * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
+ */
+public class EntityLoadFilter extends AbstractFilter<Id, Entity> implements Filter<Id, Entity> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
+    @Inject
+    public EntityLoadFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+
+    @Override
+    public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
+
+        //it's more efficient to make 1 network hop to get everything, then drop our results if required
+        return filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
+
+            //load every id in this batch with a single call
+            final Observable<EntitySet> entitySetObservable =
+                Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
+                          .flatMap( ids -> entityCollectionManager.load( ids ) );
+
+            //now we have a collection, validate our candidate set is correct and emit what survived
+            return entitySetObservable.flatMap( entitySet -> {
+                final EntityVerifier entityVerifier = new EntityVerifier( entitySet, bufferedIds );
+                entityVerifier.merge();
+                return Observable.from( entityVerifier.getResults() );
+            } );
+        } );
+    }
+
+
+    /**
+     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this
+     * state is mutable and difficult to represent functionally
+     */
+    private static final class EntityVerifier {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+
+        //verified results, filled in by merge()
+        private final List<FilterResult<Entity>> results;
+
+        private final List<FilterResult<Id>> candidateResults;
+        private final EntitySet entitySet;
+
+
+        public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
+            this.entitySet = entitySet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( entitySet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results.  Candidates are visited in order, so the results
+         * keep the ordering of the candidate list.
+         */
+        public void merge() {
+
+            for ( final FilterResult<Id> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+        }
+
+
+        /**
+         * @return the verified results; empty until {@link #merge()} has been called
+         */
+        public List<FilterResult<Entity>> getResults() {
+            return results;
+        }
+
+
+        /**
+         * Look up the candidate's entity in the loaded set.  If it is present, add it to the results together with
+         * the candidate's path; otherwise log a warning and drop the candidate.
+         */
+        private void validate( final FilterResult<Id> filterResult ) {
+
+            final Id candidateId = filterResult.getValue();
+
+            final MvccEntity entity = entitySet.getEntity( candidateId );
+
+            //doesn't exist warn and drop
+            if ( entity == null || !entity.getEntity().isPresent() ) {
+                logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue", candidateId );
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+            //it exists, add it along with the path of the candidate that produced it
+            final Entity returnEntity = entity.getEntity().get();
+
+            final Optional<EdgePath> parent = filterResult.getPath();
+
+            final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+            results.add( toReturn );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
index 12306fd..7371579 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
@@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -42,7 +43,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType
 /**
  * Command for reading graph edges on a connection
  */
-public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> {
+public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
     private final String connectionName;
@@ -61,8 +62,9 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
     }
 
 
+
     @Override
-    public Observable<Id> call( final Observable<Id> observable ) {
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
 
         //get the graph manager
         final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
@@ -73,20 +75,18 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
 
 
         //return all ids that are emitted from this edge
-        return observable.flatMap( id -> {
+        return filterResultObservable.flatMap( idFilterResult -> {
 
               //set our our constant state
             final Optional<Edge> startFromCursor = getSeekValue();
+            final Id id = idFilterResult.getValue();
 
             final SimpleSearchByIdType search =
                 new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                     entityType, startFromCursor );
 
-            /**
-             * TODO, pass a message with pointers to our cursor values to be generated later
-             */
-            return graphManager.loadEdgesFromSourceByType( search ).doOnNext( edge -> setCursor( edge ) ).map(
-                edge -> edge.getTargetNode() );
+            return graphManager.loadEdgesFromSourceByType( search ).map(
+                edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
         } );
     }
 
@@ -95,4 +95,6 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
     protected CursorSerializer<Edge> getCursorSerializer() {
         return EdgeCursorSerializer.INSTANCE;
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index f3c2a9c..0260d1d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -26,12 +26,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.persistence.EntityFactory;
 import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -51,13 +49,17 @@ public class ObservableQueryExecutor implements QueryExecutor {
     public Iterator<Results> iterator;
 
 
-    public ObservableQueryExecutor( final Observable<PipelineResult<ResultsPage>> resultsObservable ) {
+    public ObservableQueryExecutor( final Observable<ResultsPage> resultsObservable) {
        //map to our old results objects, return a default empty if required
         this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() );
     }
 
 
-
+    /**
+     *
+     * @param cpEntity
+     * @return
+     */
     private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) {
 
 
@@ -72,9 +74,8 @@ public class ObservableQueryExecutor implements QueryExecutor {
         return entity;
     }
 
-    private Results createResults( final PipelineResult<ResultsPage> pipelineResults ){
+    private Results createResults( final ResultsPage resultsPage ){
 
-        final ResultsPage resultsPage = pipelineResults.getResult();
         final List<Entity> entityList = resultsPage.getEntityList();
         final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() );
 
@@ -85,10 +86,15 @@ public class ObservableQueryExecutor implements QueryExecutor {
 
         final Results results = Results.fromEntities( resultsEntities );
 
-        if(pipelineResults.getCursor().isPresent()) {
-            results.setCursor( pipelineResults.getCursor().get() );
-        }
 
+        //add the cursor if our limit is the same
+        if(resultsPage.hasMoreResults()) {
+            final Optional<String> cursor = resultsPage.getResponseCursor().encodeAsString();
+
+            if ( cursor.isPresent() ) {
+                results.setCursor( cursor.get() );
+            }
+        }
         return results;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
index 6e15d54..fd65ebf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
@@ -24,11 +24,11 @@ package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 import org.junit.Test;
 
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticsearchCursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.EdgeCursorSerializer;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 
 import com.google.common.base.Optional;
 
@@ -41,7 +41,10 @@ public class CursorTest {
     @Test
     public void testCursors(){
 
-        ResponseCursor responseCursor = new ResponseCursor();
+
+
+
+
 
 
         //test encoding edge
@@ -58,13 +61,18 @@ public class CursorTest {
         final Integer query2 = 20;
 
 
-        responseCursor.setCursor( 0, edge1, EdgeCursorSerializer.INSTANCE );
 
-        responseCursor.setCursor( 1, query1, ElasticsearchCursorSerializer.INSTANCE );
+        final EdgePath<Integer> filter3Path = new EdgePath<>( 3, query2, ElasticsearchCursorSerializer.INSTANCE, Optional.absent() );
+
+        final EdgePath<Edge> filter2Path = new EdgePath<Edge>(2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ));
+
+        final EdgePath<Integer> filter1Path = new EdgePath<>( 1, query1, ElasticsearchCursorSerializer.INSTANCE, Optional.of(filter2Path) );
+
+        final EdgePath<Edge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) );
+
 
-        responseCursor.setCursor(2, edge2, EdgeCursorSerializer.INSTANCE);
 
-        responseCursor.setCursor(3, query2, ElasticsearchCursorSerializer.INSTANCE);
+        ResponseCursor responseCursor = new ResponseCursor( Optional.of(filter0Path) );
 
         final Optional<String> cursor = responseCursor.encodeAsString();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
index 1e63df1..a157e47 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
@@ -42,14 +42,10 @@ public class CandidateResults implements Iterable<CandidateResult> {
 
     private final List<CandidateResult> candidates;
     private final Collection<SelectFieldMapping> getFieldMappings;
-    private final SearchEdge searchEdge;
 
-
-    public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings,
-                             final SearchEdge searchEdge ) {
+    public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings) {
         this.candidates = candidates;
         this.getFieldMappings = getFieldMappings;
-        this.searchEdge = searchEdge;
         offset = Optional.absent();
     }
 
@@ -91,11 +87,6 @@ public class CandidateResults implements Iterable<CandidateResult> {
     }
 
 
-    public SearchEdge getSearchEdge() {
-        return searchEdge;
-    }
-
-
     /**
      * Get the candidates
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 2d4696a..5b67060 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -169,7 +169,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
         }
         failureMonitor.success();
 
-        return parseResults(searchResponse, parsedQuery, searchEdge, limit, offset);
+        return parseResults(searchResponse, parsedQuery, limit, offset);
     }
 
 
@@ -227,7 +227,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
     /**
      * Parse the results and return the canddiate results
      */
-    private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query, final SearchEdge searchEdge,
+    private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query,
                                            final int limit, final int from ) {
 
         final SearchHits searchHits = searchResponse.getHits();
@@ -244,8 +244,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
             candidates.add( candidateResult );
         }
 
-        final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings(),
-            searchEdge );
+        final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings());
 
         // >= seems odd.  However if we get an overflow, we need to account for it.
         if (  hits.length >= limit ) {