You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2020/08/20 22:39:39 UTC

[accumulo] branch main updated (0c5bc31 -> 47d7b70)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git.


    from 0c5bc31  Adds test for multiple start events in WAL #1675 (#1683)
     add dab5b80  Fix issues in prep for 1.10 release (#1688)
     new 47d7b70  Merge branch '1.10' into main

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revision
listed as "add" was already present in the repository and has only
been added to this reference.


Summary of changes:
 pom.xml                                            | 19 ++----------
 .../replication/CloseWriteAheadLogReferences.java  | 27 ++++++++--------
 .../RemoveCompleteReplicationRecords.java          | 12 ++++----
 .../apache/accumulo/test/functional/ScannerIT.java | 36 ++++++++--------------
 4 files changed, 34 insertions(+), 60 deletions(-)


[accumulo] 01/01: Merge branch '1.10' into main

Posted by ct...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 47d7b708cf14b9d935a889c1082e211075338ed1
Merge: 0c5bc31 dab5b80
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Thu Aug 20 18:38:46 2020 -0400

    Merge branch '1.10' into main

 pom.xml                                            | 19 ++----------
 .../replication/CloseWriteAheadLogReferences.java  | 27 ++++++++--------
 .../RemoveCompleteReplicationRecords.java          | 12 ++++----
 .../apache/accumulo/test/functional/ScannerIT.java | 36 ++++++++--------------
 4 files changed, 34 insertions(+), 60 deletions(-)

diff --cc pom.xml
index aec76ba,5f1fae3..224aeca
--- a/pom.xml
+++ b/pom.xml
@@@ -127,24 -127,26 +127,26 @@@
      <failsafe.groups />
      <!-- surefire/failsafe plugin option -->
      <forkCount>1</forkCount>
 -    <!-- version that works when building with Hadoop 2 or 3.0; use 27.0-jre for 3.1 and later -->
 -    <guava.version>14.0.1</guava.version>
 -    <hadoop.version>2.6.5</hadoop.version>
 +    <hadoop.version>3.2.1</hadoop.version>
 +    <hk2.version>2.6.1</hk2.version>
      <htrace.hadoop.version>4.1.0-incubating</htrace.hadoop.version>
 -    <htrace.version>3.1.0-incubating</htrace.version>
 +    <htrace.version>3.2.0-incubating</htrace.version>
      <it.failIfNoSpecifiedTests>false</it.failIfNoSpecifiedTests>
 -    <!-- jetty 9.2 is the last version to support jdk less than 1.8 -->
 -    <jetty.version>9.2.26.v20180806</jetty.version>
 -    <maven.compiler.release>8</maven.compiler.release>
 -    <maven.compiler.source>1.8</maven.compiler.source>
 -    <maven.compiler.target>1.8</maven.compiler.target>
 -    <maven.plugin-version>3.5.0</maven.plugin-version>
 +    <javax.el.version>3.0.1-b06</javax.el.version>
 +    <jaxb.version>2.3.0.1</jaxb.version>
 +    <jersey.version>2.30.1</jersey.version>
 +    <jetty.version>9.4.27.v20200227</jetty.version>
 +    <maven.compiler.release>11</maven.compiler.release>
 +    <maven.compiler.source>11</maven.compiler.source>
 +    <maven.compiler.target>11</maven.compiler.target>
      <!-- surefire/failsafe plugin option -->
      <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
 -    <powermock.version>2.0.2</powermock.version>
 +    <powermock.version>2.0.5</powermock.version>
+     <!-- timestamp for reproducible outputs, updated on release by the release plugin -->
+     <project.build.outputTimestamp>2020-08-18T00:00:00Z</project.build.outputTimestamp>
      <!-- surefire/failsafe plugin option -->
      <reuseForks>false</reuseForks>
 -    <slf4j.version>1.7.25</slf4j.version>
 +    <slf4j.version>1.7.30</slf4j.version>
      <sourceReleaseAssemblyDescriptor>source-release-tar</sourceReleaseAssemblyDescriptor>
      <surefire.excludedGroups />
      <surefire.failIfNoSpecifiedTests>false</surefire.failIfNoSpecifiedTests>
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 7f5a7c4,5dd25a1..479618f
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@@ -18,8 -16,10 +18,9 @@@
   */
  package org.apache.accumulo.gc.replication;
  
+ import java.time.Duration;
  import java.util.Collections;
  import java.util.HashSet;
 -import java.util.List;
  import java.util.Map.Entry;
  import java.util.Set;
  
@@@ -76,36 -83,49 +76,35 @@@ public class CloseWriteAheadLogReferenc
  
    @Override
    public void run() {
-     // As long as we depend on a newer Guava than Hadoop uses, we have to make sure we're compatible
-     // with
-     // what the version they bundle uses.
-     Stopwatch sw = Stopwatch.createUnstarted();
 -    // Guava Stopwatch is useful here, for a friendlier toString, but the version of Guava in Hadoop
 -    // 2 and 3 are different in incompatible ways, so we avoid it here and use Duration instead, so
 -    // there won't be conflicts with the older Guava that ships by default with Hadoop 2.
++    // Guava Stopwatch is useful here, for a friendlier toString, but the versions of Guava
++    // are different in incompatible ways, so we avoid it here and use Duration instead, so
++    // there won't be conflicts.
+     long startTime;
+     Duration duration;
  
 -    Connector conn;
 -    try {
 -      conn = context.getConnector();
 -    } catch (Exception e) {
 -      log.error("Could not create connector", e);
 -      throw new RuntimeException(e);
 -    }
 -
 -    if (!ReplicationTable.isOnline(conn)) {
 +    if (!ReplicationTable.isOnline(context)) {
        log.debug("Replication table isn't online, not attempting to clean up wals");
        return;
      }
  
 -    Span findWalsSpan = Trace.start("findReferencedWals");
      HashSet<String> closed = null;
 -    try {
 +    try (TraceScope findWalsSpan = Trace.startSpan("findReferencedWals")) {
-       sw.start();
+       startTime = System.nanoTime();
 -      closed = getClosedLogs(conn);
 +      closed = getClosedLogs();
-     } finally {
-       sw.stop();
+       duration = Duration.ofNanos(System.nanoTime() - startTime);
 -    } finally {
 -      findWalsSpan.stop();
      }
  
-     log.info("Found {} WALs referenced in metadata in {}", closed.size(), sw);
-     sw.reset();
 -    log.info("Found " + closed.size() + " WALs referenced in metadata in " + duration);
++    log.info("Found {} WALs referenced in metadata in {}", closed.size(), duration);
  
 -    Span updateReplicationSpan = Trace.start("updateReplicationTable");
      long recordsClosed = 0;
 -    try {
 +    try (TraceScope updateReplicationSpan = Trace.startSpan("updateReplicationTable")) {
-       sw.start();
+       startTime = System.nanoTime();
 -      recordsClosed = updateReplicationEntries(conn, closed);
 +      recordsClosed = updateReplicationEntries(context, closed);
-     } finally {
-       sw.stop();
+       duration = Duration.ofNanos(System.nanoTime() - startTime);
 -    } finally {
 -      updateReplicationSpan.stop();
      }
  
-     log.info("Closed {} WAL replication references in replication table in {}", recordsClosed, sw);
 -    log.info("Closed " + recordsClosed + " WAL replication references in replication table in "
 -        + duration);
++    log.info("Closed {} WAL replication references in replication table in {}", recordsClosed,
++        duration);
    }
  
    /**
diff --cc server/manager/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
index 4eeca2c,0000000..35418f5
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
@@@ -1,226 -1,0 +1,226 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.master.replication;
 +
 +import java.io.IOException;
++import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.SortedMap;
 +
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 +import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 +import org.apache.accumulo.core.replication.ReplicationTarget;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
 +import org.apache.hadoop.io.Text;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import com.google.common.base.Stopwatch;
 +import com.google.protobuf.InvalidProtocolBufferException;
 +
 +/**
 + * Delete replication entries from the replication table that are fully replicated and closed
 + */
 +public class RemoveCompleteReplicationRecords implements Runnable {
 +  private static final Logger log = LoggerFactory.getLogger(RemoveCompleteReplicationRecords.class);
 +
 +  private AccumuloClient client;
 +
 +  public RemoveCompleteReplicationRecords(AccumuloClient client) {
 +    this.client = client;
 +  }
 +
 +  @Override
 +  public void run() {
 +    BatchScanner bs;
 +    BatchWriter bw;
 +    try {
 +      bs = ReplicationTable.getBatchScanner(client, 4);
 +      bw = ReplicationTable.getBatchWriter(client);
 +
 +      if (bs == null || bw == null)
 +        throw new AssertionError("Inconceivable; an exception should have been"
 +            + " thrown, but 'bs' or 'bw' was null instead");
 +    } catch (ReplicationTableOfflineException e) {
 +      log.debug("Not attempting to remove complete replication records as the"
 +          + " table ({}) isn't yet online", ReplicationTable.NAME);
 +      return;
 +    }
 +
 +    bs.setRanges(Collections.singleton(new Range()));
 +    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
 +    StatusSection.limit(bs);
 +    WorkSection.limit(bs);
 +    bs.addScanIterator(cfg);
 +
-     Stopwatch sw = Stopwatch.createUnstarted();
 +    long recordsRemoved = 0;
++    long startTime = System.nanoTime();
++    Duration duration;
 +    try {
-       sw.start();
 +      recordsRemoved = removeCompleteRecords(client, bs, bw);
 +    } finally {
 +      bs.close();
 +      try {
 +        bw.close();
 +      } catch (MutationsRejectedException e) {
 +        log.error("Error writing mutations to {}, will retry", ReplicationTable.NAME, e);
 +      }
-       sw.stop();
++      duration = Duration.ofNanos(System.nanoTime() - startTime);
 +    }
 +
-     log.info("Removed {} complete replication entries from the table {}", recordsRemoved,
-         ReplicationTable.NAME);
++    log.info("Removed {} complete replication entries from the table {} in {}", recordsRemoved,
++        ReplicationTable.NAME, duration);
 +  }
 +
 +  /**
 +   * Removes {@link Status} records read from the given {@code bs} and writes a delete, using the
 +   * given {@code bw}, when that {@link Status} is fully replicated and closed, as defined by
 +   * {@link StatusUtil#isSafeForRemoval(org.apache.accumulo.server.replication.proto.Replication.Status)}.
 +   *
 +   * @param client
 +   *          Accumulo client
 +   * @param bs
 +   *          A BatchScanner to read replication status records from
 +   * @param bw
 +   *          A BatchWriter to write deletes to
 +   * @return Number of records removed
 +   */
 +  protected long removeCompleteRecords(AccumuloClient client, BatchScanner bs, BatchWriter bw) {
 +    Text row = new Text(), colf = new Text(), colq = new Text();
 +    long recordsRemoved = 0;
 +
 +    // For each row in the replication table
 +    for (Entry<Key,Value> rowEntry : bs) {
 +      SortedMap<Key,Value> columns;
 +      try {
 +        columns = WholeRowIterator.decodeRow(rowEntry.getKey(), rowEntry.getValue());
 +      } catch (IOException e) {
 +        log.error("Could not deserialize {} with WholeRowIterator", rowEntry.getKey().getRow(), e);
 +        continue;
 +      }
 +
 +      rowEntry.getKey().getRow(row);
 +
 +      // Try to remove the row (all or nothing)
 +      recordsRemoved += removeRowIfNecessary(bw, columns, row, colf, colq);
 +    }
 +
 +    return recordsRemoved;
 +  }
 +
 +  protected long removeRowIfNecessary(BatchWriter bw, SortedMap<Key,Value> columns, Text row,
 +      Text colf, Text colq) {
 +    long recordsRemoved = 0;
 +    if (columns.isEmpty()) {
 +      return recordsRemoved;
 +    }
 +
 +    Mutation m = new Mutation(row);
 +    Map<TableId,Long> tableToTimeCreated = new HashMap<>();
 +    for (Entry<Key,Value> entry : columns.entrySet()) {
 +      Status status = null;
 +      try {
 +        status = Status.parseFrom(entry.getValue().get());
 +      } catch (InvalidProtocolBufferException e) {
 +        log.error("Encountered unparsable protobuf for key: {}",
 +            entry.getKey().toStringNoTruncate());
 +        continue;
 +      }
 +
 +      // If a column in the row isn't ready for removal, we keep the whole row
 +      if (!StatusUtil.isSafeForRemoval(status)) {
 +        return 0L;
 +      }
 +
 +      Key k = entry.getKey();
 +      k.getColumnFamily(colf);
 +      k.getColumnQualifier(colq);
 +
 +      log.debug("Removing {} {}:{} from replication table", row, colf, colq);
 +
 +      m.putDelete(colf, colq);
 +
 +      TableId tableId;
 +      if (StatusSection.NAME.equals(colf)) {
 +        tableId = TableId.of(colq.toString());
 +      } else if (WorkSection.NAME.equals(colf)) {
 +        ReplicationTarget target = ReplicationTarget.from(colq);
 +        tableId = target.getSourceTableId();
 +      } else {
 +        throw new RuntimeException("Got unexpected column");
 +      }
 +
 +      if (status.hasCreatedTime()) {
 +        Long timeClosed = tableToTimeCreated.get(tableId);
 +        if (timeClosed == null) {
 +          tableToTimeCreated.put(tableId, status.getCreatedTime());
 +        } else if (timeClosed != status.getCreatedTime()) {
 +          log.warn("Found multiple values for timeClosed for {}: {} and {}", row, timeClosed,
 +              status.getCreatedTime());
 +        }
 +      }
 +
 +      recordsRemoved++;
 +    }
 +
 +    List<Mutation> mutations = new ArrayList<>();
 +    mutations.add(m);
 +    for (Entry<TableId,Long> entry : tableToTimeCreated.entrySet()) {
 +      log.info("Removing order mutation for table {} at {} for {}", entry.getKey(),
 +          entry.getValue(), row);
 +      Mutation orderMutation = OrderSection.createMutation(row.toString(), entry.getValue());
 +      orderMutation.putDelete(OrderSection.NAME, new Text(entry.getKey().canonical()));
 +      mutations.add(orderMutation);
 +    }
 +
 +    // Send the mutation deleting all the columns at once.
 +    // If we send them not as a single Mutation, we run the risk of having some of them be applied
 +    // which would mean that we might accidentally re-replicate data. We want to get rid of them all
 +    // at once
 +    // or not at all.
 +    try {
 +      bw.addMutations(mutations);
 +      bw.flush();
 +    } catch (MutationsRejectedException e) {
 +      log.error("Could not submit mutation to remove columns for {} in replication table", row, e);
 +      return 0L;
 +    }
 +
 +    return recordsRemoved;
 +  }
 +}
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index 5b3b2a0,d3ba259..85d8b2b
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@@ -22,11 -20,10 +22,10 @@@ import static org.junit.Assert.assertTr
  
  import java.util.Iterator;
  import java.util.Map.Entry;
- import java.util.concurrent.TimeUnit;
  
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
  import org.apache.accumulo.core.client.BatchWriter;
 -import org.apache.accumulo.core.client.BatchWriterConfig;
 -import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.client.Scanner;
  import org.apache.accumulo.core.data.Key;
@@@ -38,8 -35,9 +37,6 @@@ import org.apache.accumulo.fate.util.Ut
  import org.apache.accumulo.harness.AccumuloClusterHarness;
  import org.junit.Test;
  
- import com.google.common.base.Stopwatch;
- 
 -/**
 - *
 - */
  public class ScannerIT extends AccumuloClusterHarness {
  
    @Override
@@@ -50,78 -48,68 +47,71 @@@
    @Test
    public void testScannerReadaheadConfiguration() throws Exception {
      final String table = getUniqueNames(1)[0];
 -    Connector c = getConnector();
 -    c.tableOperations().create(table);
 -
 -    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
 -
 -    Mutation m = new Mutation("a");
 -    for (int i = 0; i < 10; i++) {
 -      m.put(Integer.toString(i), "", "");
 +    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
 +      c.tableOperations().create(table);
 +
 +      try (BatchWriter bw = c.createBatchWriter(table)) {
 +        Mutation m = new Mutation("a");
 +        for (int i = 0; i < 10; i++) {
 +          m.put(Integer.toString(i), "", "");
 +        }
 +        bw.addMutation(m);
 +      }
 +
 +      IteratorSetting cfg;
-       Stopwatch sw;
 +      Iterator<Entry<Key,Value>> iterator;
++      long nanosWithWait = 0;
 +      try (Scanner s = c.createScanner(table, new Authorizations())) {
 +
 +        cfg = new IteratorSetting(100, SlowIterator.class);
 +        // A batch size of one will end up calling seek() for each element with no calls to next()
 +        SlowIterator.setSeekSleepTime(cfg, 100L);
 +
 +        s.addScanIterator(cfg);
 +        // Never start readahead
 +        s.setReadaheadThreshold(Long.MAX_VALUE);
 +        s.setBatchSize(1);
 +        s.setRange(new Range());
 +
-         sw = Stopwatch.createUnstarted();
 +        iterator = s.iterator();
- 
-         sw.start();
++        long startTime = System.nanoTime();
 +        while (iterator.hasNext()) {
-           sw.stop();
++          nanosWithWait += System.nanoTime() - startTime;
 +
 +          // While we "do work" in the client, we should be fetching the next result
 +          UtilWaitThread.sleep(100L);
 +          iterator.next();
-           sw.start();
++          startTime = System.nanoTime();
 +        }
-         sw.stop();
++        nanosWithWait += System.nanoTime() - startTime;
 +      }
 +
-       long millisWithWait = sw.elapsed(TimeUnit.MILLISECONDS);
- 
++      long nanosWithNoWait = 0;
 +      try (Scanner s = c.createScanner(table, new Authorizations())) {
 +        s.addScanIterator(cfg);
 +        s.setRange(new Range());
 +        s.setBatchSize(1);
 +        s.setReadaheadThreshold(0L);
 +
-         sw = Stopwatch.createUnstarted();
 +        iterator = s.iterator();
- 
-         sw.start();
++        long startTime = System.nanoTime();
 +        while (iterator.hasNext()) {
-           sw.stop();
++          nanosWithNoWait += System.nanoTime() - startTime;
 +
 +          // While we "do work" in the client, we should be fetching the next result
 +          UtilWaitThread.sleep(100L);
 +          iterator.next();
-           sw.start();
++          startTime = System.nanoTime();
 +        }
-         sw.stop();
- 
-         long millisWithNoWait = sw.elapsed(TimeUnit.MILLISECONDS);
++        nanosWithNoWait += System.nanoTime() - startTime;
 +
 +        // The "no-wait" time should be much less than the "wait-time"
 +        assertTrue(
-             "Expected less time to be taken with immediate readahead (" + millisWithNoWait
-                 + ") than without immediate readahead (" + millisWithWait + ")",
-             millisWithNoWait < millisWithWait);
++            "Expected less time to be taken with immediate readahead (" + nanosWithNoWait
++                + ") than without immediate readahead (" + nanosWithWait + ")",
++            nanosWithNoWait < nanosWithWait);
 +      }
      }
 -
 -    bw.addMutation(m);
 -    bw.close();
 -
 -    Scanner s = c.createScanner(table, new Authorizations());
 -
 -    IteratorSetting cfg = new IteratorSetting(100, SlowIterator.class);
 -    // A batch size of one will end up calling seek() for each element with no calls to next()
 -    SlowIterator.setSeekSleepTime(cfg, 100l);
 -
 -    s.addScanIterator(cfg);
 -    // Never start readahead
 -    s.setReadaheadThreshold(Long.MAX_VALUE);
 -    s.setBatchSize(1);
 -    s.setRange(new Range());
 -
 -    Iterator<Entry<Key,Value>> iterator = s.iterator();
 -    long nanosWithWait = 0;
 -    long startTime = System.nanoTime();
 -    while (iterator.hasNext()) {
 -      nanosWithWait += System.nanoTime() - startTime;
 -
 -      // While we "do work" in the client, we should be fetching the next result
 -      UtilWaitThread.sleep(100l);
 -      iterator.next();
 -      startTime = System.nanoTime();
 -    }
 -    nanosWithWait += System.nanoTime() - startTime;
 -
 -    s = c.createScanner(table, new Authorizations());
 -    s.addScanIterator(cfg);
 -    s.setRange(new Range());
 -    s.setBatchSize(1);
 -    s.setReadaheadThreshold(0l);
 -
 -    iterator = s.iterator();
 -    long nanosWithNoWait = 0;
 -    startTime = System.nanoTime();
 -    while (iterator.hasNext()) {
 -      nanosWithNoWait += System.nanoTime() - startTime;
 -
 -      // While we "do work" in the client, we should be fetching the next result
 -      UtilWaitThread.sleep(100l);
 -      iterator.next();
 -      startTime = System.nanoTime();
 -    }
 -    nanosWithNoWait += System.nanoTime() - startTime;
 -
 -    // The "no-wait" time should be much less than the "wait-time"
 -    assertTrue(
 -        "Expected less time to be taken with immediate readahead (" + nanosWithNoWait
 -            + ") than without immediate readahead (" + nanosWithWait + ")",
 -        nanosWithNoWait < nanosWithWait);
    }
  
  }