You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/30 19:19:45 UTC

[07/25] incubator-usergrid git commit: Wraps all connection scanners with shard filters to ensure we only process entities relevant to our shards

Wraps all connection scanners with shard filters to ensure we only process entities relevant to our shards


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/aa794884
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/aa794884
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/aa794884

Branch: refs/heads/master
Commit: aa79488419bbd1c6a3ff3bfb8d187fb30bd9e046
Parents: 68c7eae
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Jun 24 13:20:22 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Jun 25 11:14:52 2015 -0600

----------------------------------------------------------------------
 .../cassandra/RelationManagerImpl.java          |  12 +-
 .../cassandra/index/ConnectedIndexScanner.java  |  11 +-
 .../persistence/query/ir/SearchVisitor.java     |   2 +-
 .../result/ConnectionSearchVisitorFactory.java  |   2 +-
 .../query/ir/result/GatherIterator.java         |   6 -
 .../ir/result/SearchConnectionVisitor.java      |  10 +-
 .../query/ir/result/SliceIterator.java          |  25 +++-
 .../query/ir/result/SliceShardIterator.java     | 118 +++++++++++++++++++
 .../query/ir/result/UnionIterator.java          |  94 ++++++++++++---
 9 files changed, 236 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa794884/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
index 977dad0..ebe2aec 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
@@ -1910,10 +1910,6 @@ public class RelationManagerImpl implements RelationManager {
 
         ConnectionRefImpl connectionRef =
                 new ConnectionRefImpl( sourceEntity, connectionType, new SimpleEntityRef( connectedEntityType, null ) );
-        //        EntityRef connectedEntity) {
-        //    ConnectionRefImpl connectionRef = new ConnectionRefImpl(new ConnectedEntityRefImpl(connectionType,
-        // connectedEntityType, null, true), sourceEntity );
-
 
         final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
 
@@ -1921,8 +1917,6 @@ public class RelationManagerImpl implements RelationManager {
 
         ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, true, "" );
 
-//        SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, true );
-
         return qp.getResults( collectionSearchVisitorFactory );
     }
 
@@ -1967,8 +1961,6 @@ public class RelationManagerImpl implements RelationManager {
 
         ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, false, "" );
 
-//        SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, false );
-
         return qp.getResults( collectionSearchVisitorFactory );
 	}
 
@@ -2007,9 +1999,7 @@ public class RelationManagerImpl implements RelationManager {
 
         QueryProcessor qp = new QueryProcessor( query, null, em, factory );
 
-        ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, false, "" );
-
-//        SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, true );
+        ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, true, "" );
 
         return qp.getResults( collectionSearchVisitorFactory );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa794884/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
index 6c6152a..4b1a049 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.cassandra.index;
 
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -134,14 +135,16 @@ public class ConnectedIndexScanner implements IndexScanner {
         //go through each connection type until we exhaust the result sets
         while ( currentConnectionType != null ) {
 
+            final int lastResultsSize = lastResults == null? 0: lastResults.size();
+
             //only load a delta size to get this next page
-            int selectSize = totalSelectSize - lastResults.size();
+            final int selectSize = totalSelectSize - lastResultsSize;
 
 
-            Object key = key( entityId, dictionaryType, currentConnectionType );
+            final Object key = key( entityId, dictionaryType, currentConnectionType );
 
 
-            List<HColumn<ByteBuffer, ByteBuffer>> results =
+            final List<HColumn<ByteBuffer, ByteBuffer>> results =
                     cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_COMPOSITE_DICTIONARIES, key,
                             start, null, selectSize, reversed );
 
@@ -180,7 +183,7 @@ public class ConnectedIndexScanner implements IndexScanner {
 
         if ( hasMore && lastResults !=null && lastResults.size() > 0 ) {
             // set the bytebuffer for the next pass
-            lastResults.remove( lastResults.size() - 1 );
+            start = lastResults.remove( lastResults.size() - 1 ).getName();
         }
 
         return lastResults != null && lastResults.size() > 0;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa794884/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
index fc2df4d..271fc67 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
@@ -160,7 +160,7 @@ public abstract class SearchVisitor implements NodeVisitor {
 
         final int nodeId = node.getId();
 
-        UnionIterator union = new UnionIterator( queryProcessor.getPageSizeHint( node ), nodeId );
+        UnionIterator union = new UnionIterator( queryProcessor.getPageSizeHint( node ), nodeId, queryProcessor.getCursorCache( nodeId ) );
 
         if ( left != null ) {
             union.addIterator( left );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa794884/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
index 54eeb01..6a340b9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
@@ -41,7 +41,7 @@ public class ConnectionSearchVisitorFactory implements SearchVisitorFactory {
     private final boolean outgoing;
     private final String[] prefix;
 
-
+                   //cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, false, ""
     public ConnectionSearchVisitorFactory( final CassandraService cassandraService,
                                             final IndexBucketLocator indexBucketLocator,
                                             final QueryProcessor queryProcessor, final UUID applicationId,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa794884/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
index 5bc91e0..eaae6b2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
@@ -171,12 +171,6 @@ public class GatherIterator implements ResultIterator {
             results.add(next);
             cursorMap.put( next.getUUID(), iterator );
 
-//            //results are too large, trim them
-//            if(results.size() > pageSize){
-//               final ScanColumn toRemove =  results.pollLast();
-//                cursorMap.remove( toRemove.getUUID() );
-//            }
-
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa794884/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
index 6f833db..ff08245 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
@@ -53,7 +53,7 @@ public class SearchConnectionVisitor extends SearchVisitor {
      * @param queryProcessor They query processor to use
      * @param applicationId
      * @param connection The connection refernce
-     * @param outgoing The direction to search.  True if we should search from source->target edges.  False if we
+     * @param outgoing The direction to search.  True if we should search from source->target edges.  False if we are searching target<-source
      */
     public SearchConnectionVisitor( final CassandraService cassandraService,
                                     final IndexBucketLocator indexBucketLocator, final QueryProcessor queryProcessor,
@@ -120,6 +120,9 @@ public class SearchConnectionVisitor extends SearchVisitor {
 
     @Override
     public void visit( AllNode node ) throws Exception {
+
+        //todo, use a cache for this...
+
         QuerySlice slice = node.getSlice();
 
         queryProcessor.applyCursorAndSort( slice );
@@ -179,7 +182,10 @@ public class SearchConnectionVisitor extends SearchVisitor {
                 new ConnectedIndexScanner( cassandraService, dictionaryType, applicationId, entityIdToUse, connectionTypes,
                         start, slice.isReversed(), size, skipFirst );
 
-        this.results.push( new SliceIterator( slice, connectionScanner, connectionParser ) );
+        //we have to create our wrapper so validate the data we read is correct for our shard
+        final SliceShardIterator.ShardBucketValidator validator = new SliceShardIterator.ShardBucketValidator(indexBucketLocator, bucket, applicationId, IndexBucketLocator.IndexType.CONNECTION, "" );
+
+        this.results.push(  new SliceShardIterator( validator, slice, connectionScanner, connectionParser ));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa794884/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
index a386159..4f5bff5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
@@ -46,10 +46,10 @@ public class SliceIterator implements ResultIterator {
 
     private final LinkedHashMap<UUID, ScanColumn> cols;
     private final QuerySlice slice;
-    private final SliceParser parser;
-    private final IndexScanner scanner;
+    protected final SliceParser parser;
+    protected final IndexScanner scanner;
     private final int pageSize;
-    private final boolean isReversed;
+    protected final boolean isReversed;
 
 
     /**
@@ -128,9 +128,9 @@ public class SliceIterator implements ResultIterator {
 
         while ( results.hasNext() ) {
 
-            final ByteBuffer colName = results.next().getName().duplicate();
+            final HColumn<ByteBuffer, ByteBuffer> column = results.next();
 
-            final ScanColumn parsed = parser.parse( colName, isReversed );
+            final ScanColumn parsed = parse(column);
 
             //skip this value, the parser has discarded it
             if ( parsed == null ) {
@@ -151,6 +151,21 @@ public class SliceIterator implements ResultIterator {
     }
 
 
+    /**
+     * Parses a raw scanner column into a {@link ScanColumn}.  Subclasses may override this to filter columns.
+     *
+     * @param column the column returned by the index scanner
+     * @return the parsed column, or null if the column should be discarded
+     */
+    protected ScanColumn parse( final HColumn<ByteBuffer, ByteBuffer> column ) {
+        //duplicate so the parser's reads do not move the position of the column's own name buffer
+        final ByteBuffer colName = column.getName().duplicate();
+
+        //the parser returns null for columns it discards
+        return parser.parse( colName, isReversed );
+    }
+
+
     /*
      * (non-Javadoc)
      *

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa794884/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
new file mode 100644
index 0000000..279ba2c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
@@ -0,0 +1,118 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.query.ir.QuerySlice;
+
+import me.prettyprint.hector.api.beans.HColumn;
+
+
+/**
+ * A {@link SliceIterator} that discards parsed columns whose entity does not belong to the shard being processed.
+ * This is required due to a legacy storage format.
+ *
+ * Connections are not sharded by target entity; as a result, we get data partition mismatches when performing
+ * intersections and seeks.  This iterator discards target entities that are not part of the current shard.
+ *
+ * @author tnine
+ */
+public class SliceShardIterator extends SliceIterator {
+
+    private static final Logger logger = LoggerFactory.getLogger( SliceShardIterator.class );
+
+    private final ShardBucketValidator shardBucketValidator;
+
+
+    /**
+     * @param shardBucketValidator Decides whether a parsed entity belongs to the shard being processed
+     * @param slice The slice used in the scanner
+     * @param scanner The scanner to use to read the cols
+     * @param parser The parser for the scanner results
+     */
+    public SliceShardIterator( final ShardBucketValidator shardBucketValidator, final QuerySlice slice,
+                               final IndexScanner scanner, final SliceParser parser ) {
+        super( slice, scanner, parser );
+
+        this.shardBucketValidator = shardBucketValidator;
+    }
+
+
+    /**
+     * Parses the column and discards it unless its entity belongs to the shard being processed.
+     *
+     * @param column the raw column read by the scanner
+     * @return the parsed column, or null if the parser discarded it or it belongs to another shard
+     */
+    @Override
+    protected ScanColumn parse( final HColumn<ByteBuffer, ByteBuffer> column ) {
+        //reuse the base parsing, which returns null for columns the parser discards
+        final ScanColumn parsed = super.parse( column );
+
+        if ( parsed == null ) {
+            return null;
+        }
+
+        //not for our current processing shard, discard
+        if ( !shardBucketValidator.isInShard( parsed.getUUID() ) ) {
+            return null;
+        }
+
+        return parsed;
+    }
+
+
+    /**
+     * Class that performs validation on an entity to ensure it's in the shard we expect
+     */
+    public static final class ShardBucketValidator {
+        private final IndexBucketLocator indexBucketLocator;
+        private final String expectedBucket;
+        private final UUID applicationId;
+        private final IndexBucketLocator.IndexType type;
+        private final String[] components;
+
+
+        public ShardBucketValidator( final IndexBucketLocator indexBucketLocator, final String expectedBucket,
+                                     final UUID applicationId, final IndexBucketLocator.IndexType type,
+                                     final String... components ) {
+            this.indexBucketLocator = indexBucketLocator;
+            this.expectedBucket = expectedBucket;
+            this.applicationId = applicationId;
+            this.type = type;
+            this.components = components;
+        }
+
+        /** Returns true if the bucket the locator assigns to this entity equals the expected bucket. */
+        public boolean isInShard( final UUID entityId ) {
+            //ask the locator which bucket this entity maps to for our index type and components
+            final String shard = indexBucketLocator.getBucket( applicationId, type, entityId, components );
+
+            return expectedBucket.equals( shard );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa794884/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
index aca9852..b6e44d8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
@@ -28,6 +28,8 @@ import java.util.UUID;
 
 import org.apache.usergrid.persistence.cassandra.CursorCache;
 
+import com.fasterxml.uuid.UUIDComparator;
+
 import static org.apache.usergrid.persistence.cassandra.Serializers.*;
 
 /**
@@ -131,19 +133,19 @@ public class UnionIterator extends MultiIterator {
 
         private final int maxSize;
 
-        private final List<ScanColumn> list;
+        private final List<UnionScanColumn> list;
 
-//        private UUIDColumn min;
+        private UnionScanColumn min;
 
 
         public SortedColumnList( final int maxSize, final UUID minUuid ) {
             //we need to allocate the extra space if required
-            this.list = new ArrayList<ScanColumn>( maxSize );
+            this.list = new ArrayList<UnionScanColumn>( maxSize );
             this.maxSize = maxSize;
 
-//            if ( minUuid != null ) {
-//                min = new UUIDColumn( minUuid, 1 ) ;
-//            }
+            if ( minUuid != null ) {
+                min = new UnionScanColumn(new UUIDColumn( minUuid, 1 )) ;
+            }
         }
 
 
@@ -151,12 +153,15 @@ public class UnionIterator extends MultiIterator {
          * Add the column to this list
          */
         public void add( ScanColumn col ) {
+
+            final UnionScanColumn unionScanColumn = new UnionScanColumn(col);
+
             //less than our min, don't add
-//            if ( min != null && min.compareTo( col ) >= 0 ) {
-//                return;
-//            }
+            if ( min != null && min.compareTo( unionScanColumn ) >= 0 ) {
+                return;
+            }
 
-            int index = Collections.binarySearch( this.list, col );
+            int index = Collections.binarySearch( this.list, unionScanColumn );
 
             //already present
             if ( index > -1 ) {
@@ -170,7 +175,7 @@ public class UnionIterator extends MultiIterator {
                 return;
             }
 
-            this.list.add( index, col );
+            this.list.add( index, unionScanColumn );
 
             final int size = this.list.size();
 
@@ -214,8 +219,7 @@ public class UnionIterator extends MultiIterator {
                 return;
             }
 
-            final UUID oldMin = this.list.get( size - 1 ).getUUID();
-//            min = new UUIDColumn( oldMin, 1 );
+            min = this.list.get( size - 1 );
         }
 
 
@@ -228,7 +232,69 @@ public class UnionIterator extends MultiIterator {
 
         public void reset(){
             clear();
-//            this.min = null;
+            this.min = null;
+        }
+    }
+
+    private static class UnionScanColumn implements ScanColumn {
+        //compares, hashes and builds cursors by UUID only, so the sorted list treats columns for the same entity alike
+        private final ScanColumn delegate;
+        //no local child field: setChild is a no-op and getChild reads the child from the delegate
+
+
+        private UnionScanColumn( final ScanColumn delegate ) {
+            this.delegate = delegate;
+        }
+
+
+        @Override
+        public int compareTo( final ScanColumn o ) {
+            return UUIDComparator.staticCompare( delegate.getUUID(), o.getUUID() );
+        }
+
+
+        @Override
+        public UUID getUUID() {
+            return delegate.getUUID();
+        }
+
+        //the cursor is the UUID's bytes, not the delegate's own cursor value
+        @Override
+        public ByteBuffer getCursorValue() {
+            return ue.toByteBuffer( delegate.getUUID() );
+        }
+
+
+        @Override
+        public void setChild( final ScanColumn childColumn ) {
+            //intentionally a no-op, since the child lives on the delegate
+        }
+
+
+        @Override
+        public ScanColumn getChild() {
+            return delegate.getChild();
+        }
+
+
+        @Override
+        public boolean equals( final Object o ) {
+            if ( this == o ) {
+                return true;
+            }
+            if ( !( o instanceof UnionScanColumn ) ) {
+                return false;
+            }
+
+            final UnionScanColumn that = ( UnionScanColumn ) o;
+
+            return delegate.getUUID().equals( that.delegate.getUUID() );
+        }
+
+
+        @Override
+        public int hashCode() {
+            return delegate.getUUID().hashCode();
         }
     }
 }