You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/30 19:20:01 UTC
[23/25] incubator-usergrid git commit: Fixes shard allocation. Keeps
old allocation logic for deletion.
Fixes shard allocation. Keeps old allocation logic for deletion.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d1ca419f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d1ca419f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d1ca419f
Branch: refs/heads/master
Commit: d1ca419f3def8e92cefec8c84508185823d3d4ac
Parents: a22c996
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Jul 9 13:44:50 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Jul 9 13:44:50 2015 -0600
----------------------------------------------------------------------
.../persistence/cassandra/GeoIndexManager.java | 78 ++++++++++++++----
.../cassandra/QueryExecutorServiceImpl.java | 25 +++++-
.../cassandra/RelationManagerImpl.java | 86 ++++++++++++++++----
.../query/ir/result/ConnectionShardFilter.java | 26 ++----
.../ir/result/SearchConnectionVisitor.java | 4 +-
5 files changed, 165 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d1ca419f/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
index ce5fab8..109881f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
@@ -95,33 +95,34 @@ public class GeoIndexManager {
private static Mutator<ByteBuffer> batchAddConnectionIndexEntries( Mutator<ByteBuffer> m,
- IndexBucketLocator locator, UUID appId,
+ IndexBucketLocator locator, UUID entityId,
String propertyName, String geoCell,
UUID[] index_keys, ByteBuffer columnName,
ByteBuffer columnValue, long timestamp ) {
+ final String bucket = locator.getBucket( entityId );
+
+
// entity_id,prop_name
Object property_index_key =
key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell,
- locator.getBucket(index_keys[ConnectionRefImpl.ALL] ) );
+ bucket );
// entity_id,entity_type,prop_name
Object entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL,
geoCell,
- locator.getBucket( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) );
+ bucket );
// entity_id,connection_type,prop_name
Object connection_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, propertyName,
- DICTIONARY_GEOCELL, geoCell, locator.getBucket(
- index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) );
+ DICTIONARY_GEOCELL, geoCell, bucket );
// entity_id,connection_type,entity_type,prop_name
Object connection_type_and_entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName,
- DICTIONARY_GEOCELL, geoCell, locator.getBucket(
- index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE] ) );
+ DICTIONARY_GEOCELL, geoCell, bucket );
// composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
addInsertToMutator( m, ENTITY_INDEX, property_index_key, columnName, columnValue, timestamp );
@@ -151,7 +152,7 @@ public class GeoIndexManager {
ByteBuffer columnValue = location.getColumnValue().serialize();
long ts = location.getTimestampInMicros();
for ( String cell : cells ) {
- batchAddConnectionIndexEntries( m, locator, appId, propertyName, cell, index_keys, columnName, columnValue,
+ batchAddConnectionIndexEntries( m, locator, location.getUuid(), propertyName, cell, index_keys, columnName, columnValue,
ts );
}
@@ -177,11 +178,15 @@ public class GeoIndexManager {
private static Mutator<ByteBuffer> batchDeleteConnectionIndexEntries( Mutator<ByteBuffer> m,
- IndexBucketLocator locator, UUID appId,
+ IndexBucketLocator locator, UUID entityId,
String propertyName, String geoCell,
UUID[] index_keys, ByteBuffer columnName,
long timestamp ) {
+ /**
+ * Legacy key scheme
+ */
+
// entity_id,prop_name
Object property_index_key =
key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell,
@@ -221,6 +226,49 @@ public class GeoIndexManager {
m.addDeletion( bytebuffer( connection_type_and_entity_type_prop_index_key ), ENTITY_INDEX.toString(),
columnName, ByteBufferSerializer.get(), timestamp );
+
+ /**
+ * New key scheme
+ */
+
+ final String bucket = locator.getBucket( entityId );
+
+ // entity_id,prop_name
+ Object property_index_key_new =
+ key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell,
+ bucket );
+
+ // entity_id,entity_type,prop_name
+ Object entity_type_prop_index_key_new =
+ key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL,
+ geoCell, bucket );
+
+ // entity_id,connection_type,prop_name
+ Object connection_type_prop_index_key_new =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, propertyName,
+ DICTIONARY_GEOCELL, geoCell, bucket );
+
+ // entity_id,connection_type,entity_type,prop_name
+ Object connection_type_and_entity_type_prop_index_key_new =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName,
+ DICTIONARY_GEOCELL, geoCell, bucket );
+
+ // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
+ m.addDeletion( bytebuffer( property_index_key_new ), ENTITY_INDEX.toString(), columnName,
+ ByteBufferSerializer.get(), timestamp );
+
+ // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
+ m.addDeletion( bytebuffer( entity_type_prop_index_key_new ), ENTITY_INDEX.toString(), columnName,
+ ByteBufferSerializer.get(), timestamp );
+
+ // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
+ m.addDeletion( bytebuffer( connection_type_prop_index_key_new ), ENTITY_INDEX.toString(), columnName,
+ ByteBufferSerializer.get(), timestamp );
+
+ // composite(property_value,connected_entity_id,entry_timestamp)
+ m.addDeletion( bytebuffer( connection_type_and_entity_type_prop_index_key_new ), ENTITY_INDEX.toString(),
+ columnName, ByteBufferSerializer.get(), timestamp );
+
return m;
}
@@ -238,7 +286,7 @@ public class GeoIndexManager {
for ( String cell : cells ) {
- batchDeleteConnectionIndexEntries( m, locator, appId, propertyName, cell, index_keys, columnName, ts );
+ batchDeleteConnectionIndexEntries( m, locator, location.getUuid(), propertyName, cell, index_keys, columnName, ts );
}
logger.info( "Geocells to be saved for Point({} , {} ) are: {}", new Object[] {
@@ -247,8 +295,7 @@ public class GeoIndexManager {
}
- public static void batchStoreLocationInCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator,
- UUID appId, Object key, UUID entityId,
+ public static void batchStoreLocationInCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator, Object key, UUID entityId,
EntityLocationRef location ) {
Point p = location.getPoint();
@@ -276,15 +323,14 @@ public class GeoIndexManager {
Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, ByteBufferSerializer.get() );
- batchStoreLocationInCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(),
+ batchStoreLocationInCollectionIndex( m, em.getIndexBucketLocator(),
key( owner.getUuid(), collectionName, propertyName ), owner.getUuid(), location );
batchExecute( m, CassandraService.RETRY_COUNT );
}
- public static void batchRemoveLocationFromCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator,
- UUID appId, Object key, EntityLocationRef location ) {
+ public static void batchRemoveLocationFromCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator, Object key, EntityLocationRef location ) {
Point p = location.getPoint();
List<String> cells = GeocellManager.generateGeoCell( p );
@@ -314,7 +360,7 @@ public class GeoIndexManager {
Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, ByteBufferSerializer.get() );
- batchRemoveLocationFromCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(),
+ batchRemoveLocationFromCollectionIndex( m, em.getIndexBucketLocator(),
key( owner.getUuid(), collectionName, propertyName ), location );
batchExecute( m, CassandraService.RETRY_COUNT );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d1ca419f/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java
index 7376641..3504a3f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java
@@ -21,8 +21,10 @@ package org.apache.usergrid.persistence.cassandra;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +72,7 @@ public class QueryExecutorServiceImpl implements QueryExecutorService {
}
- executorService = new ThreadPoolExecutor( threadCount, threadCount, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>( ), new CallerRunsExecutionHandler() );
+ executorService = new ThreadPoolExecutor( threadCount, threadCount, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>( ), new QueryThreadFactory(), new CallerRunsExecutionHandler() );
return executorService;
}
@@ -92,4 +94,25 @@ public class QueryExecutorServiceImpl implements QueryExecutorService {
}
}
+ /**
+  * Thread factory that labels query worker threads {@code query-1}, {@code query-2}, ... so they
+  * are easy to identify in thread dumps and logs while debugging.
+  *
+  * Every thread is marked as a daemon, so idle query workers never prevent the JVM from exiting.
+  * The executor builds its own factory with {@code new QueryThreadFactory()}, so no shared
+  * singleton instance is kept here.
+  */
+ private static final class QueryThreadFactory implements ThreadFactory {
+
+     private static final String NAME = "query-";
+
+     // per-factory sequence; incremented atomically so concurrent thread creation gets unique names
+     private final AtomicLong counter = new AtomicLong();
+
+
+     /**
+      * Creates a daemon thread named {@code query-<n>} that will run the supplied task.
+      *
+      * @param r the task the new thread will execute
+      * @return the newly created, not yet started, daemon thread
+      */
+     @Override
+     public Thread newThread( final Runnable r ) {
+
+         Thread newThread = new Thread( r, NAME + counter.incrementAndGet() );
+         newThread.setDaemon( true );
+
+         return newThread;
+     }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d1ca419f/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
index 86e1690..587f17d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
@@ -250,7 +250,7 @@ public class RelationManagerImpl implements RelationManager {
EntityLocationRef loc =
new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(),
indexEntry.getValue().toString() );
- batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
+ batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator,
index_name, indexedEntity.getUuid(), loc );
}
@@ -555,6 +555,12 @@ public class RelationManagerImpl implements RelationManager {
ConnectionRefImpl connection, UUID[] index_keys )
throws Exception {
+
+ /**
+ * Original (legacy) bucket scheme. It is incorrect, but we still issue deletes against it because
+ * the entry being removed may have been written under the legacy scheme.
+ */
+
// entity_id,prop_name
Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.ALL] ) );
@@ -562,20 +568,19 @@ public class RelationManagerImpl implements RelationManager {
// entity_id,entity_type,prop_name
Object entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
- indexBucketLocator.getBucket(
- index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) );
+ indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) );
// entity_id,connection_type,prop_name
Object connection_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
- indexBucketLocator.getBucket(
- index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE]) );
+ indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) );
// entity_id,connection_type,entity_type,prop_name
Object connection_type_and_entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
- indexBucketLocator.getBucket(
- index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE]) );
+ indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE] ) );
+
+
// composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
@@ -596,35 +601,85 @@ public class RelationManagerImpl implements RelationManager {
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() );
+
+ /**
+ * New bucket scheme for deletes
+ */
+
+ final UUID entityId = connection.getConnectedEntityId();
+ final String bucket = indexBucketLocator.getBucket( entityId );
+
+
+ // entity_id,prop_name
+ Object property_index_key_new =
+ key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(), bucket );
+
+ // entity_id,entity_type,prop_name
+ Object entity_type_prop_index_key_new =
+ key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(), bucket );
+
+ // entity_id,connection_type,prop_name
+ Object connection_type_prop_index_key_new =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(), bucket );
+
+ // entity_id,connection_type,entity_type,prop_name
+ Object connection_type_and_entity_type_prop_index_key_new =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
+ bucket );
+
+
+ // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
+ addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key_new,
+ entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
+ connection.getConnectedEntityType() ), indexUpdate.getTimestamp() );
+
+ // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
+ addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key_new,
+ entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
+ indexUpdate.getTimestamp() );
+
+ // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
+ addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key_new,
+ entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
+ indexUpdate.getTimestamp() );
+
+ // composite(property_value,connected_entity_id,entry_timestamp)
+ addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key_new,
+ entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() );
+
+
return indexUpdate.getBatch();
}
+
+
@Metered(group = "core", name = "RelationManager_batchAddConnectionIndexEntries")
public Mutator<ByteBuffer> batchAddConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry,
ConnectionRefImpl connection, UUID[] index_keys ) {
+ final UUID entityId = connection.getConnectedEntityId();
+
+ final String bucket = indexBucketLocator.getBucket( entityId );
+
// entity_id,prop_name
Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
- indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.ALL] ) );
+ bucket );
// entity_id,entity_type,prop_name
Object entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
- indexBucketLocator.getBucket(
- index_keys[ConnectionRefImpl.BY_ENTITY_TYPE]) );
+ bucket );
// entity_id,connection_type,prop_name
Object connection_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
- indexBucketLocator.getBucket(
- index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) );
+ bucket );
// entity_id,connection_type,entity_type,prop_name
Object connection_type_and_entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
- indexBucketLocator.getBucket(
- index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE]) );
+ bucket );
// composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
@@ -725,7 +780,8 @@ public class RelationManagerImpl implements RelationManager {
public Set<String> getConnectionIndexes( ConnectionRefImpl connection ) throws Exception {
List<HColumn<String, String>> results =
cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
- key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), Serializers.se, Serializers.se );
+ key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), Serializers.se,
+ Serializers.se );
Set<String> indexes = new TreeSet<String>();
if ( results != null ) {
for ( HColumn<String, String> column : results ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d1ca419f/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java
index f56dc2b..2fec250 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.query.ir.result;
import java.util.UUID;
import org.apache.usergrid.persistence.IndexBucketLocator;
-import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
/**
@@ -30,35 +29,22 @@ import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
public final class ConnectionShardFilter implements ShardFilter {
private final IndexBucketLocator indexBucketLocator;
private final String expectedBucket;
- private final ConnectionRefImpl searchConnection;
- public ConnectionShardFilter( final IndexBucketLocator indexBucketLocator, final String expectedBucket,
- final ConnectionRefImpl connection ) {
+ public ConnectionShardFilter( final IndexBucketLocator indexBucketLocator, final String expectedBucket ) {
this.indexBucketLocator = indexBucketLocator;
this.expectedBucket = expectedBucket;
- this.searchConnection = connection;
-
-
}
-
-
-
public boolean isInShard( final ScanColumn scanColumn ) {
- //shard hashing is currently based on source. this is a placeholder for when this is fixed.
-// UUID[] indexIds = searchConnection.getIndexIds();
-//
-// final String shard = indexBucketLocator.getBucket(indexIds[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE] );
-//
-// return expectedBucket.equals( shard );
-
- return true;
-//
- }
+ final UUID entityId = scanColumn.getUUID();
+ //compute the shard for this entity; columns that are not in our current processing shard are discarded
+ final String shard = indexBucketLocator.getBucket( entityId );
+ return expectedBucket.equals( shard );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d1ca419f/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
index f518297..909ae5d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
@@ -136,7 +136,7 @@ public class SearchConnectionVisitor extends SearchVisitor {
final ConnectionShardFilter
- validator = new ConnectionShardFilter(indexBucketLocator, bucket, connection );
+ validator = new ConnectionShardFilter(indexBucketLocator, bucket );
this.results.push( new ShardFilterIterator( validator, itr, size ) );
@@ -213,7 +213,7 @@ public class SearchConnectionVisitor extends SearchVisitor {
//we have to create our wrapper so validate the data we read is correct for our shard
- final ConnectionShardFilter connectionShardFilter = new ConnectionShardFilter( indexBucketLocator, bucket, connection);
+ final ConnectionShardFilter connectionShardFilter = new ConnectionShardFilter( indexBucketLocator, bucket);
final SliceIterator sliceIterator = new SliceIterator( connectionScanner, connectionParser );