You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/30 19:20:01 UTC

[23/25] incubator-usergrid git commit: Fixes shard allocation. Keeps old allocation logic for deletion.

Fixes shard allocation.  Keeps old allocation logic for deletion.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d1ca419f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d1ca419f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d1ca419f

Branch: refs/heads/master
Commit: d1ca419f3def8e92cefec8c84508185823d3d4ac
Parents: a22c996
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Jul 9 13:44:50 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Jul 9 13:44:50 2015 -0600

----------------------------------------------------------------------
 .../persistence/cassandra/GeoIndexManager.java  | 78 ++++++++++++++----
 .../cassandra/QueryExecutorServiceImpl.java     | 25 +++++-
 .../cassandra/RelationManagerImpl.java          | 86 ++++++++++++++++----
 .../query/ir/result/ConnectionShardFilter.java  | 26 ++----
 .../ir/result/SearchConnectionVisitor.java      |  4 +-
 5 files changed, 165 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d1ca419f/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
index ce5fab8..109881f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
@@ -95,33 +95,34 @@ public class GeoIndexManager {
 
 
     private static Mutator<ByteBuffer> batchAddConnectionIndexEntries( Mutator<ByteBuffer> m,
-                                                                       IndexBucketLocator locator, UUID appId,
+                                                                       IndexBucketLocator locator, UUID entityId,
                                                                        String propertyName, String geoCell,
                                                                        UUID[] index_keys, ByteBuffer columnName,
                                                                        ByteBuffer columnValue, long timestamp ) {
 
+        final String bucket = locator.getBucket( entityId );
+
+
         // entity_id,prop_name
         Object property_index_key =
                 key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell,
-                        locator.getBucket(index_keys[ConnectionRefImpl.ALL] ) );
+                        bucket );
 
         // entity_id,entity_type,prop_name
         Object entity_type_prop_index_key =
                 key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL,
                         geoCell,
-                        locator.getBucket( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) );
+                        bucket );
 
         // entity_id,connection_type,prop_name
         Object connection_type_prop_index_key =
                 key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, propertyName,
-                        DICTIONARY_GEOCELL, geoCell, locator.getBucket(
-                        index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) );
+                        DICTIONARY_GEOCELL, geoCell, bucket );
 
         // entity_id,connection_type,entity_type,prop_name
         Object connection_type_and_entity_type_prop_index_key =
                 key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName,
-                        DICTIONARY_GEOCELL, geoCell, locator.getBucket(
-                        index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE] ) );
+                        DICTIONARY_GEOCELL, geoCell, bucket );
 
         // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
         addInsertToMutator( m, ENTITY_INDEX, property_index_key, columnName, columnValue, timestamp );
@@ -151,7 +152,7 @@ public class GeoIndexManager {
         ByteBuffer columnValue = location.getColumnValue().serialize();
         long ts = location.getTimestampInMicros();
         for ( String cell : cells ) {
-            batchAddConnectionIndexEntries( m, locator, appId, propertyName, cell, index_keys, columnName, columnValue,
+            batchAddConnectionIndexEntries( m, locator, location.getUuid(), propertyName, cell, index_keys, columnName, columnValue,
                     ts );
         }
 
@@ -177,11 +178,15 @@ public class GeoIndexManager {
 
 
     private static Mutator<ByteBuffer> batchDeleteConnectionIndexEntries( Mutator<ByteBuffer> m,
-                                                                          IndexBucketLocator locator, UUID appId,
+                                                                          IndexBucketLocator locator, UUID entityId,
                                                                           String propertyName, String geoCell,
                                                                           UUID[] index_keys, ByteBuffer columnName,
                                                                           long timestamp ) {
 
+        /**
+         * Legacy key scheme
+         */
+
         // entity_id,prop_name
         Object property_index_key =
                 key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell,
@@ -221,6 +226,49 @@ public class GeoIndexManager {
         m.addDeletion( bytebuffer( connection_type_and_entity_type_prop_index_key ), ENTITY_INDEX.toString(),
                 columnName, ByteBufferSerializer.get(), timestamp );
 
+
+        /**
+         * New key scheme: all four row keys share the single bucket derived from the entity id
+         */
+
+        final String bucket = locator.getBucket( entityId );
+
+        // entity_id,prop_name
+        Object property_index_key_new =
+                key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell,
+                        bucket );
+
+        // entity_id,entity_type,prop_name
+        Object entity_type_prop_index_key_new =
+                key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL,
+                        geoCell, bucket );
+
+        // entity_id,connection_type,prop_name
+        Object connection_type_prop_index_key_new =
+                key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, propertyName,
+                        DICTIONARY_GEOCELL, geoCell, bucket );
+
+        // entity_id,connection_type,entity_type,prop_name
+        Object connection_type_and_entity_type_prop_index_key_new =
+                key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName,
+                        DICTIONARY_GEOCELL, geoCell, bucket );
+
+        // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
+        m.addDeletion( bytebuffer( property_index_key_new ), ENTITY_INDEX.toString(), columnName,
+                ByteBufferSerializer.get(), timestamp );
+
+        // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
+        m.addDeletion( bytebuffer( entity_type_prop_index_key_new ), ENTITY_INDEX.toString(), columnName,
+                ByteBufferSerializer.get(), timestamp );
+
+        // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
+        m.addDeletion( bytebuffer( connection_type_prop_index_key_new ), ENTITY_INDEX.toString(), columnName,
+                ByteBufferSerializer.get(), timestamp );
+
+        // composite(property_value,connected_entity_id,entry_timestamp)
+        m.addDeletion( bytebuffer( connection_type_and_entity_type_prop_index_key_new ), ENTITY_INDEX.toString(),
+                columnName, ByteBufferSerializer.get(), timestamp );
+
         return m;
     }
 
@@ -238,7 +286,7 @@ public class GeoIndexManager {
 
         for ( String cell : cells ) {
 
-            batchDeleteConnectionIndexEntries( m, locator, appId, propertyName, cell, index_keys, columnName, ts );
+            batchDeleteConnectionIndexEntries( m, locator, location.getUuid(), propertyName, cell, index_keys, columnName, ts );
         }
 
         logger.info( "Geocells to be saved for Point({} , {} ) are: {}", new Object[] {
@@ -247,8 +295,7 @@ public class GeoIndexManager {
     }
 
 
-    public static void batchStoreLocationInCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator,
-                                                            UUID appId, Object key, UUID entityId,
+    public static void batchStoreLocationInCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator, Object key, UUID entityId,
                                                             EntityLocationRef location ) {
 
         Point p = location.getPoint();
@@ -276,15 +323,14 @@ public class GeoIndexManager {
         Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() );
         Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, ByteBufferSerializer.get() );
 
-        batchStoreLocationInCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(),
+        batchStoreLocationInCollectionIndex( m, em.getIndexBucketLocator(),
                 key( owner.getUuid(), collectionName, propertyName ), owner.getUuid(), location );
 
         batchExecute( m, CassandraService.RETRY_COUNT );
     }
 
 
-    public static void batchRemoveLocationFromCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator,
-                                                               UUID appId, Object key, EntityLocationRef location ) {
+    public static void batchRemoveLocationFromCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator, Object key, EntityLocationRef location ) {
 
         Point p = location.getPoint();
         List<String> cells = GeocellManager.generateGeoCell( p );
@@ -314,7 +360,7 @@ public class GeoIndexManager {
         Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() );
         Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, ByteBufferSerializer.get() );
 
-        batchRemoveLocationFromCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(),
+        batchRemoveLocationFromCollectionIndex( m, em.getIndexBucketLocator(),
                 key( owner.getUuid(), collectionName, propertyName ), location );
 
         batchExecute( m, CassandraService.RETRY_COUNT );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d1ca419f/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java
index 7376641..3504a3f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java
@@ -21,8 +21,10 @@ package org.apache.usergrid.persistence.cassandra;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +72,7 @@ public class QueryExecutorServiceImpl implements QueryExecutorService {
        }
 
 
-        executorService = new ThreadPoolExecutor( threadCount, threadCount, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>( ), new CallerRunsExecutionHandler() );
+        executorService = new ThreadPoolExecutor( threadCount, threadCount, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>( ), new QueryThreadFactory(), new CallerRunsExecutionHandler() );
         return executorService;
     }
 
@@ -92,4 +94,25 @@ public class QueryExecutorServiceImpl implements QueryExecutorService {
         }
     }
 
+    /**
+      * Simple factory for labeling query worker threads ("query-N") for easier debugging
+      */
+     private static final class QueryThreadFactory implements ThreadFactory {
+
+         public static final QueryThreadFactory INSTANCE = new QueryThreadFactory(); // NOTE(review): not referenced in this diff; the pool is built with a fresh instance - confirm whether this is needed
+
+         private static final String NAME = "query-";
+         private final AtomicLong counter = new AtomicLong();
+
+
+         @Override
+         public Thread newThread( final Runnable r ) {
+
+             Thread newThread = new Thread( r, NAME + counter.incrementAndGet() ); // first thread is named query-1
+             newThread.setDaemon( true ); // daemon threads do not keep the JVM alive on shutdown
+
+             return newThread;
+         }
+     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d1ca419f/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
index 86e1690..587f17d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
@@ -250,7 +250,7 @@ public class RelationManagerImpl implements RelationManager {
                     EntityLocationRef loc =
                             new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(),
                                     indexEntry.getValue().toString() );
-                    batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
+                    batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator,
                             index_name, indexedEntity.getUuid(), loc );
                 }
 
@@ -555,6 +555,12 @@ public class RelationManagerImpl implements RelationManager {
                                                                   ConnectionRefImpl connection, UUID[] index_keys )
             throws Exception {
 
+
+        /**
+         * Original (legacy) bucket scheme.  It is incorrect, but we must still issue deletes with it because we cannot tell
+         * whether the entry being removed was originally written using this legacy scheme.
+         */
+
         // entity_id,prop_name
         Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
                 indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.ALL] ) );
@@ -562,20 +568,19 @@ public class RelationManagerImpl implements RelationManager {
         // entity_id,entity_type,prop_name
         Object entity_type_prop_index_key =
                 key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket(
-                                index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) );
+                        indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) );
 
         // entity_id,connection_type,prop_name
         Object connection_type_prop_index_key =
                 key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket(
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE]) );
+                        indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) );
 
         // entity_id,connection_type,entity_type,prop_name
         Object connection_type_and_entity_type_prop_index_key =
                 key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket(
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE]) );
+                        indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE] ) );
+
+
 
         // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
         addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
@@ -596,35 +601,85 @@ public class RelationManagerImpl implements RelationManager {
         addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
                 entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() );
 
+
+        /**
+         * New bucket scheme for deletes: one bucket, derived from the connected entity id, is used for all four row keys
+         */
+
+        final UUID entityId = connection.getConnectedEntityId();
+        final String bucket = indexBucketLocator.getBucket( entityId );
+
+
+        // entity_id,prop_name
+        Object property_index_key_new =
+                key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(), bucket );
+
+        // entity_id,entity_type,prop_name
+        Object entity_type_prop_index_key_new =
+                key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(), bucket );
+
+        // entity_id,connection_type,prop_name
+        Object connection_type_prop_index_key_new =
+                key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(), bucket );
+
+        // entity_id,connection_type,entity_type,prop_name
+        Object connection_type_and_entity_type_prop_index_key_new =
+                key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
+                        bucket );
+
+
+        // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
+        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key_new,
+                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
+                        connection.getConnectedEntityType() ), indexUpdate.getTimestamp() );
+
+        // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
+        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key_new,
+                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
+                indexUpdate.getTimestamp() );
+
+        // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
+        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key_new,
+                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
+                indexUpdate.getTimestamp() );
+
+        // composite(property_value,connected_entity_id,entry_timestamp)
+        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key_new,
+                entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() );
+
+
         return indexUpdate.getBatch();
     }
 
 
+
+
     @Metered(group = "core", name = "RelationManager_batchAddConnectionIndexEntries")
     public Mutator<ByteBuffer> batchAddConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry,
                                                                ConnectionRefImpl connection, UUID[] index_keys ) {
 
+        final UUID entityId = connection.getConnectedEntityId();
+
+        final String bucket = indexBucketLocator.getBucket( entityId );
+
         // entity_id,prop_name
         Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
-                indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.ALL] ) );
+                bucket );
 
         // entity_id,entity_type,prop_name
         Object entity_type_prop_index_key =
                 key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket(
-                                index_keys[ConnectionRefImpl.BY_ENTITY_TYPE]) );
+                        bucket );
 
         // entity_id,connection_type,prop_name
         Object connection_type_prop_index_key =
                 key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket(
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) );
+                        bucket );
 
         // entity_id,connection_type,entity_type,prop_name
         Object connection_type_and_entity_type_prop_index_key =
                 key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket(
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE]) );
+                        bucket );
 
         // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
         addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
@@ -725,7 +780,8 @@ public class RelationManagerImpl implements RelationManager {
     public Set<String> getConnectionIndexes( ConnectionRefImpl connection ) throws Exception {
         List<HColumn<String, String>> results =
                 cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
-                        key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), Serializers.se, Serializers.se );
+                        key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), Serializers.se,
+                        Serializers.se );
         Set<String> indexes = new TreeSet<String>();
         if ( results != null ) {
             for ( HColumn<String, String> column : results ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d1ca419f/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java
index f56dc2b..2fec250 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.query.ir.result;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.IndexBucketLocator;
-import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
 
 
 /**
@@ -30,35 +29,22 @@ import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
 public final class ConnectionShardFilter implements ShardFilter {
     private final IndexBucketLocator indexBucketLocator;
     private final String expectedBucket;
-    private final ConnectionRefImpl searchConnection;
 
 
-    public ConnectionShardFilter( final IndexBucketLocator indexBucketLocator, final String expectedBucket,
-                                  final ConnectionRefImpl connection ) {
+    public ConnectionShardFilter( final IndexBucketLocator indexBucketLocator, final String expectedBucket ) {
         this.indexBucketLocator = indexBucketLocator;
         this.expectedBucket = expectedBucket;
-        this.searchConnection = connection;
-
-
     }
 
 
-
-
-
     public boolean isInShard( final ScanColumn scanColumn ) {
 
 
-        //shard hashing is currently based on source.  this is a placeholder for when this is fixed.
-//        UUID[] indexIds = searchConnection.getIndexIds();
-//
-//        final String shard = indexBucketLocator.getBucket(indexIds[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE] );
-//
-//        return expectedBucket.equals( shard );
-
-        return true;
-//
-    }
+        final UUID entityId = scanColumn.getUUID();
 
+        //a column is in our shard only if its entity id hashes to the bucket we're processing; anything else is discarded
+        final String shard = indexBucketLocator.getBucket( entityId );
 
+        return expectedBucket.equals( shard );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d1ca419f/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
index f518297..909ae5d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
@@ -136,7 +136,7 @@ public class SearchConnectionVisitor extends SearchVisitor {
 
 
         final ConnectionShardFilter
-                validator = new ConnectionShardFilter(indexBucketLocator, bucket, connection );
+                validator = new ConnectionShardFilter(indexBucketLocator, bucket );
 
 
         this.results.push( new ShardFilterIterator( validator, itr, size ) );
@@ -213,7 +213,7 @@ public class SearchConnectionVisitor extends SearchVisitor {
         //we have to create our wrapper so validate the data we read is correct for our shard
 
 
-        final ConnectionShardFilter connectionShardFilter = new ConnectionShardFilter( indexBucketLocator, bucket, connection);
+        final ConnectionShardFilter connectionShardFilter = new ConnectionShardFilter( indexBucketLocator, bucket);
 
 
         final SliceIterator sliceIterator = new SliceIterator( connectionScanner, connectionParser );