You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2011/11/03 20:40:52 UTC
svn commit: r1197286 -
/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
Author: kturner
Date: Thu Nov 3 19:40:52 2011
New Revision: 1197286
URL: http://svn.apache.org/viewvc?rev=1197286&view=rev
Log:
ACCUMULO-110 Made scanner use thread pool for read ahead
Modified:
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java?rev=1197286&r1=1197285&r2=1197286&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java Thu Nov 3 19:40:52 2011
@@ -19,9 +19,14 @@ package org.apache.accumulo.core.client.
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.NoSuchElementException;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -60,11 +65,25 @@ public class ScannerIterator implements
private boolean finished = false;
- private Thread readThread;
+ private boolean readaheadInProgress;
private long batchCount = 0;
private static final List<KeyValue> EMPTY_LIST = Collections.emptyList();
+ private static AtomicInteger threadCounter = new AtomicInteger(1);
+
+ private static ThreadPoolExecutor readaheadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3l, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.setName("Accumulo scanner read ahead thread " + threadCounter.getAndIncrement());
+ return t;
+ }
+ });
+
private class Reader implements Runnable {
@Override
@@ -131,15 +150,13 @@ public class ScannerIterator implements
scanState = new ScanState(credentials, tableName, authorizations, new Range(range), options.fetchedColumns, size, options.serverSideIteratorList,
options.serverSideIteratorOptions, isolated);
- readThread = null;
+ readaheadInProgress = false;
iter = null;
}
private void initiateReadAhead() {
- readThread = new Thread(new Reader());
- readThread.setName("Scanner Read Ahead " + scanState.tableName);
- readThread.setDaemon(true);
- readThread.start();
+ readaheadInProgress = true;
+ readaheadPool.execute(new Reader());
}
@SuppressWarnings("unchecked")
@@ -154,7 +171,7 @@ public class ScannerIterator implements
// this is done in order to see if there is another batch to get
try {
- if (readThread == null) {
+ if (!readaheadInProgress) {
// no read ahead run, fetch the next batch right now
new Reader().run();
}