You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ho...@apache.org on 2022/04/05 00:53:55 UTC

[solr] branch jira/solr-16129 updated (ac602a8438b -> f8e19b259fc)

This is an automated email from the ASF dual-hosted git repository.

hossman pushed a change to branch jira/solr-16129
in repository https://gitbox.apache.org/repos/asf/solr.git


    from ac602a8438b SOLR-16129: tests w/nocommits showing why the test passes for the wrong reason and how to make it fail for the right reason
     new ab2b231c6a0 SOLR-16129: Use AutoLock in InputStreamResponseListener and resolve test nocommits
     new f8e19b259fc SOLR-16129: adjust timeAllowed -> setRequestTimeout hueristic to play nice with pathologically low values -- ex: timeAllowed=0

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../solr/client/solrj/impl/Http2SolrClient.java    |  10 +-
 .../apache/solr/client/solrj/util/AutoLock.java    | 139 ++++++++++++++++
 .../solrj/util/InputStreamResponseListener.java    | 180 +++++++++++++++------
 .../util/TestInputStreamResponseListener.java      | 120 ++++++++------
 4 files changed, 340 insertions(+), 109 deletions(-)
 create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/util/AutoLock.java


[solr] 02/02: SOLR-16129: adjust timeAllowed -> setRequestTimeout hueristic to play nice with pathologically low values -- ex: timeAllowed=0

Posted by ho...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch jira/solr-16129
in repository https://gitbox.apache.org/repos/asf/solr.git

commit f8e19b259fc520040d322584b94e08d5081c512e
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Mon Apr 4 17:42:32 2022 -0700

    SOLR-16129: adjust timeAllowed -> setRequestTimeout hueristic to play nice with pathologically low values -- ex: timeAllowed=0
---
 .../org/apache/solr/client/solrj/impl/Http2SolrClient.java     | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index d79449be931..95719b21ee8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -1089,10 +1089,10 @@ public class Http2SolrClient extends SolrClient {
   }
 
   /**
-   * If a <code>timeAllowed=X</code> is specified in the params, use (30ms + 2X) as a requestTimeout
-   * on the response listener. This should give the remote node ample time to recognize it's
-   * exceeded <code>timeAllowed</code> and response to the request, but if it doesn't then we want
-   * to abort in a reasonably proportinate amount of time and not wait forever.
+   * If a <code>timeAllowed=X</code> is specified in the params, use <code>max(500ms, 2X)</code> as
+   * a requestTimeout on the response listener. This should give the remote node ample time to
+   * recognize it's exceeded <code>timeAllowed</code> and respond to the request, but if it doesn't
+   * then we want to abort in a reasonably proportinate amount of time and not wait forever.
    *
    * @see CommonParams#TIME_ALLOWED
    */
@@ -1102,7 +1102,7 @@ public class Http2SolrClient extends SolrClient {
       final Long timeAllowed = params.getLong(CommonParams.TIME_ALLOWED);
       if (null != timeAllowed) {
         listener.setRequestTimeout(
-            Instant.now().plusMillis(30).plusMillis(timeAllowed.longValue() * 2L));
+            Instant.now().plusMillis(Math.max(500, timeAllowed.longValue() * 2L)));
       }
     }
   }


[solr] 01/02: SOLR-16129: Use AutoLock in InputStreamResponseListener and resolve test nocommits

Posted by ho...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch jira/solr-16129
in repository https://gitbox.apache.org/repos/asf/solr.git

commit ab2b231c6a0be87fb117c4a610a630c5170e3d43
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Mon Apr 4 17:26:36 2022 -0700

    SOLR-16129: Use AutoLock in InputStreamResponseListener and resolve test nocommits
---
 .../apache/solr/client/solrj/util/AutoLock.java    | 139 ++++++++++++++++
 .../solrj/util/InputStreamResponseListener.java    | 180 +++++++++++++++------
 .../util/TestInputStreamResponseListener.java      | 120 ++++++++------
 3 files changed, 335 insertions(+), 104 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/AutoLock.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AutoLock.java
new file mode 100644
index 00000000000..2b556d8db3d
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AutoLock.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//  Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
+
+package org.apache.solr.client.solrj.util;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Reentrant lock that can be used in a try-with-resources statement.
+ *
+ * <p>Typical usage:
+ *
+ * <pre>
+ * try (AutoLock lock = this.lock.lock())
+ * {
+ *     // Something
+ * }
+ * </pre>
+ */
+public class AutoLock implements AutoCloseable, Serializable {
+  private static final long serialVersionUID = 3300696774541816341L;
+
+  private final ReentrantLock _lock = new ReentrantLock();
+
+  /**
+   * Acquires the lock.
+   *
+   * @return this AutoLock for unlocking
+   */
+  public AutoLock lock() {
+    _lock.lock();
+    return this;
+  }
+
+  /**
+   * @see ReentrantLock#isHeldByCurrentThread()
+   * @return whether this lock is held by the current thread
+   */
+  public boolean isHeldByCurrentThread() {
+    return _lock.isHeldByCurrentThread();
+  }
+
+  /**
+   * @return a {@link Condition} associated with this lock
+   */
+  public Condition newCondition() {
+    return _lock.newCondition();
+  }
+
+  // Package-private for testing only.
+  boolean isLocked() {
+    return _lock.isLocked();
+  }
+
+  @Override
+  public void close() {
+    _lock.unlock();
+  }
+
+  /**
+   * A reentrant lock with a condition that can be used in a try-with-resources statement.
+   *
+   * <p>Typical usage:
+   *
+   * <pre>
+   * // Waiting
+   * try (AutoLock lock = _lock.lock())
+   * {
+   *     lock.await();
+   * }
+   *
+   * // Signaling
+   * try (AutoLock lock = _lock.lock())
+   * {
+   *     lock.signalAll();
+   * }
+   * </pre>
+   */
+  public static class WithCondition extends AutoLock {
+    private final Condition _condition = newCondition();
+
+    @Override
+    public AutoLock.WithCondition lock() {
+      return (WithCondition) super.lock();
+    }
+
+    /**
+     * @see Condition#signal()
+     */
+    public void signal() {
+      _condition.signal();
+    }
+
+    /**
+     * @see Condition#signalAll()
+     */
+    public void signalAll() {
+      _condition.signalAll();
+    }
+
+    /**
+     * @see Condition#await()
+     * @throws InterruptedException if the current thread is interrupted
+     */
+    public void await() throws InterruptedException {
+      _condition.await();
+    }
+
+    /**
+     * @see Condition#await(long, TimeUnit)
+     * @param time the time to wait
+     * @param unit the time unit
+     * @return false if the waiting time elapsed
+     * @throws InterruptedException if the current thread is interrupted
+     */
+    public boolean await(long time, TimeUnit unit) throws InterruptedException {
+      return _condition.await(time, unit);
+    }
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java
index ef7a9212400..7edea3ec7ee 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java
@@ -31,6 +31,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -48,10 +49,6 @@ import org.eclipse.jetty.util.IO;
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Logger;
 
-// nocommit: we need to switch from using 'Object lock' to using a ReentrantLock w/Condition we can await on
-// nocommit: safest plan is to adopt all changes in https://github.com/eclipse/jetty.project/pull/7260 as is ...
-// nocommit: ...(while also pulling in "AutoLock") and add requestTimeout logic to await() call
-
 /**
  * Fork of jetty's <code>InputStreamResponseListener</code> that adds support for an (optional)
  * <code>requestTimeout</code> (which defaults to 1 hour from instantiation) as well as a
@@ -64,7 +61,8 @@ import org.eclipse.jetty.util.log.Logger;
  * <p>Typical usage is:
  *
  * <pre>
- * InputStreamResponseListener listener = new InputStreamResponseListener();
+ * long maxWaitLimit = 5000;
+ * InputStreamResponseListener listener = new InputStreamResponseListener(maxWaitLimit);
  * client.newRequest(...).send(listener);
  *
  * // Wait for the response headers to arrive
@@ -85,12 +83,14 @@ import org.eclipse.jetty.util.log.Logger;
  * <p>If the consumer is faster than the producer, then the consumer will block with the typical
  * {@link InputStream#read()} semantic. If the consumer is slower than the producer, then the
  * producer will block until the client consumes.
+ *
+ * @see <a href="https://github.com/eclipse/jetty.project/pull/7260">Jetty PR#7260</a>
  */
 public class InputStreamResponseListener extends Listener.Adapter {
   private static final Logger log = Log.getLogger(InputStreamResponseListener.class);
   private static final DeferredContentProvider.Chunk EOF =
       new DeferredContentProvider.Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
-  private final Object lock = this; 
+  private final AutoLock.WithCondition lock = new AutoLock.WithCondition();
   private final CountDownLatch responseLatch = new CountDownLatch(1);
   private final CountDownLatch resultLatch = new CountDownLatch(1);
   private final AtomicReference<InputStream> stream = new AtomicReference<>();
@@ -121,22 +121,29 @@ public class InputStreamResponseListener extends Listener.Adapter {
    * will throw an {@link IOException} wrapping a {@link TimeoutException}. Defaults to 1 HOUR from
    * when this Listener was constructed.
    *
+   * <p><b>NOTE:</b> This timeout is only checked when the caller is blocked waiting for more data
+   * to be recieved, it will not cause any failures in situations where the caller is slower to
+   * consume the content then the remote server is to provided it.
+   *
    * @param requestTimeout Instant past which all response chunks must be recieved, if null then
    *     {@link Instant#MAX} is used
+   * @see #getInputStream
    */
   public void setRequestTimeout(final Instant requestTimeout) {
     requestTimeoutRef.set(null == requestTimeout ? Instant.MAX : requestTimeout);
   }
 
   @Override
+  @SuppressWarnings("try")
   public void onHeaders(Response response) {
-    synchronized (lock) {
+    try (AutoLock l = lock.lock()) {
       this.response = response;
       responseLatch.countDown();
     }
   }
 
   @Override
+  @SuppressWarnings("try")
   public void onContent(Response response, ByteBuffer content, Callback callback) {
     if (content.remaining() == 0) {
       if (log.isDebugEnabled()) {
@@ -147,14 +154,14 @@ public class InputStreamResponseListener extends Listener.Adapter {
     }
 
     boolean closed;
-    synchronized (lock) {
+    try (AutoLock.WithCondition l = lock.lock()) {
       closed = this.closed;
       if (!closed) {
         if (log.isDebugEnabled()) {
           log.debug("Queueing content {}", content);
         }
         chunks.add(new DeferredContentProvider.Chunk(content, callback));
-        lock.notifyAll();
+        l.signalAll();
       }
     }
 
@@ -167,10 +174,11 @@ public class InputStreamResponseListener extends Listener.Adapter {
   }
 
   @Override
+  @SuppressWarnings("try")
   public void onSuccess(Response response) {
-    synchronized (lock) {
+    try (AutoLock.WithCondition l = lock.lock()) {
       if (!closed) chunks.add(EOF);
-      lock.notifyAll();
+      l.signalAll();
     }
 
     if (log.isDebugEnabled()) {
@@ -179,27 +187,34 @@ public class InputStreamResponseListener extends Listener.Adapter {
   }
 
   @Override
+  @SuppressWarnings("try")
   public void onFailure(Response response, Throwable failure) {
     List<Callback> callbacks;
-    synchronized (lock) {
+    try (AutoLock.WithCondition l = lock.lock()) {
       if (this.failure != null) return;
+      if (failure == null) {
+        failure = new IOException("Generic failure");
+        log.warn("Missing failure in onFailure() callback", failure);
+      }
       this.failure = failure;
       callbacks = drain();
-      lock.notifyAll();
+      l.signalAll();
     }
 
     if (log.isDebugEnabled()) {
       log.debug("Content failure", failure);
     }
 
-    callbacks.forEach(callback -> callback.failed(failure));
+    Throwable f = failure;
+    callbacks.forEach(callback -> callback.failed(f));
   }
 
   @Override
+  @SuppressWarnings("try")
   public void onComplete(Result result) {
     Throwable failure = result.getFailure();
     List<Callback> callbacks = Collections.emptyList();
-    synchronized (lock) {
+    try (AutoLock.WithCondition l = lock.lock()) {
       this.result = result;
       if (result.isFailed() && this.failure == null) {
         this.failure = failure;
@@ -208,12 +223,15 @@ public class InputStreamResponseListener extends Listener.Adapter {
       // Notify the response latch in case of request failures.
       responseLatch.countDown();
       resultLatch.countDown();
-      lock.notifyAll();
+      l.signalAll();
     }
 
     if (log.isDebugEnabled()) {
-      if (failure == null) log.debug("Result success");
-      else log.debug("Result failure", failure);
+      if (failure == null) {
+        log.debug("Result success");
+      } else {
+        log.debug("Result failure", failure);
+      }
     }
 
     callbacks.forEach(callback -> callback.failed(failure));
@@ -232,11 +250,12 @@ public class InputStreamResponseListener extends Listener.Adapter {
    * @throws TimeoutException if the timeout expires
    * @throws ExecutionException if a failure happened
    */
+  @SuppressWarnings("try")
   public Response get(long timeout, TimeUnit unit)
       throws InterruptedException, TimeoutException, ExecutionException {
     boolean expired = !responseLatch.await(timeout, unit);
     if (expired) throw new TimeoutException();
-    synchronized (lock) {
+    try (AutoLock l = lock.lock()) {
       // If the request failed there is no response.
       if (response == null) throw new ExecutionException(failure);
       return response;
@@ -256,10 +275,11 @@ public class InputStreamResponseListener extends Listener.Adapter {
    * @throws TimeoutException if the timeout expires
    * @see #get(long, TimeUnit)
    */
+  @SuppressWarnings("try")
   public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
     boolean expired = !resultLatch.await(timeout, unit);
     if (expired) throw new TimeoutException();
-    synchronized (lock) {
+    try (AutoLock l = lock.lock()) {
       return result;
     }
   }
@@ -270,6 +290,11 @@ public class InputStreamResponseListener extends Listener.Adapter {
    * <p>The method may be invoked only once; subsequent invocations will return a closed {@link
    * InputStream}.
    *
+   * <p>{@link InputStream#read} calls on this <code>InputStream</code> may block up to the
+   * configured <code>maxWaitLimit</code> or until the {@link #setRequestTimeout} (which ever is
+   * sooner) if no data is currently available, at which point an {@link IOException} wrapping a
+   * {@link TimeoutException} wll be thrown
+   *
    * @return an input stream providing the response content
    */
   public InputStream getInputStream() {
@@ -278,9 +303,10 @@ public class InputStreamResponseListener extends Listener.Adapter {
     return IO.getClosedStream();
   }
 
+  @SuppressWarnings("try")
   private List<Callback> drain() {
     List<Callback> callbacks = new ArrayList<>();
-    synchronized (lock) {
+    try (AutoLock l = lock.lock()) {
       while (true) {
         DeferredContentProvider.Chunk chunk = chunks.peek();
         if (chunk == null || chunk == EOF) break;
@@ -291,6 +317,23 @@ public class InputStreamResponseListener extends Listener.Adapter {
     return callbacks;
   }
 
+  @Override
+  @SuppressWarnings("try")
+  public String toString() {
+    try (AutoLock l = lock.lock()) {
+      return String.format(
+          Locale.ROOT,
+          "%s@%x[response=%s,result=%s,closed=%b,failure=%s,chunks=%s]",
+          getClass().getSimpleName(),
+          hashCode(),
+          response,
+          result,
+          closed,
+          failure,
+          chunks);
+    }
+  }
+
   private class Input extends InputStream {
     @Override
     public int read() throws IOException {
@@ -300,17 +343,49 @@ public class InputStreamResponseListener extends Listener.Adapter {
       return tmp[0] & 0xFF;
     }
 
+    /**
+     * awaits on the condition until either <code>maxWaitLimit</code> or <code>requestTimeout</code>
+     * is reached (whichever is sooner)
+     *
+     * @return an explantion as to why the condition wait expired, or null if the condition was met
+     *     in a timely manner
+     */
+    private final String awaitOrReturnError(final AutoLock.WithCondition condition)
+        throws InterruptedException {
+      final Instant effectiveNow = Instant.now();
+      final Instant requestTimeout = requestTimeoutRef.get();
+      assert null != requestTimeout;
+
+      if (effectiveNow.isBefore(requestTimeout)) {
+        // NOTE: convert maxWaitLimit to Instant for comparison, rather then vice-versa, so we
+        // don't risk ArithemticException.  (await in MILLIS instead of NANOS for same reason)
+        final long awaitAmountMillis =
+            effectiveNow.plusMillis(maxWaitLimit).isBefore(requestTimeout)
+                ? maxWaitLimit
+                : Math.min(1L, Duration.between(effectiveNow, requestTimeout).toMillis());
+
+        if (condition.await(awaitAmountMillis, TimeUnit.MILLISECONDS)) {
+          return null;
+        } else {
+          return (awaitAmountMillis < maxWaitLimit ? "requestTimeout" : "maxWaitLimit")
+              + " exceeded";
+        }
+      } // else...
+
+      // we've already reached (or exceeded) requestTimeout w/o any waiting
+      return "requestTimeout exceeded";
+    }
+
     @Override
     public int read(byte[] b, int offset, int length) throws IOException {
       try {
-        int result;
+        int result = 0;
         Callback callback = null;
-        synchronized (lock) {
+        List<Callback> callbacks = Collections.emptyList();
+        Throwable timeoutFailure = null;
+        try (AutoLock.WithCondition l = lock.lock()) {
           DeferredContentProvider.Chunk chunk;
           while (true) {
-            final Instant requestTimeout = requestTimeoutRef.get();
-            assert null != requestTimeout;
-
             chunk = chunks.peek();
             if (chunk == EOF) return -1;
 
@@ -320,35 +395,38 @@ public class InputStreamResponseListener extends Listener.Adapter {
 
             if (closed) throw new AsynchronousCloseException();
 
-            // nocommit: this check shouldn't be needed/used here - the await call should tell us to Timeout
-            if (requestTimeout.isBefore(Instant.now()))
-              throw new TimeoutException("requestTimeout exceeded");
-
-            // NOTE: convert maxWaitLimit to Instant for comparison, rather then vice-versa, so we
-            // don't risk ArithemticException
-            final Instant now = Instant.now();
-            // nocommit: replace this with await, if result is false throw TimeoutException...
-            // nocommit: ...exception message should mention waitLimit, unless requestTime.isBefore(now())
-            lock.wait(
-                now.plusMillis(maxWaitLimit).isBefore(requestTimeout)
-                    ? maxWaitLimit
-                    : Duration.between(Instant.now(), requestTimeout).toMillis());
+            final String expirationReason = awaitOrReturnError(l);
+            if (null != expirationReason) {
+              if (log.isDebugEnabled()) {
+                log.debug(
+                    "Read timed out: {}, {}", expirationReason, InputStreamResponseListener.this);
+              }
+              failure = timeoutFailure = new TimeoutException("Read timeout: " + expirationReason);
+              callbacks = drain();
+              break;
+            }
           }
 
-          ByteBuffer buffer = chunk.buffer;
-          result = Math.min(buffer.remaining(), length);
-          buffer.get(b, offset, result);
-          if (!buffer.hasRemaining()) {
-            callback = chunk.callback;
-            chunks.poll();
+          if (timeoutFailure == null) {
+            ByteBuffer buffer = chunk.buffer;
+            result = Math.min(buffer.remaining(), length);
+            buffer.get(b, offset, result);
+            if (!buffer.hasRemaining()) {
+              callback = chunk.callback;
+              chunks.poll();
+            }
           }
         }
-        if (callback != null) callback.succeeded();
-        return result;
+        if (timeoutFailure == null) {
+          if (callback != null) callback.succeeded();
+          return result;
+        } else {
+          Throwable f = timeoutFailure;
+          callbacks.forEach(c -> c.failed(f));
+          throw toIOException(f);
+        }
       } catch (InterruptedException x) {
         throw new InterruptedIOException();
-      } catch (TimeoutException y) {
-        throw toIOException(y);
       }
     }
 
@@ -360,11 +438,11 @@ public class InputStreamResponseListener extends Listener.Adapter {
     @Override
     public void close() throws IOException {
       List<Callback> callbacks;
-      synchronized (lock) {
+      try (AutoLock.WithCondition l = lock.lock()) {
         if (closed) return;
         closed = true;
         callbacks = drain();
-        lock.notifyAll();
+        l.signalAll();
       }
 
       if (log.isDebugEnabled()) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java b/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java
index 742fbce0a81..b3477295680 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java
@@ -16,97 +16,111 @@
  */
 package org.apache.solr.client.solrj.util;
 
-import java.io.InputStream;
+import static org.hamcrest.core.StringContains.containsString;
+
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
-
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.TimeoutException;
 import org.apache.solr.SolrTestCase;
-import org.apache.solr.client.solrj.impl.HttpClientUtil;
-
 import org.eclipse.jetty.client.HttpResponse;
 import org.eclipse.jetty.util.Callback;
 
-import static org.hamcrest.core.StringContains.containsString;
-
 public class TestInputStreamResponseListener extends SolrTestCase {
 
   public void testNoDataTriggersWaitLimit() throws Exception {
     final long waitLimit = 1000; // millis
     final InputStreamResponseListener listener = new InputStreamResponseListener(waitLimit);
+    listener.setRequestTimeout(null);
 
-    // nocommit: we should be able to use a null requestTimeout to pass this test....
-    // nocommit: (the waitLimit should be enough to trigger failure)
-    listener.setRequestTimeout(Instant.now()); // nocommit
-    // nocommit: // listener.setRequestTimeout(null);
-    
     // emulate low level transport code providing headers, and then nothing else...
-    final HttpResponse dummyResponse = new HttpResponse(null /* bogus request */, Collections.emptyList());
+    final HttpResponse dummyResponse =
+        new HttpResponse(null /* bogus request */, Collections.emptyList());
     listener.onHeaders(dummyResponse);
 
     // client tries to consume, but there is never any content...
     assertEquals(dummyResponse, listener.get(0, TimeUnit.SECONDS));
-    final ForkJoinTask<IOException> readTask = ForkJoinPool.commonPool().submit(() -> {
-        try (final InputStream stream = listener.getInputStream()) {
-          return expectThrows(IOException.class, () -> {
-              int trash = stream.read();
-            });
-        }
-      });
-    final IOException expected = readTask.get(waitLimit * 2L, TimeUnit.MILLISECONDS);
+    final ForkJoinTask<IOException> readTask =
+        ForkJoinPool.commonPool()
+            .submit(
+                () -> {
+                  // (Do this in a ForkJoin thread so we can easily fail test if read() doesn't
+                  // throw IOException in a timely manner)
+                  try (final InputStream stream = listener.getInputStream()) {
+                    return expectThrows(
+                        IOException.class,
+                        () -> {
+                          int trash = stream.read();
+                        });
+                  }
+                });
+    final IOException expected = readTask.get(waitLimit * 3L, TimeUnit.MILLISECONDS);
     assertNotNull(expected.getCause());
     assertEquals(TimeoutException.class, expected.getCause().getClass());
-
-    // nocommit: this should be something about waitLimit...
-    assertThat(expected.getCause().getMessage(), containsString("requestTimeout exceeded"));
+    assertThat(expected.getCause().getMessage(), containsString("maxWaitLimit exceeded"));
   }
 
-
-      
   public void testReallySlowDataTriggersRequestTimeout() throws Exception {
-    final long writeDelayMillies = 500;
-    final InputStreamResponseListener listener = new InputStreamResponseListener(writeDelayMillies * 2);
-    
-    // emulate low level transport code providing headers, and then writes a (slow) never ending stream of bytes
-    final HttpResponse dummyResponse = new HttpResponse(null /* bogus request */, Collections.emptyList());
+    final long writeDelayMillies = 100;
+    // crazy long maxWaitLimit relative to how often new data should be available
+    final InputStreamResponseListener listener =
+        new InputStreamResponseListener(5 * writeDelayMillies);
+
+    // emulate low level transport code providing headers, and then writes a (slow) never ending
+    // stream of bytes
+    final HttpResponse dummyResponse =
+        new HttpResponse(null /* bogus request */, Collections.emptyList());
     listener.onHeaders(dummyResponse);
-    final CountDownLatch writeTaskCloseLatch = new CountDownLatch(1);
+    final CountDownLatch closeLatch = new CountDownLatch(1);
     try {
-      final ForkJoinTask<Boolean> writeTask = ForkJoinPool.commonPool().submit(() -> {
-          final ByteBuffer dataToWriteForever = ByteBuffer.allocate(5);
-          while (0 < writeTaskCloseLatch.getCount()) {
-            dataToWriteForever.position(0);
-            listener.onContent(dummyResponse, dataToWriteForever, Callback.NOOP);
-            Thread.sleep(writeDelayMillies);
-          }
-          return true;
-        });
+      final ForkJoinTask<Boolean> writeTask =
+          ForkJoinPool.commonPool()
+              .submit(
+                  () -> {
+                    final ByteBuffer dataToWriteForever = ByteBuffer.allocate(5);
+                    while (0 < closeLatch.getCount()) {
+                      dataToWriteForever.position(0);
+                      listener.onContent(dummyResponse, dataToWriteForever, Callback.NOOP);
+                      Thread.sleep(writeDelayMillies);
+                    }
+                    return true;
+                  });
 
       // client reads "forever" ... until read times out because requestTimeout exceeded
       assertEquals(dummyResponse, listener.get(0, TimeUnit.SECONDS));
-      final IOException expected = expectThrows(IOException.class, () -> {
-          final Instant requestTimeout = Instant.now().plus(1, ChronoUnit.MINUTES);
-          listener.setRequestTimeout(requestTimeout);
-          final Instant forever = requestTimeout.plusSeconds(60);
-          try (final InputStream stream = listener.getInputStream()) {
-            while (Instant.now().isBefore(forever)) {
-              int trash = stream.read(); // this should eventually throw an exception
-            }
-          }
-        });
+      final ForkJoinTask<IOException> readTask =
+          ForkJoinPool.commonPool()
+              .submit(
+                  () -> {
+                    // (Do this in a ForkJoin thread so we can easily fail test if read() doesn't
+                    // throw IOException in a timely manner)
+                    return expectThrows(
+                        IOException.class,
+                        () -> {
+                          final Instant requestTimeout = Instant.now().plus(5, ChronoUnit.SECONDS);
+                          listener.setRequestTimeout(requestTimeout);
+                          final Instant forever = requestTimeout.plusSeconds(60);
+                          try (final InputStream stream = listener.getInputStream()) {
+                            while (0 < closeLatch.getCount()) {
+                              int trash =
+                                  stream.read(); // this should eventually throw an exception
+                            }
+                          }
+                        });
+                  });
+      final IOException expected = readTask.get(10, TimeUnit.SECONDS);
       assertNotNull(expected.getCause());
       assertEquals(TimeoutException.class, expected.getCause().getClass());
       assertThat(expected.getCause().getMessage(), containsString("requestTimeout exceeded"));
     } finally {
-      writeTaskCloseLatch.countDown();
+      closeLatch.countDown();
     }
   }
 }