You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/07/08 23:55:15 UTC
[1/2] incubator-usergrid git commit: Refactor of gather iterator to
clean up coordination
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-752 bbd563787 -> 219a425a6
Refactor of gather iterator to clean up coordination
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d872e030
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d872e030
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d872e030
Branch: refs/heads/USERGRID-752
Commit: d872e0305e85739e1f531793560c298ed6739919
Parents: bbd5637
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Jul 8 15:19:13 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Jul 8 15:19:13 2015 -0600
----------------------------------------------------------------------
.../query/ir/result/GatherIterator.java | 125 +++++++++++++------
1 file changed, 86 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d872e030/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
index 36104f9..c05e3cc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
-import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;
@@ -39,27 +38,15 @@ import org.apache.usergrid.persistence.query.ir.SearchVisitor;
public class GatherIterator implements ResultIterator {
- private List<Future<Void>> iterators;
- private final ConcurrentResultMerge merge;
- private boolean merged;
+ private final WorkerCoordinator workerCoordinator;
public GatherIterator( final int pageSize, final QueryNode rootNode, final Collection<SearchVisitor> searchVisitors,
final ExecutorService executorService ) {
- this.merge = new ConcurrentResultMerge( pageSize );
+ this.workerCoordinator = new WorkerCoordinator( executorService, searchVisitors, rootNode, pageSize );
- this.iterators = new ArrayList<Future<Void>>( searchVisitors.size() );
- this.merged = false;
-
-
- /**
- * Start our search processing
- */
- for ( SearchVisitor visitor : searchVisitors ) {
- final Future<Void> result = executorService.submit( new VisitorExecutor( rootNode, merge, visitor ) );
- iterators.add( result );
- }
+ this.workerCoordinator.start();
}
@@ -77,37 +64,86 @@ public class GatherIterator implements ResultIterator {
@Override
public boolean hasNext() {
- waitForCompletion();
+ return workerCoordinator.getResults().hasResults();
+ }
+
+
+ @Override
+ public Set<ScanColumn> next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No more elements" );
+ }
- return merge.results.size() > 0;
+ return workerCoordinator.getResults().copyAndClear();
}
- private void waitForCompletion() {
- if ( merged ) {
- return;
+ /**
+ * Coordinator object for all workers
+ */
+ private final class WorkerCoordinator {
+
+ private final ExecutorService executorService;
+ private final Collection<SearchVisitor> searchVisitors;
+ private final int pageSize;
+ private final QueryNode rootNode;
+ private ConcurrentResultMerge merge;
+ private ArrayList<Future<Void>> workers;
+
+
+ private WorkerCoordinator( final ExecutorService executorService,
+ final Collection<SearchVisitor> searchVisitors, final QueryNode rootNode,
+ final int pageSize ) {
+ this.executorService = executorService;
+ this.searchVisitors = searchVisitors;
+ this.rootNode = rootNode;
+ this.pageSize = pageSize;
}
- for ( final Future<Void> future : iterators ) {
- try {
- future.get();
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Unable to aggregate results", e );
+
+ public void start() {
+ this.merge = new ConcurrentResultMerge( pageSize );
+
+
+ this.workers = new ArrayList<Future<Void>>( searchVisitors.size() );
+
+
+ /**
+ * Start our search processing
+ */
+ for ( SearchVisitor visitor : searchVisitors ) {
+ final VisitorExecutor executor = new VisitorExecutor( rootNode, merge, visitor );
+
+// try {
+// executor.call();
+// }
+// catch ( Exception e ) {
+// throw new RuntimeException( e );
+// }
+ final Future<Void> result = executorService.submit( executor);
+ workers.add( result );
}
}
- merged = true;
- }
+ /**
+ * Get the results of the merge, after all workers have finished
+ * @return the shared merge instance that was handed to every worker
+ */
+ public ConcurrentResultMerge getResults() {
- @Override
- public Set<ScanColumn> next() {
- if ( !hasNext() ) {
- throw new NoSuchElementException( "No more elements" );
- }
+ //make sure all our workers are done
+ for ( final Future<Void> future : workers ) {
+ try {
+ future.get();
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Unable to aggregate results", e );
+ }
+ }
- return merge.copyAndClear();
+ return merge;
+ }
}
@@ -157,7 +193,7 @@ public class GatherIterator implements ResultIterator {
*/
private final class ConcurrentResultMerge {
- private final TreeSet<ScanColumn> results;
+ private TreeSet<ScanColumn> results;
private final int maxSize;
@@ -181,14 +217,25 @@ public class GatherIterator implements ResultIterator {
}
}
+
+ /**
+ * Return true if the merge has results
+ */
+ public boolean hasResults() {
+ return results != null && results.size() > 0;
+ }
+
+
/**
* Get the set
*/
- public Set<ScanColumn> copyAndClear() {
+ public synchronized Set<ScanColumn> copyAndClear() {
//create an immutable copy
- final Set<ScanColumn> toReturn = new LinkedHashSet<ScanColumn>( results );
- results.clear();
+ final Set<ScanColumn> toReturn = results ;
+ results = new TreeSet<ScanColumn>();
return toReturn;
}
+
+
}
}
[2/2] incubator-usergrid git commit: Adds slice duplicate. Otherwise
cursor concurrency errors occur.
Posted by to...@apache.org.
Adds slice duplicate. Otherwise cursor concurrency errors occur.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/219a425a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/219a425a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/219a425a
Branch: refs/heads/USERGRID-752
Commit: 219a425a656d33af1c9defbb08b6d73d574dadeb
Parents: d872e03
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Jul 8 15:55:12 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Jul 8 15:55:12 2015 -0600
----------------------------------------------------------------------
.../persistence/cassandra/QueryProcessor.java | 2 +-
.../persistence/query/ir/QuerySlice.java | 19 +++++++++++++++++++
.../persistence/query/ir/SearchVisitor.java | 4 ++--
3 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/219a425a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
index 10f4fa5..538f6d6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
@@ -220,7 +220,7 @@ public class QueryProcessor {
* Apply cursor position and sort order to this slice. This should only be invoked at evaluation time to ensure that
* the IR tree has already been fully constructed
*/
- public void applyCursorAndSort( QuerySlice slice ) {
+ public void applyCursorAndSort( QuerySlice slice ) {
// apply the sort first, since this can change the hash code
SortPredicate sort = getSort( slice.getPropertyName() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/219a425a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/QuerySlice.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/QuerySlice.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/QuerySlice.java
index 77c0a6b..2b7d23c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/QuerySlice.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/QuerySlice.java
@@ -53,6 +53,25 @@ public class QuerySlice {
}
+
+ /**
+ * Create a deep copy of the query slice from the original query slice
+ * @param original
+ */
+ private QuerySlice(final QuerySlice original){
+ this.propertyName = original.propertyName;
+ this.nodeId = original.nodeId;
+ this.start = original.start;
+ this.finish = original.finish;
+ this.cursor = original.cursor;
+ this.reversed = original.reversed;
+ }
+
+
+ public QuerySlice duplicate(){
+ return new QuerySlice( this );
+ }
+
/** Reverse this slice. Flips the reversed switch and correctly changes the start and finish */
public void reverse() {
reversed = !reversed;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/219a425a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
index 2fe2e37..8c5fe2b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
@@ -229,7 +229,7 @@ public abstract class SearchVisitor implements NodeVisitor {
scanner = new NoOpIndexScanner();
}
else {
- scanner = secondaryIndexScan( orderByNode, slice );
+ scanner = secondaryIndexScan( orderByNode, slice.duplicate() );
}
final SliceCursorGenerator sliceCursorGenerator = new SliceCursorGenerator( slice );
@@ -264,7 +264,7 @@ public abstract class SearchVisitor implements NodeVisitor {
IntersectionIterator intersections = new IntersectionIterator( queryProcessor.getPageSizeHint( node ) );
for ( QuerySlice slice : node.getAllSlices() ) {
- IndexScanner scanner = secondaryIndexScan( node, slice );
+ IndexScanner scanner = secondaryIndexScan( node, slice.duplicate() );
final SliceCursorGenerator sliceCursorGenerator = new SliceCursorGenerator( slice );