You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/30 20:44:44 UTC

[21/35] incubator-usergrid git commit: Refactor of gather iterator to clean up coordination

Refactor of gather iterator to clean up coordination


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d872e030
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d872e030
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d872e030

Branch: refs/heads/ug2-doc-update
Commit: d872e0305e85739e1f531793560c298ed6739919
Parents: bbd5637
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Jul 8 15:19:13 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Jul 8 15:19:13 2015 -0600

----------------------------------------------------------------------
 .../query/ir/result/GatherIterator.java         | 125 +++++++++++++------
 1 file changed, 86 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d872e030/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
index 36104f9..c05e3cc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeSet;
@@ -39,27 +38,15 @@ import org.apache.usergrid.persistence.query.ir.SearchVisitor;
 public class GatherIterator implements ResultIterator {
 
 
-    private List<Future<Void>> iterators;
-    private final ConcurrentResultMerge merge;
-    private boolean merged;
+    private final WorkerCoordinator workerCoordinator;
 
 
     public GatherIterator( final int pageSize, final QueryNode rootNode, final Collection<SearchVisitor> searchVisitors,
                            final ExecutorService executorService ) {
-        this.merge = new ConcurrentResultMerge( pageSize );
 
+        this.workerCoordinator = new WorkerCoordinator( executorService, searchVisitors, rootNode, pageSize );
 
-        this.iterators = new ArrayList<Future<Void>>( searchVisitors.size() );
-        this.merged = false;
-
-
-        /**
-         * Start our search processing
-         */
-        for ( SearchVisitor visitor : searchVisitors ) {
-            final Future<Void> result = executorService.submit( new VisitorExecutor( rootNode, merge, visitor ) );
-            iterators.add( result );
-        }
+        this.workerCoordinator.start();
     }
 
 
@@ -77,37 +64,86 @@ public class GatherIterator implements ResultIterator {
 
     @Override
     public boolean hasNext() {
-        waitForCompletion();
+        return workerCoordinator.getResults().hasResults();
+    }
+
+
+    @Override
+    public Set<ScanColumn> next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more elements" );
+        }
 
-        return merge.results.size() > 0;
+        return workerCoordinator.getResults().copyAndClear();
     }
 
 
-    private void waitForCompletion() {
-        if ( merged ) {
-            return;
+    /**
+     * Coordinates the search workers: submits one task per visitor and waits for them all to finish
+     */
+    private final class WorkerCoordinator {
+
+        private final ExecutorService executorService;
+        private final Collection<SearchVisitor> searchVisitors;
+        private final int pageSize;
+        private final QueryNode rootNode;
+        private ConcurrentResultMerge merge;
+        private ArrayList<Future<Void>> workers;
+
+
+        private WorkerCoordinator( final ExecutorService executorService,
+                                   final Collection<SearchVisitor> searchVisitors, final QueryNode rootNode,
+                                   final int pageSize ) {
+            this.executorService = executorService;
+            this.searchVisitors = searchVisitors;
+            this.rootNode = rootNode;
+            this.pageSize = pageSize;
         }
 
-        for ( final Future<Void> future : iterators ) {
-            try {
-                future.get();
-            }
-            catch ( Exception e ) {
-                throw new RuntimeException( "Unable to aggregate results", e );
+
+        public void start() {
+            this.merge = new ConcurrentResultMerge( pageSize );
+
+
+            this.workers = new ArrayList<Future<Void>>( searchVisitors.size() );
+
+
+            /**
+             * Start our search processing
+             */
+            for ( SearchVisitor visitor : searchVisitors ) {
+                final VisitorExecutor executor = new VisitorExecutor( rootNode, merge, visitor );
+
+//                try {
+//                    executor.call();
+//                }
+//                catch ( Exception e ) {
+//                    throw new RuntimeException( e );
+//                }
+                final Future<Void> result = executorService.submit( executor);
+                workers.add( result );
             }
         }
 
-        merged = true;
-    }
 
+        /**
+         * Block until every submitted worker has finished, then return the merged results
+         * @return the merge holding the combined results of all workers
+         */
+        public ConcurrentResultMerge getResults() {
 
-    @Override
-    public Set<ScanColumn> next() {
-        if ( !hasNext() ) {
-            throw new NoSuchElementException( "No more elements" );
-        }
+            //make sure all our workers are done
+            for ( final Future<Void> future : workers ) {
+                try {
+                    future.get();
+                }
+                catch ( Exception e ) {
+                    throw new RuntimeException( "Unable to aggregate results", e );
+                }
+            }
 
-        return merge.copyAndClear();
+            return merge;
+        }
     }
 
 
@@ -157,7 +193,7 @@ public class GatherIterator implements ResultIterator {
      */
     private final class ConcurrentResultMerge {
 
-        private final TreeSet<ScanColumn> results;
+        private TreeSet<ScanColumn> results;
         private final int maxSize;
 
 
@@ -181,14 +217,25 @@ public class GatherIterator implements ResultIterator {
             }
         }
 
+
+        /**
+         * Return true if the merge has results
+         */
+        public boolean hasResults() {
+            return results != null && results.size() > 0;
+        }
+
+
         /**
          * Get the set
          */
-        public Set<ScanColumn> copyAndClear() {
+        public synchronized Set<ScanColumn> copyAndClear() {
             //create an immutable copy
-            final Set<ScanColumn> toReturn = new LinkedHashSet<ScanColumn>( results );
-            results.clear();
+            final Set<ScanColumn> toReturn = results ;
+            results = new TreeSet<ScanColumn>();
             return toReturn;
         }
+
+
     }
 }