You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/30 19:19:54 UTC
[16/25] incubator-usergrid git commit: First pass at concurrent merge
First pass at concurrent merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b14a9ae1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b14a9ae1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b14a9ae1
Branch: refs/heads/master
Commit: b14a9ae144e1a2836a4f3c96479851469879ded7
Parents: 4444803
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jul 7 12:05:22 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jul 7 12:05:22 2015 -0600
----------------------------------------------------------------------
.../persistence/cassandra/QueryProcessor.java | 30 +++-
.../query/ir/result/GatherIterator.java | 162 ++++++++++---------
2 files changed, 119 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b14a9ae1/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
index 32fc07c..a2e0f60 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
@@ -22,6 +22,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Stack;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,6 +94,11 @@ public class QueryProcessor {
private final EntityManager em;
private final ResultsLoaderFactory loaderFactory;
+ //we set up a thread pool with 100 execution threads. We purposefully use a synchronous queue with a caller-runs rejection policy, so that when all 100 threads are occupied a submitted task is rejected and immediately executed on the caller's thread.
+
+ //we make this static, we only want 1 instance of this service in the JVM. Otherwise tests fail
+ private static final ExecutorService executorService = new ThreadPoolExecutor( 100, 100, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>( ), new CallerRunsExecutionHandler() );
+
private Operand rootOperand;
private List<SortPredicate> sorts;
private CursorCache cursorCache;
@@ -270,7 +281,7 @@ public class QueryProcessor {
//use the gather iterator to collect all the '
final int resultSetSize = Math.min( size, Query.MAX_LIMIT );
- ResultIterator itr = new GatherIterator(resultSetSize, rootNode, searchVisitorFactory.createVisitors() );
+ ResultIterator itr = new GatherIterator(resultSetSize, rootNode, searchVisitorFactory.createVisitors(), executorService );
List<ScanColumn> entityIds = new ArrayList<ScanColumn>( );
@@ -683,4 +694,21 @@ public class QueryProcessor {
public EntityManager getEntityManager() {
return em;
}
+
+
+ private static class CallerRunsExecutionHandler implements RejectedExecutionHandler{
+
+ private static final Logger logger = LoggerFactory.getLogger( CallerRunsExecutionHandler.class );
+
+ @Override
+ public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+ //when a task is rejected due to back pressure, we just want to run it in the caller.
+
+ logger.warn( "Concurrent shard execution rejected the task in executor {}, running it in the caller thread", executor );
+
+ r.run();
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b14a9ae1/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
index e199d33..f4ffcd2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
@@ -19,19 +19,16 @@ package org.apache.usergrid.persistence.query.ir.result;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
-import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
-import org.apache.usergrid.persistence.ResultsIterator;
-import org.apache.usergrid.persistence.cassandra.CursorCache;
import org.apache.usergrid.persistence.query.ir.QueryNode;
import org.apache.usergrid.persistence.query.ir.SearchVisitor;
@@ -42,18 +39,27 @@ import org.apache.usergrid.persistence.query.ir.SearchVisitor;
public class GatherIterator implements ResultIterator {
- private final QueryNode rootNode;
- private final int pageSize;
+ private List<Future<Void>> iterators;
+ private final ConcurrentResultMerge merge;
+ private boolean merged;
- private Iterator<ScanColumn> mergedIterators;
- private List<ResultIterator> iterators;
+ public GatherIterator( final int pageSize, final QueryNode rootNode, final Collection<SearchVisitor> searchVisitors,
+ final ExecutorService executorService ) {
+ this.merge = new ConcurrentResultMerge( pageSize );
- public GatherIterator(final int pageSize, final QueryNode rootNode, final Collection<SearchVisitor> searchVisitors) {
- this.pageSize = pageSize;
- this.rootNode = rootNode;
- createIterators( searchVisitors );
+ this.iterators = new ArrayList<Future<Void>>( searchVisitors.size() );
+ this.merged = false;
+
+
+ /**
+ * Start our search processing
+ */
+ for ( SearchVisitor visitor : searchVisitors ) {
+ final Future<Void> result = executorService.submit( new VisitorExecutor( rootNode, merge, visitor ) );
+ iterators.add( result );
+ }
}
@@ -63,7 +69,6 @@ public class GatherIterator implements ResultIterator {
}
-
@Override
public Iterator<Set<ScanColumn>> iterator() {
return this;
@@ -72,106 +77,119 @@ public class GatherIterator implements ResultIterator {
@Override
public boolean hasNext() {
- if(mergedIterators == null || !mergedIterators.hasNext()){
- mergeIterators();
- }
+ waitForCompletion();
- return mergedIterators.hasNext();
+ return merge.results.size() > 0;
}
- @Override
- public Set<ScanColumn> next() {
- if(!hasNext()){
- throw new NoSuchElementException( "No more elements" );
+ private void waitForCompletion() {
+ if ( merged ) {
+ return;
}
- return getNextPage();
- }
-
- private void createIterators(final Collection<SearchVisitor> searchVisitors ){
-
- this.iterators = new ArrayList<ResultIterator>( searchVisitors.size() );
-
- for(SearchVisitor visitor: searchVisitors){
-
+ for ( final Future<Void> future : iterators ) {
try {
- rootNode.visit( visitor );
+ future.get();
}
catch ( Exception e ) {
- throw new RuntimeException( "Unable to process query", e );
+ throw new RuntimeException( "Unable to aggregate results" );
}
+ }
- final ResultIterator iterator = visitor.getResults();
+ merged = true;
+ }
- iterators.add( iterator );
+ @Override
+ public Set<ScanColumn> next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No more elements" );
}
+ return merge.copyAndClear();
}
/**
- * Get the next page of results
- * @return
+ * A visitor that will visit a set, get its first page of results, and return it.
*/
- private Set<ScanColumn> getNextPage(){
+ private final class VisitorExecutor implements Callable<Void> {
+
+ private final QueryNode rootNode;
+ private final SearchVisitor visitor;
+ private final ConcurrentResultMerge merge;
- //try to take from our PageSize
- LinkedHashSet<ScanColumn> resultSet = new LinkedHashSet<ScanColumn>( pageSize );
- for(int i = 0; i < pageSize && mergedIterators.hasNext(); i ++){
- resultSet.add( mergedIterators.next() );
+ private VisitorExecutor( final QueryNode rootNode, final ConcurrentResultMerge merge,
+ final SearchVisitor visitor ) {
+ this.rootNode = rootNode;
+ this.visitor = visitor;
+ this.merge = merge;
}
- return resultSet;
- }
+ @Override
+ public Void call() throws Exception {
- /**
- * Advance the iterator
- */
- private void mergeIterators(){
- //TODO make this concurrent
+ try {
+ rootNode.visit( visitor );
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Unable to process query", e );
+ }
- TreeSet<ScanColumn> merged = new TreeSet<ScanColumn>( );
+ final ResultIterator iterator = visitor.getResults();
- for(ResultIterator iterator: this.iterators){
- merge(merged, iterator);
- }
- mergedIterators = merged.iterator();
+ if ( iterator.hasNext() ) {
+ merge.merge( iterator.next() );
+ }
+ return null;
+ }
}
-
-
/**
- * Merge this interator into our final column results
- * @param results
- * @param iterator
+ * Class used to synchronize our treeSet access
*/
- private void merge(final TreeSet<ScanColumn> results, final ResultIterator iterator){
+ private final class ConcurrentResultMerge {
+ private final TreeSet<ScanColumn> results;
+ private final int maxSize;
- //nothing to do, return
- if( !iterator.hasNext()){
- return;
- }
- final Iterator<ScanColumn> nextPage = iterator.next().iterator();
+ private ConcurrentResultMerge( final int maxSize ) {
+ this.maxSize = maxSize;
+ results = new TreeSet<ScanColumn>();
+ }
- //only take from the iterator what we need to create a full page.
- for(int i = 0 ; i < pageSize && nextPage.hasNext(); i ++){
- final ScanColumn next = nextPage.next();
- results.add(next);
+ /**
+ * Merge this set into the existing set
+ */
+ public synchronized void merge( final Set<ScanColumn> columns ) {
+ for ( ScanColumn scanColumn : columns ) {
+ results.add( scanColumn );
+ //results are too large remove the last
+ while ( results.size() > maxSize ) {
+ results.pollLast();
+ }
+ }
}
- }
-
+ /**
+ * Get the set
+ */
+ public Set<ScanColumn> copyAndClear() {
+ //create an immutable copy
+ final Set<ScanColumn> toReturn = new LinkedHashSet<ScanColumn>( results );
+ results.clear();
+ return toReturn;
+ }
+ }
}