You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/30 19:19:54 UTC

[16/25] incubator-usergrid git commit: First pass at concurrent merge

First pass at concurrent merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b14a9ae1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b14a9ae1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b14a9ae1

Branch: refs/heads/master
Commit: b14a9ae144e1a2836a4f3c96479851469879ded7
Parents: 4444803
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jul 7 12:05:22 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jul 7 12:05:22 2015 -0600

----------------------------------------------------------------------
 .../persistence/cassandra/QueryProcessor.java   |  30 +++-
 .../query/ir/result/GatherIterator.java         | 162 ++++++++++---------
 2 files changed, 119 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b14a9ae1/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
index 32fc07c..a2e0f60 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
@@ -22,6 +22,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Stack;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,6 +94,11 @@ public class QueryProcessor {
     private final EntityManager em;
     private final ResultsLoaderFactory loaderFactory;
 
+    //we set up a thread pool with 100 execution threads.  We purposefully use a synchronous queue and a caller runs so that we simply reject and immediately execute tasks if all 100 threads are occupied.
+
+    //we make this static, we only want 1 instance of this service in the JVM.  Otherwise tests fail
+    private static final ExecutorService executorService = new ThreadPoolExecutor( 100, 100, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>( ), new CallerRunsExecutionHandler() );
+
     private Operand rootOperand;
     private List<SortPredicate> sorts;
     private CursorCache cursorCache;
@@ -270,7 +281,7 @@ public class QueryProcessor {
         //use the gather iterator to collect all the results
         final int resultSetSize = Math.min( size, Query.MAX_LIMIT );
 
-        ResultIterator itr = new GatherIterator(resultSetSize, rootNode, searchVisitorFactory.createVisitors()  );
+        ResultIterator itr = new GatherIterator(resultSetSize, rootNode, searchVisitorFactory.createVisitors(), executorService  );
 
         List<ScanColumn> entityIds = new ArrayList<ScanColumn>( );
 
@@ -683,4 +694,21 @@ public class QueryProcessor {
     public EntityManager getEntityManager() {
         return em;
     }
+
+
+    private static class CallerRunsExecutionHandler implements RejectedExecutionHandler{
+
+        private static final Logger logger = LoggerFactory.getLogger( CallerRunsExecutionHandler.class );
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+            //when a task is rejected due to back pressure, we just want to run it in the caller.
+
+            logger.warn( "Concurrent shard execution rejected the task in executor {}, running it in the caller thread", executor );
+
+            r.run();
+        }
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b14a9ae1/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
index e199d33..f4ffcd2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
@@ -19,19 +19,16 @@ package org.apache.usergrid.persistence.query.ir.result;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
-import org.apache.usergrid.persistence.ResultsIterator;
-import org.apache.usergrid.persistence.cassandra.CursorCache;
 import org.apache.usergrid.persistence.query.ir.QueryNode;
 import org.apache.usergrid.persistence.query.ir.SearchVisitor;
 
@@ -42,18 +39,27 @@ import org.apache.usergrid.persistence.query.ir.SearchVisitor;
 public class GatherIterator implements ResultIterator {
 
 
-    private final QueryNode rootNode;
-    private final int pageSize;
+    private List<Future<Void>> iterators;
+    private final ConcurrentResultMerge merge;
+    private boolean merged;
 
 
-    private Iterator<ScanColumn> mergedIterators;
-    private List<ResultIterator> iterators;
+    public GatherIterator( final int pageSize, final QueryNode rootNode, final Collection<SearchVisitor> searchVisitors,
+                           final ExecutorService executorService ) {
+        this.merge = new ConcurrentResultMerge( pageSize );
 
 
-    public GatherIterator(final int pageSize, final QueryNode rootNode, final Collection<SearchVisitor> searchVisitors) {
-        this.pageSize = pageSize;
-        this.rootNode = rootNode;
-        createIterators( searchVisitors );
+        this.iterators = new ArrayList<Future<Void>>( searchVisitors.size() );
+        this.merged = false;
+
+
+        /**
+         * Start our search processing
+         */
+        for ( SearchVisitor visitor : searchVisitors ) {
+            final Future<Void> result = executorService.submit( new VisitorExecutor( rootNode, merge, visitor ) );
+            iterators.add( result );
+        }
     }
 
 
@@ -63,7 +69,6 @@ public class GatherIterator implements ResultIterator {
     }
 
 
-
     @Override
     public Iterator<Set<ScanColumn>> iterator() {
         return this;
@@ -72,106 +77,119 @@ public class GatherIterator implements ResultIterator {
 
     @Override
     public boolean hasNext() {
-        if(mergedIterators == null || !mergedIterators.hasNext()){
-            mergeIterators();
-        }
+        waitForCompletion();
 
-        return mergedIterators.hasNext();
+        return merge.results.size() > 0;
     }
 
 
-    @Override
-    public Set<ScanColumn> next() {
-        if(!hasNext()){
-            throw new NoSuchElementException( "No more elements" );
+    private void waitForCompletion() {
+        if ( merged ) {
+            return;
         }
 
-        return getNextPage();
-    }
-
-    private void createIterators(final Collection<SearchVisitor> searchVisitors ){
-
-        this.iterators = new ArrayList<ResultIterator>( searchVisitors.size() );
-
-        for(SearchVisitor visitor: searchVisitors){
-
+        for ( final Future<Void> future : iterators ) {
             try {
-                rootNode.visit( visitor );
+                future.get();
             }
             catch ( Exception e ) {
-                throw new RuntimeException( "Unable to process query", e );
+                throw new RuntimeException( "Unable to aggregate results" );
             }
+        }
 
-            final ResultIterator iterator = visitor.getResults();
+        merged = true;
+    }
 
-            iterators.add( iterator );
 
+    @Override
+    public Set<ScanColumn> next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more elements" );
         }
 
+        return merge.copyAndClear();
     }
 
 
     /**
-     * Get the next page of results
-     * @return
+     * A visitor that will visit and get the first page of a set and return them.
      */
-    private Set<ScanColumn> getNextPage(){
+    private final class VisitorExecutor implements Callable<Void> {
+
+        private final QueryNode rootNode;
+        private final SearchVisitor visitor;
+        private final ConcurrentResultMerge merge;
 
-        //try to take from our PageSize
-        LinkedHashSet<ScanColumn> resultSet = new LinkedHashSet<ScanColumn>( pageSize );
 
-        for(int i = 0; i < pageSize && mergedIterators.hasNext(); i ++){
-            resultSet.add( mergedIterators.next() );
+        private VisitorExecutor( final QueryNode rootNode, final ConcurrentResultMerge merge,
+                                 final SearchVisitor visitor ) {
+            this.rootNode = rootNode;
+            this.visitor = visitor;
+            this.merge = merge;
         }
 
 
-        return resultSet;
-    }
+        @Override
+        public Void call() throws Exception {
 
-    /**
-     * Advance the iterator
-     */
-    private void mergeIterators(){
-        //TODO make this concurrent
 
+            try {
+                rootNode.visit( visitor );
+            }
+            catch ( Exception e ) {
+                throw new RuntimeException( "Unable to process query", e );
+            }
 
-        TreeSet<ScanColumn> merged = new TreeSet<ScanColumn>(  );
+            final ResultIterator iterator = visitor.getResults();
 
-        for(ResultIterator iterator: this.iterators){
-              merge(merged, iterator);
-        }
 
-        mergedIterators = merged.iterator();
+            if ( iterator.hasNext() ) {
+                merge.merge( iterator.next() );
+            }
 
+            return null;
+        }
     }
 
 
-
-
     /**
-     * Merge this interator into our final column results
-     * @param results
-     * @param iterator
+     * Class used to synchronize our treeSet access
      */
-    private void merge(final TreeSet<ScanColumn> results, final ResultIterator iterator){
+    private final class ConcurrentResultMerge {
 
+        private final TreeSet<ScanColumn> results;
+        private final int maxSize;
 
-        //nothing to do, return
-        if( !iterator.hasNext()){
-            return;
-        }
 
-        final Iterator<ScanColumn> nextPage = iterator.next().iterator();
+        private ConcurrentResultMerge( final int maxSize ) {
+            this.maxSize = maxSize;
+            results = new TreeSet<ScanColumn>();
+        }
 
-        //only take from the iterator what we need to create a full page.
-        for(int i = 0 ; i < pageSize && nextPage.hasNext(); i ++){
-            final ScanColumn next = nextPage.next();
 
-            results.add(next);
+        /**
+         * Merge this set into the existing set
+         */
+        public synchronized void merge( final Set<ScanColumn> columns ) {
+            for ( ScanColumn scanColumn : columns ) {
+                results.add( scanColumn );
 
+                //results are too large remove the last
+                while ( results.size() > maxSize ) {
+                    results.pollLast();
+                }
+            }
         }
 
-    }
-
 
+        /**
+         * Get the set
+         */
+        public Set<ScanColumn> copyAndClear() {
+            //create an immutable copy
+            final Set<ScanColumn> toReturn = new LinkedHashSet<ScanColumn>( results );
+            results.clear();
+            return toReturn;
+        }
+    }
 }