You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/30 20:44:31 UTC

[08/35] incubator-usergrid git commit: Finished refactor of shard slice iterator. Need to refactor cursor generation to be encapsulated within the ScanColumn.

Finished refactoring the shard slice iterator. Cursor generation still needs to be refactored so that it is encapsulated within the ScanColumn.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f087924d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f087924d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f087924d

Branch: refs/heads/ug2-doc-update
Commit: f087924dd0533de5ce320f815180cf8ae25a8415
Parents: c115744
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Jun 25 17:43:16 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Jun 25 17:43:16 2015 -0600

----------------------------------------------------------------------
 .../query/ir/result/GeoIterator.java            |  17 +-
 .../ir/result/SearchCollectionVisitor.java      |  16 +-
 .../ir/result/SearchConnectionVisitor.java      |  21 ++-
 .../query/ir/result/SliceIterator.java          |  18 +-
 .../ir/result/SliceShardFilterIterator.java     | 165 +++++++++++++++++++
 .../query/ir/result/SliceShardIterator.java     | 118 -------------
 6 files changed, 211 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f087924d/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GeoIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GeoIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GeoIterator.java
index 4ecbb5a..587bb49 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GeoIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GeoIterator.java
@@ -35,6 +35,8 @@ import org.apache.usergrid.persistence.geo.GeoIndexSearcher.SearchResults;
 import org.apache.usergrid.persistence.geo.model.Point;
 import org.apache.usergrid.persistence.query.ir.QuerySlice;
 
+import com.fasterxml.uuid.UUIDComparator;
+
 import static org.apache.usergrid.persistence.cassandra.Serializers.*;
 
 /**
@@ -223,7 +225,7 @@ public class GeoIterator implements ResultIterator {
         LocationScanColumn col = idOrder.get( uuid );
 
         if ( col == null ) {
-            return;
+            throw new IllegalArgumentException( "Could not generate cursor for column because column could not be found" );
         }
 
         final EntityLocationRef location = col.location;
@@ -369,7 +371,18 @@ public class GeoIterator implements ResultIterator {
                 throw new UnsupportedOperationException( "Cannot compare another ScanColumn that is not an instance of LocationScanColumn" );
             }
 
-            return this.location.compareTo( ((LocationScanColumn)o).location );
+            //sort by location (closest first) if that's the same compare uuids
+            final int locationCompare = this.location.compareTo( ( ( LocationScanColumn ) o ).location ) ;
+
+            if(locationCompare == 0) {
+                //same distance, return compare by uuid
+                final int uuidCompare = UUIDComparator.staticCompare( getUUID(), o.getUUID() );
+
+
+                return uuidCompare;
+            }
+
+            return locationCompare;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f087924d/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
index c20baf4..a3fe116 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
@@ -69,7 +69,7 @@ public class SearchCollectionVisitor extends SearchVisitor {
         // change the hash value of the slice
         queryProcessor.applyCursorAndSort( slice );
 
-        IndexScanner columns = null;
+        IndexScanner columns;
 
         // nothing left to search for this range
         if ( slice.isComplete() ) {
@@ -101,9 +101,8 @@ public class SearchCollectionVisitor extends SearchVisitor {
 
 
         IndexScanner indexScanner = cassandraService
-                .getIdList(
-                        key( headEntity.getUuid(), DICTIONARY_COLLECTIONS, collectionName ), startId, null,
-                        queryProcessor.getPageSizeHint( node ), query.isReversed(), bucket,  applicationId,
+                .getIdList( key( headEntity.getUuid(), DICTIONARY_COLLECTIONS, collectionName ), startId, null,
+                        queryProcessor.getPageSizeHint( node ), query.isReversed(), bucket, applicationId,
                         node.isForceKeepFirst() );
 
         this.results.push( new SliceIterator( slice, indexScanner, UUID_PARSER ) );
@@ -123,12 +122,17 @@ public class SearchCollectionVisitor extends SearchVisitor {
 
         queryProcessor.applyCursorAndSort( slice );
 
+        final int size = queryProcessor.getPageSizeHint( node );
+
         GeoIterator itr = new GeoIterator(
                 new CollectionGeoSearch( em, indexBucketLocator, cassandraService, headEntity, collection.getName() ),
-                query.getLimit(), slice, node.getPropertyName(), new Point( node.getLattitude(), node.getLongitude() ),
+                size, slice, node.getPropertyName(), new Point( node.getLattitude(), node.getLongitude() ),
                 node.getDistance() );
 
-        results.push( itr );
+
+        final SliceShardFilterIterator.ShardBucketValidator validator = new SliceShardFilterIterator.ShardBucketValidator(indexBucketLocator, bucket, applicationId, IndexBucketLocator.IndexType.COLLECTION, collection.getName() );
+
+        this.results.push(  new SliceShardFilterIterator( validator, itr, size));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f087924d/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
index ff08245..7e7ddf6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
@@ -17,6 +17,7 @@ import org.apache.usergrid.persistence.cassandra.index.DynamicCompositeStartToBy
 import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
 import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
+import org.apache.usergrid.persistence.geo.CollectionGeoSearch;
 import org.apache.usergrid.persistence.geo.ConnectionGeoSearch;
 import org.apache.usergrid.persistence.geo.model.Point;
 import org.apache.usergrid.persistence.query.ir.AllNode;
@@ -109,12 +110,20 @@ public class SearchConnectionVisitor extends SearchVisitor {
 
         queryProcessor.applyCursorAndSort( slice );
 
+        final int size = queryProcessor.getPageSizeHint( node );
+
+
+
         GeoIterator itr =
                 new GeoIterator( new ConnectionGeoSearch( em, indexBucketLocator, cassandraService, connection.getIndexId() ),
-                        query.getLimit(), slice, node.getPropertyName(),
+                        size, slice, node.getPropertyName(),
                         new Point( node.getLattitude(), node.getLongitude() ), node.getDistance() );
 
-        results.push( itr );
+        final SliceShardFilterIterator.ShardBucketValidator validator = new SliceShardFilterIterator.ShardBucketValidator(indexBucketLocator, bucket, applicationId, IndexBucketLocator.IndexType.CONNECTION, connection.getSearchIndexName() );
+
+
+        this.results.push( new SliceShardFilterIterator( validator, itr, size ) );
+
     }
 
 
@@ -183,9 +192,13 @@ public class SearchConnectionVisitor extends SearchVisitor {
                         start, slice.isReversed(), size, skipFirst );
 
         //we have to create our wrapper so validate the data we read is correct for our shard
-        final SliceShardIterator.ShardBucketValidator validator = new SliceShardIterator.ShardBucketValidator(indexBucketLocator, bucket, applicationId, IndexBucketLocator.IndexType.CONNECTION, "" );
+        final SliceShardFilterIterator.ShardBucketValidator validator = new SliceShardFilterIterator.ShardBucketValidator(indexBucketLocator, bucket, applicationId, IndexBucketLocator.IndexType.CONNECTION, "" );
+
+
+        final SliceIterator sliceIterator = new SliceIterator( slice, connectionScanner, connectionParser );
+
 
-        this.results.push(  new SliceShardIterator( validator, slice, connectionScanner, connectionParser ));
+        this.results.push(  new SliceShardFilterIterator( validator, sliceIterator, size));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f087924d/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
index 4f5bff5..3f9e86c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
@@ -130,7 +130,10 @@ public class SliceIterator implements ResultIterator {
 
             final HColumn<ByteBuffer, ByteBuffer> column = results.next();
 
-            final ScanColumn parsed = parse(column);
+            final ByteBuffer colName = column.getName().duplicate();
+
+            final ScanColumn parsed = parser.parse( colName, isReversed );
+
 
             //skip this value, the parser has discarded it
             if ( parsed == null ) {
@@ -151,19 +154,6 @@ public class SliceIterator implements ResultIterator {
     }
 
 
-    /**
-     * Parses the column.  If the column should be discarded, null should be returned
-     * @param column
-     * @return
-     */
-    protected ScanColumn parse( HColumn<ByteBuffer, ByteBuffer> column){
-
-        final ByteBuffer colName = column.getName().duplicate();
-
-        final ScanColumn parsed = parser.parse( colName, isReversed );
-
-        return parsed;
-    }
 
 
     /*

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f087924d/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardFilterIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardFilterIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardFilterIterator.java
new file mode 100644
index 0000000..fb0d295
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardFilterIterator.java
@@ -0,0 +1,165 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.CursorCache;
+
+
+/**
+ * An iterator that will check if the parsed column is part of this shard.  This is required due to a legacy storage
+ * format in both connection pointers, as well as geo points.
+ *
+ * Some formats are not sharded by target entity, as a result, we get data partition mismatches when performing
+ * intersections and seeks.  This is meant to discard target entities that are not part of the current shard
+ *
+ * @author tnine
+ */
+public class SliceShardFilterIterator implements ResultIterator {
+
+    private static final Logger logger = LoggerFactory.getLogger( SliceShardFilterIterator.class );
+
+    private final ShardBucketValidator shardBucketValidator;
+    private final ResultIterator resultsIterator;
+    private final int pageSize;
+
+    /** The next filtered page to hand out, or null if it has not been computed yet */
+    private Set<ScanColumn> current;
+    /** Unconsumed columns of the delegate's most recent page, kept across calls to advance() */
+    private Iterator<ScanColumn> currentColumns;
+
+    /**
+     * @param shardBucketValidator The validator to use when validating results belong to a shard
+     * @param resultsIterator The iterator to filter results from
+     * @param pageSize The maximum number of in-shard columns returned by each call to next()
+     */
+    public SliceShardFilterIterator( final ShardBucketValidator shardBucketValidator,
+                                     final ResultIterator resultsIterator, final int pageSize ) {
+        this.shardBucketValidator = shardBucketValidator;
+        this.resultsIterator = resultsIterator;
+        this.pageSize = pageSize;
+    }
+
+    @Override
+    public void reset() {
+        current = null;
+        currentColumns = null;
+        resultsIterator.reset();
+    }
+
+    @Override
+    public void finalizeCursor( final CursorCache cache, final UUID lastValue ) {
+        resultsIterator.finalizeCursor( cache, lastValue );
+    }
+
+    @Override
+    public Iterator<Set<ScanColumn>> iterator() {
+        return this;
+    }
+
+    @Override
+    public boolean hasNext() {
+        if ( current == null ) {
+            advance();
+        }
+
+        //advance() always assigns a non-null set; an empty one means the delegate is exhausted
+        return !current.isEmpty();
+    }
+
+    @Override
+    public Set<ScanColumn> next() {
+        //compute the page if the caller did not call hasNext() first
+        if ( !hasNext() ) {
+            throw new java.util.NoSuchElementException( "No more results in this shard" );
+        }
+
+        final Set<ScanColumn> toReturn = current;
+        current = null;
+        return toReturn;
+    }
+
+    /**
+     * Fill {@code current} with up to {@code pageSize} columns that belong to this shard.  Columns are pulled
+     * from the delegate one page at a time; a partially consumed page is buffered in {@code currentColumns} so
+     * that columns past the page boundary are returned by the next call instead of being silently dropped.
+     */
+    private void advance() {
+        final Set<ScanColumn> results = new LinkedHashSet<ScanColumn>();
+
+        while ( results.size() < pageSize ) {
+            //buffered page drained: pull the next one from the delegate, or stop if it is exhausted
+            if ( currentColumns == null || !currentColumns.hasNext() ) {
+                if ( !resultsIterator.hasNext() ) {
+                    break;
+                }
+
+                currentColumns = resultsIterator.next().iterator();
+                continue;
+            }
+
+            final ScanColumn scanColumn = currentColumns.next();
+
+            //only keep columns that belong to the shard we are processing
+            if ( shardBucketValidator.isInShard( scanColumn.getUUID() ) ) {
+                results.add( scanColumn );
+            }
+        }
+
+        current = results;
+    }
+
+    /**
+     * Class that performs validation on an entity to ensure it's in the shard we expect
+     */
+    public static final class ShardBucketValidator {
+        private final IndexBucketLocator indexBucketLocator;
+        private final String expectedBucket;
+        private final UUID applicationId;
+        private final IndexBucketLocator.IndexType type;
+        private final String[] components;
+
+
+        public ShardBucketValidator( final IndexBucketLocator indexBucketLocator, final String expectedBucket,
+                                     final UUID applicationId, final IndexBucketLocator.IndexType type,
+                                     final String... components ) {
+            this.indexBucketLocator = indexBucketLocator;
+            this.expectedBucket = expectedBucket;
+            this.applicationId = applicationId;
+            this.type = type;
+            this.components = components;
+        }
+
+        public boolean isInShard( final UUID entityId ) {
+            //compute the bucket this entity maps to and compare it with the bucket being processed
+            final String shard = indexBucketLocator.getBucket( applicationId, type, entityId, components );
+
+            return expectedBucket.equals( shard );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f087924d/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
deleted file mode 100644
index 279ba2c..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *      http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.usergrid.persistence.query.ir.result;
-
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.IndexBucketLocator;
-import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
-import org.apache.usergrid.persistence.query.ir.QuerySlice;
-
-import me.prettyprint.hector.api.beans.HColumn;
-
-
-/**
- * An iterator that will check if the parsed column is part of this shard.  This is required due to a legacy storage
- * format
- *
- * Connections are not sharded by target entity, as a result, we get data partition mismatches when performing
- * intersections and seeks.  This is meant to discard target entities that are not part of the current shard
- *
- * @author tnine
- */
-public class SliceShardIterator extends SliceIterator {
-
-    private static final Logger logger = LoggerFactory.getLogger( SliceShardIterator.class );
-
-    private final ShardBucketValidator shardBucketValidator;
-
-
-    /**
-     * @param slice The slice used in the scanner
-     * @param scanner The scanner to use to read the cols
-     * @param parser The parser for the scanner results
-     */
-    public SliceShardIterator( final ShardBucketValidator shardBucketValidator, final QuerySlice slice,
-                               final IndexScanner scanner, final SliceParser parser ) {
-        super( slice, scanner, parser );
-
-        this.shardBucketValidator = shardBucketValidator;
-    }
-
-
-    /**
-     * Parses the column.  If the column should be discarded, null should be returned
-     */
-    protected ScanColumn parse( HColumn<ByteBuffer, ByteBuffer> column ) {
-
-        final ByteBuffer colName = column.getName().duplicate();
-
-        final ScanColumn parsed = parser.parse( colName, isReversed );
-
-        if(parsed == null){
-            return null;
-        }
-
-        final UUID entityId = parsed.getUUID();
-
-
-        //not for our current processing shard, discard
-        if(!shardBucketValidator.isInShard( entityId )){
-            return null;
-        }
-
-        return parsed;
-    }
-
-
-    /**
-     * Class that performs validation on an entity to ensure it's in the shard we expecte
-     */
-    public static final class ShardBucketValidator {
-        private final IndexBucketLocator indexBucketLocator;
-        private final String expectedBucket;
-        private final UUID applicationId;
-        private final IndexBucketLocator.IndexType type;
-        private final String[] components;
-
-
-        public ShardBucketValidator( final IndexBucketLocator indexBucketLocator, final String expectedBucket,
-                                     final UUID applicationId, final IndexBucketLocator.IndexType type,
-                                     final String... components ) {
-            this.indexBucketLocator = indexBucketLocator;
-            this.expectedBucket = expectedBucket;
-            this.applicationId = applicationId;
-            this.type = type;
-            this.components = components;
-        }
-
-
-        public boolean isInShard( final UUID entityId ) {
-            //not for our current processing shard, discard
-            final String shard = indexBucketLocator.getBucket( applicationId, type, entityId, components );
-
-            return expectedBucket.equals( shard );
-        }
-    }
-}