You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2011/11/03 20:40:52 UTC

svn commit: r1197286 - /incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java

Author: kturner
Date: Thu Nov  3 19:40:52 2011
New Revision: 1197286

URL: http://svn.apache.org/viewvc?rev=1197286&view=rev
Log:
ACCUMULO-110 Made scanner use thread pool for read ahead

Modified:
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java?rev=1197286&r1=1197285&r2=1197286&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java Thu Nov  3 19:40:52 2011
@@ -19,9 +19,14 @@ package org.apache.accumulo.core.client.
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -60,11 +65,25 @@ public class ScannerIterator implements 
   
   private boolean finished = false;
   
-  private Thread readThread;
+  private boolean readaheadInProgress;
   private long batchCount = 0;
   
   private static final List<KeyValue> EMPTY_LIST = Collections.emptyList();
   
+  private static AtomicInteger threadCounter = new AtomicInteger(1);
+
+  private static ThreadPoolExecutor readaheadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3l, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+      new ThreadFactory() {
+        
+        @Override
+        public Thread newThread(Runnable r) {
+          Thread t = new Thread(r);
+          t.setDaemon(true);
+          t.setName("Accumulo scanner read ahead thread " + threadCounter.getAndIncrement());
+          return t;
+        }
+      });
+
   private class Reader implements Runnable {
     
     @Override
@@ -131,15 +150,13 @@ public class ScannerIterator implements 
     
     scanState = new ScanState(credentials, tableName, authorizations, new Range(range), options.fetchedColumns, size, options.serverSideIteratorList,
         options.serverSideIteratorOptions, isolated);
-    readThread = null;
+    readaheadInProgress = false;
     iter = null;
   }
   
   private void initiateReadAhead() {
-    readThread = new Thread(new Reader());
-    readThread.setName("Scanner Read Ahead " + scanState.tableName);
-    readThread.setDaemon(true);
-    readThread.start();
+    readaheadInProgress = true;
+    readaheadPool.execute(new Reader());
   }
   
   @SuppressWarnings("unchecked")
@@ -154,7 +171,7 @@ public class ScannerIterator implements 
     // this is done in order to see if there is another batch to get
     
     try {
-      if (readThread == null) {
+      if (!readaheadInProgress) {
         // no read ahead run, fetch the next batch right now
         new Reader().run();
       }