You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/08 19:24:06 UTC

[1/3] git commit: ACCUMULO-1566 Lift out the implicit "3 batch" convention from ScannerIterator into Scanner so it can be configured by the user.

Updated Branches:
  refs/heads/master 24b44f947 -> dab1be962


ACCUMULO-1566 Lift out the implicit "3 batch" convention from ScannerIterator
into Scanner so it can be configured by the user.

Previously, the ScannerIterator began pre-fetching the next batch only after
three batches had been returned. Clients are better positioned to know when
read-ahead should begin, so we should let them configure it.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0d85d60c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0d85d60c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0d85d60c

Branch: refs/heads/master
Commit: 0d85d60c08f88bc6d3e366b192fba5a371654363
Parents: 24b44f9
Author: Josh Elser <el...@apache.org>
Authored: Mon Oct 7 23:28:36 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Oct 7 23:28:36 2013 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/Constants.java     |  4 ++++
 .../core/client/ClientSideIteratorScanner.java  | 17 ++++++++++++++
 .../accumulo/core/client/IsolatedScanner.java   | 24 ++++++++++++++++++--
 .../apache/accumulo/core/client/Scanner.java    | 14 ++++++++++++
 .../core/client/impl/OfflineScanner.java        | 10 ++++++++
 .../accumulo/core/client/impl/ScannerImpl.java  | 17 +++++++++++++-
 .../core/client/impl/ScannerIterator.java       | 18 ++++++++++++---
 .../accumulo/core/client/mock/MockScanner.java  | 10 ++++++++
 .../monitor/servlets/trace/NullScanner.java     | 10 ++++++++
 .../server/util/OfflineMetadataScanner.java     | 10 ++++++++
 10 files changed, 128 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index ebcc47b..3b4c1e6 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -81,6 +81,10 @@ public class Constants {
   // this affects the table client caching of metadata
   public static final int SCAN_BATCH_SIZE = 1000;
   
+  // Scanners will default to fetching 3 batches of Key/Value pairs before asynchronously
+  // fetching the next batch.
+  public static final long SCANNER_DEFAULT_READAHEAD_THRESHOLD = 3l;
+  
   // Security configuration
   public static final String PW_HASH_ALGORITHM = "SHA-256";
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
index 3085f56..168e56f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
@@ -26,6 +26,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.ScannerOptions;
 import org.apache.accumulo.core.client.mock.IteratorAdapter;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -57,6 +58,7 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
   
   private Range range;
   private boolean isolated = false;
+  private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
   
   /**
    * A class that wraps a Scanner in a SortedKeyValueIterator so that other accumulo iterators can use it as a source.
@@ -137,6 +139,7 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
     this.range = scanner.getRange();
     this.size = scanner.getBatchSize();
     this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
+    this.readaheadThreshold = scanner.getReadaheadThreshold();
   }
   
   /**
@@ -152,6 +155,7 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
   public Iterator<Entry<Key,Value>> iterator() {
     smi.scanner.setBatchSize(size);
     smi.scanner.setTimeout(timeOut, TimeUnit.MILLISECONDS);
+    smi.scanner.setReadaheadThreshold(readaheadThreshold);
     if (isolated)
       smi.scanner.enableIsolation();
     else
@@ -254,4 +258,17 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
   public void disableIsolation() {
     this.isolated = false;
   }
+
+  @Override
+  public long getReadaheadThreshold() {
+    return readaheadThreshold;
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    if (0 > batches) {
+      throw new IllegalArgumentException("Number of batches before read-ahead must be non-negative");
+    }
+    this.readaheadThreshold = batches;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
index 4b362dc..4acea76 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.IsolationException;
 import org.apache.accumulo.core.client.impl.ScannerOptions;
 import org.apache.accumulo.core.data.ByteSequence;
@@ -52,6 +53,7 @@ public class IsolatedScanner extends ScannerOptions implements Scanner {
     private ScannerOptions opts;
     private Range range;
     private int batchSize;
+    private long readaheadThreshold;
     
     private void readRow() {
       
@@ -123,6 +125,7 @@ public class IsolatedScanner extends ScannerOptions implements Scanner {
         scanner.setBatchSize(batchSize);
         scanner.setTimeout(timeout, TimeUnit.MILLISECONDS);
         scanner.setRange(r);
+        scanner.setReadaheadThreshold(readaheadThreshold);
         setOptions((ScannerOptions) scanner, opts);
         
         return scanner.iterator();
@@ -130,12 +133,13 @@ public class IsolatedScanner extends ScannerOptions implements Scanner {
       }
     }
     
-    public RowBufferingIterator(Scanner scanner, ScannerOptions opts, Range range, long timeout, int batchSize, RowBufferFactory bufferFactory) {
+    public RowBufferingIterator(Scanner scanner, ScannerOptions opts, Range range, long timeout, int batchSize, long readaheadThreshold, RowBufferFactory bufferFactory) {
       this.scanner = scanner;
       this.opts = new ScannerOptions(opts);
       this.range = range;
       this.timeout = timeout;
       this.batchSize = batchSize;
+      this.readaheadThreshold = readaheadThreshold;
       
       buffer = bufferFactory.newBuffer();
       
@@ -211,6 +215,7 @@ public class IsolatedScanner extends ScannerOptions implements Scanner {
   private Scanner scanner;
   private Range range;
   private int batchSize;
+  private long readaheadThreshold;
   private RowBufferFactory bufferFactory;
   
   public IsolatedScanner(Scanner scanner) {
@@ -222,12 +227,13 @@ public class IsolatedScanner extends ScannerOptions implements Scanner {
     this.range = scanner.getRange();
     this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
     this.batchSize = scanner.getBatchSize();
+    this.readaheadThreshold = scanner.getReadaheadThreshold();
     this.bufferFactory = bufferFactory;
   }
   
   @Override
   public Iterator<Entry<Key,Value>> iterator() {
-    return new RowBufferingIterator(scanner, this, range, timeOut, batchSize, bufferFactory);
+    return new RowBufferingIterator(scanner, this, range, timeOut, batchSize, readaheadThreshold, bufferFactory);
   }
   
   @Deprecated
@@ -278,4 +284,18 @@ public class IsolatedScanner extends ScannerOptions implements Scanner {
   public void disableIsolation() {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public long getReadaheadThreshold() {
+    return readaheadThreshold;
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    if (0 > batches) {
+      throw new IllegalArgumentException("Number of batches before read-ahead must be non-negative");
+    }
+    
+    this.readaheadThreshold = batches;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Scanner.java b/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
index fe9edda..245aa18 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
@@ -85,4 +85,18 @@ public interface Scanner extends ScannerBase {
    * Disables row isolation. Writes that occur to a row after a scan of that row has begun may be seen if this option is enabled.
    */
   void disableIsolation();
+  
+  /**
+   * The number of batches of Key/Value pairs returned before the {@link Scanner} will begin to prefetch the next batch
+   * @return Number of batches before read-ahead begins
+   * @since 1.6.0
+   */
+  public long getReadaheadThreshold();
+  
+  /**
+   * Sets the number of batches of Key/Value pairs returned before the {@link Scanner} will begin to prefetch the next batch
+   * @param batches Non-negative number of batches
+   * @since 1.6.0
+   */
+  public void setReadaheadThreshold(long batches); 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
index 385a1cc..9f6f3cd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
@@ -413,5 +413,15 @@ public class OfflineScanner extends ScannerOptions implements Scanner {
   public Iterator<Entry<Key,Value>> iterator() {
     return new OfflineIterator(this, instance, credentials, authorizations, tableId, range);
   }
+
+  @Override
+  public long getReadaheadThreshold() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    throw new UnsupportedOperationException();
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
index 5f845cc..6be55b6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
@@ -55,6 +55,7 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
   
   private Range range;
   private boolean isolated = false;
+  private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
   
   public ScannerImpl(Instance instance, Credentials credentials, String table, Authorizations authorizations) {
     ArgumentChecker.notNull(instance, credentials, table, authorizations);
@@ -97,7 +98,7 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
    */
   @Override
   public synchronized Iterator<Entry<Key,Value>> iterator() {
-    return new ScannerIterator(instance, credentials, table, authorizations, range, size, getTimeOut(), this, isolated);
+    return new ScannerIterator(instance, credentials, table, authorizations, range, size, getTimeOut(), this, isolated, readaheadThreshold);
   }
   
   @Override
@@ -127,4 +128,18 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
       return Integer.MAX_VALUE;
     return (int) timeout;
   }
+  
+  @Override
+  public synchronized void setReadaheadThreshold(long batches) {
+    if (0 > batches) {
+      throw new IllegalArgumentException("Number of batches before read-ahead must be non-negative");
+    }
+    
+    readaheadThreshold = batches;
+  }
+  
+  @Override
+  public synchronized long getReadaheadThreshold() {
+    return readaheadThreshold;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
index f81759d..e9d1412 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
@@ -26,6 +26,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
@@ -64,8 +65,9 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> {
   
   private boolean finished = false;
   
-  private boolean readaheadInProgress;
+  private boolean readaheadInProgress = false;
   private long batchCount = 0;
+  private long readaheadThreshold;
   
   private static final List<KeyValue> EMPTY_LIST = Collections.emptyList();
   
@@ -123,10 +125,16 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> {
   
   ScannerIterator(Instance instance, Credentials credentials, Text table, Authorizations authorizations, Range range, int size, int timeOut,
       ScannerOptions options, boolean isolated) {
+    this(instance, credentials, table, authorizations, range, size, timeOut, options, isolated, Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD);
+  }
+  
+  ScannerIterator(Instance instance, Credentials credentials, Text table, Authorizations authorizations, Range range, int size, int timeOut,
+      ScannerOptions options, boolean isolated, long readaheadThreshold) {
     this.instance = instance;
     this.tableId = new Text(table);
     this.timeOut = timeOut;
     this.credentials = credentials;
+    this.readaheadThreshold = readaheadThreshold;
     
     this.options = new ScannerOptions(options);
     
@@ -138,7 +146,11 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> {
     
     scanState = new ScanState(instance, credentials, tableId, authorizations, new Range(range), options.fetchedColumns, size, options.serverSideIteratorList,
         options.serverSideIteratorOptions, isolated);
-    readaheadInProgress = false;
+    
+    // If we want to start readahead immediately, don't wait for hasNext to be called
+    if (0l == readaheadThreshold) {
+      initiateReadAhead();
+    }
     iter = null;
   }
   
@@ -185,7 +197,7 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> {
       iter = currentBatch.iterator();
       batchCount++;
       
-      if (batchCount > 3) {
+      if (batchCount > readaheadThreshold) {
         // start a thread to read the next batch
         initiateReadAhead();
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
index 002dbfc..e7c0ee0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
@@ -109,5 +109,15 @@ public class MockScanner extends MockScannerBase implements Scanner {
     }
     
   }
+
+  @Override
+  public long getReadaheadThreshold() {
+    return 0;
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java b/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
index 11d20c0..6ee16b1 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
@@ -103,4 +103,14 @@ public class NullScanner implements Scanner {
   
   @Override
   public void close() {}
+
+  @Override
+  public long getReadaheadThreshold() {
+    return 0l;
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java b/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
index 5e82ada..3fee062 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
@@ -268,5 +268,15 @@ public class OfflineMetadataScanner extends ScannerOptions implements Scanner {
     for (Entry<Key,Value> entry : scanner)
       System.out.println(entry.getKey() + " " + entry.getValue());
   }
+
+  @Override
+  public long getReadaheadThreshold() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    throw new UnsupportedOperationException();
+  }
   
 }


[3/3] git commit: ACCUMULO-1566 Add in an integration-test which checks that the readahead configuration works as intended.

Posted by el...@apache.org.
ACCUMULO-1566 Add in an integration-test which checks that the readahead
configuration works as intended.

By applying the SlowIterator to sleep on next(), and then sleeping for the same
amount of time in the main loop over the iterator from the Scanner, we should
only have to wait once when we're in "readahead mode", but wait twice when we're
not.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dab1be96
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dab1be96
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dab1be96

Branch: refs/heads/master
Commit: dab1be962b6ab1ab095c4ccf7f3995ab1208c3d7
Parents: ff58f6b
Author: Josh Elser <el...@apache.org>
Authored: Tue Oct 8 00:08:07 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Oct 8 00:55:46 2013 -0400

----------------------------------------------------------------------
 .../accumulo/test/functional/ScannerIT.java     | 112 +++++++++++++++++++
 1 file changed, 112 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/dab1be96/test/src/test/java/org/apache/accumulo/test/functional/ScannerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ScannerIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ScannerIT.java
new file mode 100644
index 0000000..7913089
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ScannerIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * 
+ */
+public class ScannerIT extends SimpleMacIT {
+
+  @Test(timeout = 60000)
+  public void testScannerReadaheadConfiguration() throws Exception {
+    final String table = "table";
+    Connector c = getConnector();
+    c.tableOperations().create(table);
+    
+    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+    
+    Mutation m = new Mutation("a");
+    for (int i = 0; i < 10; i++) {
+      m.put(Integer.toString(i), "", "");
+    }
+    
+    bw.addMutation(m);
+    bw.close();
+    
+    Scanner s = c.createScanner(table, new Authorizations());
+    
+    IteratorSetting cfg = new IteratorSetting(100, SlowIterator.class);
+    SlowIterator.setSleepTime(cfg, 100l);
+    s.addScanIterator(cfg);
+    s.setReadaheadThreshold(5);
+    s.setBatchSize(1);
+    s.setRange(new Range());
+    
+    Stopwatch sw = new Stopwatch();
+    Iterator<Entry<Key,Value>> iterator = s.iterator();
+    
+    sw.start();
+    while (iterator.hasNext()) {
+      sw.stop();
+      
+      // While we "do work" in the client, we should be fetching the next result
+      UtilWaitThread.sleep(100l);
+      iterator.next();
+      sw.start();
+    }
+    sw.stop();
+    
+    long millisWithWait = sw.elapsed(TimeUnit.MILLISECONDS);
+    
+    s = c.createScanner(table, new Authorizations());
+    s.addScanIterator(cfg);
+    s.setRange(new Range());
+    s.setBatchSize(1);
+    s.setReadaheadThreshold(0l);
+    
+    sw = new Stopwatch();
+    iterator = s.iterator();
+    
+    sw.start();
+    while (iterator.hasNext()) {
+      sw.stop();
+      
+      // While we "do work" in the client, we should be fetching the next result
+      UtilWaitThread.sleep(100l);
+      iterator.next();
+      sw.start();
+    }
+    sw.stop();
+
+    long millisWithNoWait = sw.elapsed(TimeUnit.MILLISECONDS);
+    
+    // The "no-wait" time should be much less than the "wait-time"
+    Assert.assertTrue("Expected less time to be taken with immediate readahead (" + millisWithNoWait 
+        + ") than without immediate readahead (" + millisWithWait + ")", millisWithNoWait < millisWithWait);
+  }
+
+}


[2/3] git commit: ACCUMULO-1566 Simple unit test to ensure that values work as expected.

Posted by el...@apache.org.
ACCUMULO-1566 Simple unit test to ensure that values work as expected.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ff58f6b1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ff58f6b1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ff58f6b1

Branch: refs/heads/master
Commit: ff58f6b15c36f5f33d0a296c9806b24ad8a94ab3
Parents: 0d85d60
Author: Josh Elser <el...@apache.org>
Authored: Mon Oct 7 23:30:18 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Oct 7 23:30:18 2013 -0400

----------------------------------------------------------------------
 .../core/client/impl/ScannerImplTest.java       | 50 ++++++++++++++++++++
 1 file changed, 50 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff58f6b1/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
new file mode 100644
index 0000000..311bbf8
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class ScannerImplTest {
+
+  @Test
+  public void testValidReadaheadValues() {
+    MockInstance instance = new MockInstance();
+    Scanner s = new ScannerImpl(instance, new Credentials("root", new PasswordToken("")), "foo", new Authorizations());
+    s.setReadaheadThreshold(0);
+    s.setReadaheadThreshold(10);
+    s.setReadaheadThreshold(Long.MAX_VALUE);
+    
+    Assert.assertEquals(Long.MAX_VALUE, s.getReadaheadThreshold());
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  public void testInValidReadaheadValues() {
+    MockInstance instance = new MockInstance();
+    Scanner s = new ScannerImpl(instance, new Credentials("root", new PasswordToken("")), "foo", new Authorizations());
+    s.setReadaheadThreshold(-1);
+  }
+
+}