You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by le...@apache.org on 2019/03/04 21:50:12 UTC

[incubator-druid] branch master updated: Fix and document concurrency of EventReceiverFirehose and TimedShutoffFirehose; Refine concurrency specification of Firehose (#7038)

This is an automated email from the ASF dual-hosted git repository.

leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 10c9f6d  Fix and document concurrency of EventReceiverFirehose and TimedShutoffFirehose; Refine concurrency specification of Firehose (#7038)
10c9f6d is described below

commit 10c9f6d7086fc2bbccb8673bce55b1e711d131e7
Author: Roman Leventov <le...@gmail.com>
AuthorDate: Mon Mar 4 18:50:03 2019 -0300

    Fix and document concurrency of EventReceiverFirehose and TimedShutoffFirehose; Refine concurrency specification of Firehose (#7038)
    
    #### `EventReceiverFirehoseFactory`
    Fixed several concurrency bugs in `EventReceiverFirehoseFactory`:
     - Race condition over putting an entry into `producerSequences` in `checkProducerSequence()`.
     - `Stopwatch` used to measure time across threads, but it's a non-thread-safe class.
     - Use `System.nanoTime()` instead of `System.currentTimeMillis()` because the latter is [not suitable](https://stackoverflow.com/a/351571/648955) for measuring time intervals.
     - `close()` was not synchronized but could be called from multiple threads concurrently.
    
    Removed unnecessary `readLock` (protecting `hasMore()` and `nextRow()` which are always called from a single thread). Removed unnecessary `volatile` modifiers.
    
    Documented threading model and concurrent control flow of `EventReceiverFirehose` instances.
    
    **Important:** please read the updated Javadoc for `EventReceiverFirehose.addAll()`. It allows events from different requests (batches) to be interleaved in the buffer. Is this OK?
    
    #### `TimedShutoffFirehoseFactory`
    - Fixed a race condition that was possible because `close()` was not properly synchronized.
    
    Documented threading model and concurrent control flow of `TimedShutoffFirehose` instances.
    
    #### `Firehose`
    
    Refined concurrency contract of `Firehose` based on `EventReceiverFirehose` implementation. Importantly, now it states that `close()` doesn't affect `hasMore()` and `nextRow()` and could be called concurrently with them. In other words, specified that `close()` is for "row supply" side rather than "row consume" side. However, I didn't check that other `Firehose` implementations adhere to this contract.
    
    <hr>
    
    This issue is the result of reviewing `EventReceiverFirehose` and `TimedShutoffFirehose` using [this checklist](https://medium.com/@leventov/code-review-checklist-java-concurrency-49398c326154).
---
 .idea/inspectionProfiles/Druid.xml                 |   4 +-
 .../org/apache/druid/concurrent/LifecycleLock.java |   6 +-
 .../java/org/apache/druid/concurrent/Threads.java  |  59 +++
 .../java/org/apache/druid/data/input/Firehose.java |  41 +-
 .../org/apache/druid/data/input/FirehoseV2.java    |   7 +-
 .../org/apache/druid/utils/CloseableUtils.java     |  49 +++
 docs/content/ingestion/firehose.md                 |   5 +-
 .../firehose/EventReceiverFirehoseFactory.java     | 460 ++++++++++++++-------
 .../firehose/TimedShutoffFirehoseFactory.java      |  54 +--
 .../firehose/EventReceiverFirehoseIdleTest.java    |  22 +-
 .../firehose/EventReceiverFirehoseTest.java        |  34 +-
 11 files changed, 525 insertions(+), 216 deletions(-)

diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index 70a6a49..b2dcda3 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -46,6 +46,7 @@
     <inspection_tool class="EqualsUsesNonFinalVariable" enabled="true" level="WARNING" enabled_by_default="true" />
     <inspection_tool class="EqualsWhichDoesntCheckParameterClass" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="EqualsWithItself" enabled="true" level="ERROR" enabled_by_default="true" />
+    <inspection_tool class="FieldAccessNotGuarded" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="FieldCanBeLocal" enabled="true" level="WARNING" enabled_by_default="true">
       <option name="EXCLUDE_ANNOS">
         <value>
@@ -119,7 +120,7 @@
     <inspection_tool class="NumberEquality" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="ObjectEquality" enabled="true" level="WARNING" enabled_by_default="true">
       <option name="m_ignoreEnums" value="true" />
-      <option name="m_ignoreClassObjects" value="false" />
+      <option name="m_ignoreClassObjects" value="true" />
       <option name="m_ignorePrivateConstructors" value="false" />
     </inspection_tool>
     <inspection_tool class="ObjectEqualsNull" enabled="true" level="ERROR" enabled_by_default="true" />
@@ -356,6 +357,5 @@
       <option name="ADD_SERVLET_TO_ENTRIES" value="true" />
       <option name="ADD_NONJAVA_TO_ENTRIES" value="true" />
     </inspection_tool>
-    <inspection_tool class="FieldAccessNotGuarded" enabled="true" level="ERROR" enabled_by_default="true" />
   </profile>
 </component>
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java b/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java
index ed6fea2..021c413 100644
--- a/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java
+++ b/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java
@@ -23,9 +23,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
 /**
- * A synchronization tool for lifecycled objects (see {@link org.apache.druid.java.util.common.lifecycle.Lifecycle}, that need
- * happens-before between start() and other methods and/or to check that the object was successfully started in other
- * methods.
+ * A synchronization tool for lifecycled objects (see {@link org.apache.druid.java.util.common.lifecycle.Lifecycle},
+ * that need happens-before between start() and other methods and/or to check that the object was successfully started
+ * in other methods.
  *
  * Guarantees in terms of JMM: happens-before between {@link #exitStart()} and {@link #awaitStarted()},
  * exitStart() and {@link #canStop()}, if it returns {@code true}.
diff --git a/core/src/main/java/org/apache/druid/concurrent/Threads.java b/core/src/main/java/org/apache/druid/concurrent/Threads.java
new file mode 100644
index 0000000..fe8e79f
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/concurrent/Threads.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+public final class Threads
+{
+
+  /**
+   * Equivalent of {@link Thread#sleep(long)} with arguments and semantics of timed wait methods in classes from {@link
+   * java.util.concurrent} (like {@link java.util.concurrent.Semaphore#tryAcquire(long, TimeUnit)},
+   * {@link java.util.concurrent.locks.Lock#tryLock(long, TimeUnit)}, etc.): if the sleepTime argument is negative or
+   * zero, the method returns immediately. {@link Thread#sleep}, on the contrary, throws an IllegalArgumentException if
+   * the argument is negative and attempts to unschedule the thread if the argument is zero.
+   *
+   * @throws InterruptedException if the current thread is interrupted when this method is called or during sleeping.
+   */
+  public static void sleepFor(long sleepTime, TimeUnit unit) throws InterruptedException
+  {
+    if (Thread.interrupted()) {
+      throw new InterruptedException();
+    }
+    if (sleepTime <= 0) {
+      return;
+    }
+    long sleepTimeLimitNanos = System.nanoTime() + unit.toNanos(sleepTime);
+    while (true) {
+      long sleepTimeoutNanos = sleepTimeLimitNanos - System.nanoTime();
+      if (sleepTimeoutNanos <= 0) {
+        return;
+      }
+      LockSupport.parkNanos(sleepTimeoutNanos);
+      if (Thread.interrupted()) {
+        throw new InterruptedException();
+      }
+    }
+  }
+
+  private Threads() {}
+}
diff --git a/core/src/main/java/org/apache/druid/data/input/Firehose.java b/core/src/main/java/org/apache/druid/data/input/Firehose.java
index 7886cdc..39556a8 100644
--- a/core/src/main/java/org/apache/druid/data/input/Firehose.java
+++ b/core/src/main/java/org/apache/druid/data/input/Firehose.java
@@ -23,19 +23,24 @@ import org.apache.druid.guice.annotations.ExtensionPoint;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
+import java.io.IOException;
 
 /**
  * This is an interface that holds onto the stream of incoming data.  Realtime data ingestion is built around this
- * abstraction.  In order to add a new type of source for realtime data ingestion, all you need to do is implement
- * one of these and register it with the Main.
+ * abstraction.
  *
  * This object acts a lot like an Iterator, but it doesn't extend the Iterator interface because it extends
- * Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this
- * gets passed around as an Iterator.
- * <p>
- * The implementation of this interface only needs to be minimally thread-safe. The three methods ##hasMore(),
- * ##nextRow() and ##commit() are all called from the same thread.  ##commit(), however, returns a callback
- * which will be called on another thread, so the operations inside of that callback must be thread-safe.
+ * Closeable and it is very important that the {@link #close()} method doesn't get forgotten, which is easy to do if
+ * this gets passed around as an Iterator. Note that {@link #close()} doesn't cut the stream of rows for Firehose users
+ * immediately, but rather stops the supply of new rows into internal buffers. {@link #hasMore()} and {@link #nextRow()}
+ * are expected to operate for some time after (or concurrently with) {@link #close()} until the buffered events (if
+ * any) run out.
+ *
+ * Concurrency:
+ * The three methods {@link #hasMore()}, {@link #nextRow()} and {@link #commit()} are all called from the same thread.
+ * {@link #commit()}, however, returns a callback which will be called on another thread. {@link #close()} might be
+ * called concurrently from a thread different from the thread calling {@link #hasMore()}, {@link #nextRow()} and {@link
+ * #commit()}.
  * </p>
  */
 @ExtensionPoint
@@ -43,8 +48,8 @@ public interface Firehose extends Closeable
 {
   /**
    * Returns whether there are more rows to process.  This is used to indicate that another item is immediately
-   * available via ##nextRow().  Thus, if the stream is still available but there are no new messages on it, this call
-   * should block until a new message is available.
+   * available via {@link #nextRow()}.  Thus, if the stream is still available but there are no new messages on it, this
+   * call should block until a new message is available.
    *
    * If something happens such that the stream is no longer available, this should return false.
    *
@@ -77,8 +82,22 @@ public interface Firehose extends Closeable
    * A simple implementation of this interface might do nothing when run() is called 
    * (in which case the same do-nothing instance can be returned every time), or 
    * a more complex implementation might clean up temporary resources that are no longer needed 
-   * because of InputRows delivered by prior calls to ##nextRow().
+   * because of InputRows delivered by prior calls to {@link #nextRow()}.
    * </p>
    */
   Runnable commit();
+
+  /**
+   * Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()}, {@link
+   * #nextRow()} and {@link #commit()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
+   * continue to work after close(), but since the ingestion side is closed rows will eventually run out.
+   *
+   * The effects of calling run() on the {@link Runnable} object returned from {@link #commit()} (in other words,
+   * doing the commit) concurrently or after close() are unspecified: commit may silently not be performed (that is,
+   * run() call completes without an Exception, but the commit is not actually done), or an error may result. Note that
+   * {@link #commit()} method itself can be called concurrently with close(), but it doesn't make much sense, because
+   * run() on the returned Runnable then can't be called.
+   */
+  @Override
+  void close() throws IOException;
 }
diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseV2.java b/core/src/main/java/org/apache/druid/data/input/FirehoseV2.java
index 6c2ddae..c6aa33f 100644
--- a/core/src/main/java/org/apache/druid/data/input/FirehoseV2.java
+++ b/core/src/main/java/org/apache/druid/data/input/FirehoseV2.java
@@ -42,9 +42,10 @@ import java.io.Closeable;
  * Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this
  * gets passed around as an Iterator.
  *
- * The implementation of this interface only needs to be minimally thread-safe. The methods ##start(), ##advance(),
- * ##currRow() and ##makeCommitter() are all called from the same thread.  ##makeCommitter(), however, returns a callback
- * which will be called on another thread, so the operations inside of that callback must be thread-safe.
+ * The implementation of this interface only needs to be minimally thread-safe. The methods {@link #start()}, {@link
+ * #advance()}, {@link #currRow()} and {@link #makeCommitter()} are all called from the same thread. {@link
+ * #makeCommitter()}, however, returns a callback which will be called on another thread, so the operations inside of
+ * that callback must be thread-safe.
  */
 @ExtensionPoint
 public interface FirehoseV2 extends Closeable
diff --git a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java
new file mode 100644
index 0000000..03a4f2c
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Methods in this class could have belonged to {@link org.apache.druid.java.util.common.io.Closer}, but not editing
+ * that class to keep its source close to Guava source.
+ */
+public final class CloseableUtils
+{
+  /**
+   * Call method instead of code like
+   *
+   * first.close();
+   * second.close();
+   *
+   * to have safety of {@link org.apache.druid.java.util.common.io.Closer}, but without associated boilerplate code
+   * of creating a Closer and registering objects in it.
+   */
+  public static void closeBoth(Closeable first, Closeable second) throws IOException
+  {
+    //noinspection EmptyTryBlock
+    try (Closeable ignore1 = second; Closeable ignore2 = first) {
+      // piggy-back try-with-resources semantics
+    }
+  }
+
+  private CloseableUtils() {}
+}
diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md
index faa83f3..322a1f2 100644
--- a/docs/content/ingestion/firehose.md
+++ b/docs/content/ingestion/firehose.md
@@ -191,8 +191,9 @@ When using this firehose, events can be sent by submitting a POST request to the
 |property|description|required?|
 |--------|-----------|---------|
 |type|This should be "receiver"|yes|
-|serviceName|name used to announce the event receiver service endpoint|yes|
-|bufferSize| size of buffer used by firehose to store events|no default(100000)|
+|serviceName|Name used to announce the event receiver service endpoint|yes|
+|maxIdleTime|A firehose is automatically shut down after not receiving any events for this period of time, in milliseconds. If not specified, a firehose is never shut down due to being idle. Zero and negative values have the same effect.|no|
+|bufferSize|Size of buffer used by firehose to store events|no, default is 100000|
 
 Shut down time for EventReceiverFirehose can be specified by submitting a POST request to
 
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
index 09751f9..b8ed0ad 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
@@ -28,10 +28,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.CountingInputStream;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.concurrent.Threads;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.InputRow;
@@ -39,7 +40,6 @@ import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.metrics.EventReceiverFirehoseMetric;
 import org.apache.druid.server.metrics.EventReceiverFirehoseRegister;
@@ -50,6 +50,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.utils.Runnables;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -70,12 +71,9 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -85,16 +83,29 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowParser<Map<String, Object>>>
 {
+  private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
+
   public static final int MAX_FIREHOSE_PRODUCERS = 10_000;
 
-  private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
   private static final int DEFAULT_BUFFER_SIZE = 100_000;
-  private static final long DEFAULT_MAX_IDLE_TIME = Long.MAX_VALUE;
+
+  /**
+   * A "poison pill" object for {@link EventReceiverFirehose}'s internal buffer.
+   */
+  private static final Object FIREHOSE_CLOSED = new Object();
 
   private final String serviceName;
   private final int bufferSize;
-  private final long maxIdleTime;
-  private final Optional<ChatHandlerProvider> chatHandlerProvider;
+
+  /**
+   * Doesn't really support max idle times finer than 1 second due to how {@link
+   * EventReceiverFirehose#delayedCloseExecutor} is implemented, see a comment inside {@link
+   * EventReceiverFirehose#createDelayedCloseExecutor()}. This aspect is not reflected in docs because it's unlikely
+   * that anybody configures or cares about finer max idle times, and also because this is an implementation detail of
+   * {@link EventReceiverFirehose} that may change in the future.
+   */
+  private final long maxIdleTimeMillis;
+  private final @Nullable ChatHandlerProvider chatHandlerProvider;
   private final ObjectMapper jsonMapper;
   private final ObjectMapper smileMapper;
   private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister;
@@ -104,7 +115,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
   public EventReceiverFirehoseFactory(
       @JsonProperty("serviceName") String serviceName,
       @JsonProperty("bufferSize") Integer bufferSize,
-      @JsonProperty("maxIdleTime") Long maxIdleTime,
+      // Keeping the legacy 'maxIdleTime' property name for backward compatibility. When the project is updated to
+      // Jackson 2.9 it could be changed, see https://github.com/apache/incubator-druid/issues/7152
+      @JsonProperty("maxIdleTime") @Nullable Long maxIdleTimeMillis,
       @JacksonInject ChatHandlerProvider chatHandlerProvider,
       @JacksonInject @Json ObjectMapper jsonMapper,
       @JacksonInject @Smile ObjectMapper smileMapper,
@@ -116,9 +129,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
 
     this.serviceName = serviceName;
     this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
-    this.maxIdleTime = maxIdleTime == null || maxIdleTime <= 0 ?
-                       DEFAULT_MAX_IDLE_TIME : maxIdleTime;
-    this.chatHandlerProvider = Optional.ofNullable(chatHandlerProvider);
+    this.maxIdleTimeMillis = (maxIdleTimeMillis == null || maxIdleTimeMillis <= 0) ? Long.MAX_VALUE : maxIdleTimeMillis;
+    this.chatHandlerProvider = chatHandlerProvider;
     this.jsonMapper = jsonMapper;
     this.smileMapper = smileMapper;
     this.eventReceiverFirehoseRegister = eventReceiverFirehoseRegister;
@@ -134,12 +146,12 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
     log.info("Connecting firehose: %s", serviceName);
     final EventReceiverFirehose firehose = new EventReceiverFirehose(firehoseParser);
 
-    if (chatHandlerProvider.isPresent()) {
-      log.info("Found chathandler of class[%s]", chatHandlerProvider.get().getClass().getName());
-      chatHandlerProvider.get().register(serviceName, firehose);
+    if (chatHandlerProvider != null) {
+      log.info("Found chathandler of class[%s]", chatHandlerProvider.getClass().getName());
+      chatHandlerProvider.register(serviceName, firehose);
       int lastIndexOfColon = serviceName.lastIndexOf(':');
       if (lastIndexOfColon > 0) {
-        chatHandlerProvider.get().register(serviceName.substring(lastIndexOfColon + 1), firehose);
+        chatHandlerProvider.register(serviceName.substring(lastIndexOfColon + 1), firehose);
       }
     } else {
       log.warn("No chathandler detected");
@@ -162,62 +174,178 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
     return bufferSize;
   }
 
-  @JsonProperty
-  public long getMaxIdleTime()
+  /**
+   * Keeping the legacy 'maxIdleTime' property name for backward compatibility. When the project is updated to Jackson
+   * 2.9 it could be changed, see https://github.com/apache/incubator-druid/issues/7152
+   */
+  @JsonProperty("maxIdleTime")
+  public long getMaxIdleTimeMillis()
   {
-    return maxIdleTime;
+    return maxIdleTimeMillis;
   }
 
+  /**
+   * Apart from adhering to {@link Firehose} contract regarding concurrency, this class has two methods that might be
+   * called concurrently with any other methods and each other, from arbitrary number of threads: {@link #addAll} and
+   * {@link #shutdown}.
+   *
+   * Concurrent data flow: in {@link #addAll} (can be called concurrently with any other methods and other calls to
+   * {@link #addAll}) rows are pushed into {@link #buffer}. The single Firehose "consumer" thread calls {@link #hasMore}
+   * and {@link #nextRow()}, where rows are taken out from the other end of the {@link #buffer} queue.
+   *
+   * This class creates and manages one thread ({@link #delayedCloseExecutor}) for calling {@link #close()}
+   * asynchronously in response to a {@link #shutdown} request, or after this Firehose has been idle (no calls to {@link
+   * #addAll}) for {@link #maxIdleTimeMillis}.
+   */
+  @VisibleForTesting
   public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric
   {
-    private final ScheduledExecutorService exec;
-    private final ExecutorService idleDetector;
-    private final BlockingQueue<InputRow> buffer;
-    private final InputRowParser<Map<String, Object>> parser;
+    /**
+     * How does this thread work (and its interruption policy) is described in the comment for {@link
+     * #createDelayedCloseExecutor}.
+     */
+    @GuardedBy("this")
+    private @Nullable Thread delayedCloseExecutor;
 
-    private final Object readLock = new Object();
+    /**
+     * Contains {@link InputRow} objects, the last one is {@link #FIREHOSE_CLOSED} which is a "poison pill". Poison pill
+     * is used to notify the thread that calls {@link #hasMore()} and {@link #nextRow()} that the EventReceiverFirehose
+     * is closed without heuristic 500 ms timed blocking in a loop instead of a simple {@link BlockingQueue#take()}
+     * call (see {@link #hasMore} code).
+     */
+    private final BlockingQueue<Object> buffer;
+    private final InputRowParser<Map<String, Object>> parser;
 
-    private volatile InputRow nextRow = null;
+    /**
+     * This field needs to be volatile to ensure progress in {@link #addRows} method where it is read in a loop, and
+     * also in testing code calling {@link #isClosed()}.
+     */
     private volatile boolean closed = false;
+
+    /**
+     * This field and {@link #rowsRunOut} are not volatile because they are accessed only from {@link #hasMore()} and
+     * {@link #nextRow()} methods that are called from a single thread according to {@link Firehose} spec.
+     */
+    private InputRow nextRow = null;
+    private boolean rowsRunOut = false;
+
     private final AtomicLong bytesReceived = new AtomicLong(0);
-    private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
+    private final AtomicLong lastBufferAddFailLoggingTimeNs = new AtomicLong(System.nanoTime());
     private final ConcurrentHashMap<String, Long> producerSequences = new ConcurrentHashMap<>();
-    private final Stopwatch idleWatch = Stopwatch.createUnstarted();
 
-    public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
+    /**
+     * This field and {@link #requestedShutdownTimeNs} use nanoseconds instead of milliseconds not to deal with the fact
+     * that {@link System#currentTimeMillis()} can "go backward", e. g. due to time correction on the server.
+     *
+     * This field and {@link #requestedShutdownTimeNs} must be volatile because they are de facto lazily initialized
+     * fields that are used concurrently in {@link #delayedCloseExecutor} (see {@link #createDelayedCloseExecutor()}).
+     * If they were not volatile, NPE would be possible in {@link #delayedCloseExecutor}. See
+     * https://shipilev.net/blog/2016/close-encounters-of-jmm-kind/#wishful-hb-actual for explanations.
+     */
+    private volatile Long idleCloseTimeNs = null;
+    private volatile Long requestedShutdownTimeNs = null;
+
+    EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
     {
       this.buffer = new ArrayBlockingQueue<>(bufferSize);
       this.parser = parser;
-      exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");
-      idleDetector = Execs.singleThreaded("event-receiver-firehose-idle-detector-%d");
-      idleDetector.submit(() -> {
-        long idled;
-        try {
-          while ((idled = idleWatch.elapsed(TimeUnit.MILLISECONDS)) < maxIdleTime) {
-            Thread.sleep(maxIdleTime - idled);
-          }
-        }
-        catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return;
+
+      if (maxIdleTimeMillis != Long.MAX_VALUE) {
+        idleCloseTimeNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis);
+        synchronized (this) {
+          createDelayedCloseExecutor();
         }
-        log.info("Firehose has been idle for %d ms, closing.", idled);
-        close();
-      });
-      idleWatch.start();
+      }
     }
 
+    @VisibleForTesting
+    synchronized @Nullable Thread getDelayedCloseExecutor()
+    {
+      return delayedCloseExecutor;
+    }
+
+    /**
+     * Creates and starts a {@link #delayedCloseExecutor} thread, either right from the EventReceiverFirehose's
+     * constructor if {@link #maxIdleTimeMillis} is specified, or otherwise lazily from {@link #shutdown}.
+     *
+     * The thread waits until the time when the Firehose should be closed because either {@link #addAll} was not called
+     * for the specified max idle time (see {@link #idleCloseTimeNs}), or until the shutoff time requested last
+     * via {@link #shutdown} (see {@link #requestedShutdownTimeNs}), whatever is sooner. Then the thread does
+     * two things:
+     *  1. if the Firehose is already closed (or in the process of closing, but {@link #closed} flag is already set), it
+     *    silently exits.
+     *  2. It checks both deadlines again:
+     *       a) if either of them has arrived, it calls {@link #close()} and exits.
+     *       b) otherwise, it waits until the nearest deadline again, and so on in a loop.
+     *
+     * This way the thread works predictably and robustly regardless of how both deadlines change (for example, shutoff
+     * time specified via {@link #shutdown} may jump in both directions).
+     *
+     * Other methods notify {@link #delayedCloseExecutor} that the Firehose state has changed in some way that is important for this
+     * thread (that is, when {@link #close()} is called, {@link #delayedCloseExecutor} is no longer needed and should
+     * exit as soon as possible to release system resources; when {@link #shutdown} is called, the thread may need to
+     * wake up sooner if the shutoff time has been moved sooner) by simply interrupting it. The thread wakes up and
+     * continues its loop.
+     */
+    @GuardedBy("this")
+    private Thread createDelayedCloseExecutor()
+    {
+      Thread delayedCloseExecutor = new Thread(
+          () -> {
+            // The closed = true is visible after close() because there is a happens-before edge between
+            // delayedCloseExecutor.interrupt() call in close() and catching InterruptedException below in this loop.
+            while (!closed) {
+              if (idleCloseTimeNs == null && requestedShutdownTimeNs == null) {
+                // This is not possible unless there are bugs in the code of EventReceiverFirehose. AssertionError could
+                // have been thrown instead, but it doesn't seem to make a lot of sense in a background thread. Instead,
+                // we log the error and continue the loop after some pause.
+                log.error(
+                    "Either idleCloseTimeNs or requestedShutdownTimeNs must be non-null. "
+                    + "Please file a bug at https://github.com/apache/incubator-druid/issues"
+                );
+              }
+              if (idleCloseTimeNs != null && idleCloseTimeNs - System.nanoTime() <= 0) { // overflow-aware comparison
+                log.info("Firehose has been idle for %d ms, closing.", maxIdleTimeMillis);
+                close();
+              } else if (requestedShutdownTimeNs != null &&
+                         requestedShutdownTimeNs - System.nanoTime() <= 0) { // overflow-aware comparison
+                log.info("Closing Firehose after a shutdown request");
+                close();
+              }
+              try {
+                // It is possible to write code that sleeps until the next idleCloseTimeNs or
+                // requestedShutdownTimeNs, whatever is non-null and sooner, but that's fairly complicated code. That
+                // complexity perhaps overweighs the minor inefficiency of simply waking up every second.
+                Threads.sleepFor(1, TimeUnit.SECONDS);
+              }
+              catch (InterruptedException ignore) {
+                // Interruption is a wakeup, continue the loop
+              }
+            }
+          },
+          "event-receiver-firehose-closer"
+      );
+      delayedCloseExecutor.setDaemon(true);
+      this.delayedCloseExecutor = delayedCloseExecutor;
+      delayedCloseExecutor.start();
+      return delayedCloseExecutor;
+    }
+
+    /**
+     * This method might be called concurrently from multiple threads, if multiple requests arrive to the server at the
+     * same time (possibly exact duplicates). Concurrency is controlled in {@link #checkProducerSequence}, where only
+     * requests with "X-Firehose-Producer-Seq" number greater than the max "X-Firehose-Producer-Seq" in previously
+     * arrived requests are allowed to proceed. After that check requests don't synchronize with each other and
+     * therefore if two large batches are sent with little interval, the events from the batches might be mixed up in
+     * {@link #buffer} (if two {@link #addRows(Iterable)} are executed concurrently).
+     */
     @POST
     @Path("/push-events")
     @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
     @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
-    public Response addAll(
-        InputStream in,
-        @Context final HttpServletRequest req
-    )
+    public Response addAll(InputStream in, @Context final HttpServletRequest req) throws JsonProcessingException
     {
-      idleWatch.reset();
-      idleWatch.start();
+      idleCloseTimeNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis);
       Access accessResult = AuthorizationUtils.authorizeResourceAction(
           req,
           new ResourceAction(
@@ -236,9 +364,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
 
       ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
 
-      Optional<Response> producerSequenceResponse = checkProducerSequence(req, reqContentType, objectMapper);
-      if (producerSequenceResponse.isPresent()) {
-        return producerSequenceResponse.get();
+      Response producerSequenceResponse = checkProducerSequence(req, reqContentType, objectMapper);
+      if (producerSequenceResponse != null) {
+        return producerSequenceResponse;
       }
 
       CountingInputStream countingInputStream = new CountingInputStream(in);
@@ -274,67 +402,59 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
       }
       catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        throw Throwables.propagate(e);
-      }
-      catch (JsonProcessingException e) {
-        throw Throwables.propagate(e);
+        throw new RuntimeException(e);
       }
     }
 
     @Override
     public boolean hasMore()
     {
-      synchronized (readLock) {
-        try {
-          while (nextRow == null) {
-            nextRow = buffer.poll(500, TimeUnit.MILLISECONDS);
-            if (closed) {
-              break;
-            }
-          }
-        }
-        catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw Throwables.propagate(e);
-        }
-
-        return nextRow != null;
+      if (rowsRunOut) {
+        return false;
+      }
+      if (nextRow != null) {
+        return true;
+      }
+      Object next;
+      try {
+        next = buffer.take();
       }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      //noinspection ObjectEquality
+      if (next == FIREHOSE_CLOSED) {
+        rowsRunOut = true;
+        return false;
+      }
+      nextRow = (InputRow) next;
+      return true;
     }
 
     @Nullable
     @Override
     public InputRow nextRow()
     {
-      synchronized (readLock) {
-        final InputRow row = nextRow;
-
-        if (row == null) {
-          throw new NoSuchElementException();
-        } else {
-          nextRow = null;
-          return row;
-        }
+      final InputRow row = nextRow;
+
+      if (row == null) {
+        throw new NoSuchElementException();
+      } else {
+        nextRow = null;
+        return row;
       }
     }
 
     @Override
     public Runnable commit()
     {
-      return new Runnable()
-      {
-        @Override
-        public void run()
-        {
-          // Nothing
-        }
-      };
+      return Runnables.getNoopRunnable();
     }
 
     @Override
     public int getCurrentBufferSize()
     {
-      // ArrayBlockingQueue's implementation of size() is thread-safe, so we can use that
       return buffer.size();
     }
 
@@ -350,34 +470,44 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
       return bytesReceived.get();
     }
 
+    /**
+     * This method is synchronized because it might be called concurrently from multiple threads: from {@link
+     * #delayedCloseExecutor}, and from the thread that creates and uses the Firehose object.
+     */
     @Override
-    public void close()
+    public synchronized void close()
     {
-      if (!closed) {
-        log.info("Firehose closing.");
-        closed = true;
+      if (closed) {
+        return;
+      }
+      closed = true;
+      log.info("Firehose closing.");
 
-        eventReceiverFirehoseRegister.unregister(serviceName);
-        if (chatHandlerProvider.isPresent()) {
-          chatHandlerProvider.get().unregister(serviceName);
-        }
-        exec.shutdown();
-        idleDetector.shutdown();
-        idleWatch.stop();
+      // Critical to add the poison pill to the queue, don't allow interruption.
+      Uninterruptibles.putUninterruptibly(buffer, FIREHOSE_CLOSED);
+
+      eventReceiverFirehoseRegister.unregister(serviceName);
+      if (chatHandlerProvider != null) {
+        chatHandlerProvider.unregister(serviceName);
+      }
+      if (delayedCloseExecutor != null && !delayedCloseExecutor.equals(Thread.currentThread())) {
+        // Interrupt delayedCloseExecutor to let it discover that the closed flag is already set and exit.
+        delayedCloseExecutor.interrupt();
       }
     }
 
-    // public for tests
-    public void addRows(Iterable<InputRow> rows) throws InterruptedException
+    @VisibleForTesting
+    void addRows(Iterable<InputRow> rows) throws InterruptedException
     {
       for (final InputRow row : rows) {
         boolean added = false;
         while (!closed && !added) {
           added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
           if (!added) {
-            long currTime = System.currentTimeMillis();
-            long lastTime = lastBufferAddFailMsgTime.get();
-            if (currTime - lastTime > 10000 && lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {
+            long currTimeNs = System.nanoTime();
+            long lastTimeNs = lastBufferAddFailLoggingTimeNs.get();
+            if (currTimeNs - lastTimeNs > TimeUnit.SECONDS.toNanos(10) &&
+                lastBufferAddFailLoggingTimeNs.compareAndSet(lastTimeNs, currTimeNs)) {
               log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());
             }
           }
@@ -389,12 +519,19 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
       }
     }
 
+    /**
+     * This method might be called concurrently from multiple threads, if multiple shutdown requests arrive at the same
+     * time. No attempts are made to synchronize such requests, or prioritize them a-la "latest shutdown time wins" or
+     * "soonest shutdown time wins". {@link #delayedCloseExecutor}'s logic (see {@link #createDelayedCloseExecutor()})
+     * is indifferent to shutdown times jumping in arbitrary directions. But once a shutdown request is made, it can't
+     * be cancelled entirely; the shutdown time can only be rescheduled with a new request.
+     */
     @POST
     @Path("/shutdown")
     @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
     @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
     public Response shutdown(
-        @QueryParam("shutoffTime") final String shutoffTime,
+        @QueryParam("shutoffTime") final String shutoffTimeMillis,
         @Context final HttpServletRequest req
     )
     {
@@ -411,13 +548,27 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
       }
 
       try {
-        DateTime shutoffAt = shutoffTime == null ? DateTimes.nowUtc() : DateTimes.of(shutoffTime);
-        log.info("Setting Firehose shutoffTime to %s", shutoffTime);
-        exec.schedule(
-            this::close,
-            shutoffAt.getMillis() - System.currentTimeMillis(),
-            TimeUnit.MILLISECONDS
-        );
+        DateTime shutoffAt = shutoffTimeMillis == null ? DateTimes.nowUtc() : DateTimes.of(shutoffTimeMillis);
+        log.info("Setting Firehose shutoffTime to %s", shutoffTimeMillis);
+        long shutoffTimeoutMillis = Math.max(shutoffAt.getMillis() - System.currentTimeMillis(), 0);
+
+        requestedShutdownTimeNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(shutoffTimeoutMillis);
+        Thread delayedCloseExecutor;
+        // Need to interrupt delayedCloseExecutor because a newly specified shutdown time might be closer than idle
+        // timeout or previously specified shutdown. Interruption of delayedCloseExecutor lets it adjust the sleep time
+        // (see the logic of this thread in createDelayedCloseExecutor()).
+        boolean needToInterruptDelayedCloseExecutor = true;
+        synchronized (this) {
+          delayedCloseExecutor = this.delayedCloseExecutor;
+          if (delayedCloseExecutor == null) {
+            delayedCloseExecutor = createDelayedCloseExecutor();
+            // Don't need to interrupt a freshly created thread
+            needToInterruptDelayedCloseExecutor = false;
+          }
+        }
+        if (needToInterruptDelayedCloseExecutor) {
+          delayedCloseExecutor.interrupt();
+        }
         return Response.ok().build();
       }
       catch (IllegalArgumentException e) {
@@ -429,7 +580,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
     }
 
     @VisibleForTesting
-    public boolean isClosed()
+    boolean isClosed()
     {
       return closed;
     }
@@ -437,14 +588,17 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
     /**
      * Checks the request for a producer ID and sequence value.  If the producer ID is specified, a corresponding
      * sequence value must be specified as well.  If the incoming sequence is less than or equal to the last seen
-     * sequence for that producer ID, the request is ignored
+     * sequence for that producer ID, the request is ignored.
+     *
+     * This method might be called concurrently from multiple threads.
      *
      * @param req Http request
      * @param responseContentType Response content type
      * @param responseMapper Response object mapper
-     * @return Optional of a response to return of an empty optional if the request can proceed
+     * @return an error response to return, or null if the request can proceed
      */
-    private Optional<Response> checkProducerSequence(
+    @Nullable
+    private Response checkProducerSequence(
         final HttpServletRequest req,
         final String responseContentType,
         final ObjectMapper responseMapper
@@ -453,61 +607,57 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
       final String producerId = req.getHeader("X-Firehose-Producer-Id");
 
       if (producerId == null) {
-        return Optional.empty();
+        return null;
       }
 
       final String sequenceValue = req.getHeader("X-Firehose-Producer-Seq");
 
       if (sequenceValue == null) {
-        return Optional.of(
-            Response.status(Response.Status.BAD_REQUEST)
-                       .entity(ImmutableMap.<String, Object>of("error", "Producer sequence value is missing"))
-                       .build()
-        );
+        return Response
+            .status(Response.Status.BAD_REQUEST)
+            .entity(ImmutableMap.<String, Object>of("error", "Producer sequence value is missing"))
+            .build();
       }
 
       Long producerSequence = producerSequences.computeIfAbsent(producerId, key -> Long.MIN_VALUE);
 
       if (producerSequences.size() >= MAX_FIREHOSE_PRODUCERS) {
-        return Optional.of(
-            Response.status(Response.Status.FORBIDDEN)
-                    .entity(
-                        ImmutableMap.<String, Object>of(
-                            "error",
-                            "Too many individual producer IDs for this firehose.  Max is " + MAX_FIREHOSE_PRODUCERS
-                        )
-                    )
-                    .build()
-        );
+        return Response
+            .status(Response.Status.FORBIDDEN)
+            .entity(
+                ImmutableMap.of(
+                    "error",
+                    "Too many individual producer IDs for this firehose.  Max is " + MAX_FIREHOSE_PRODUCERS
+                )
+            )
+            .build();
       }
 
       try {
         Long newSequence = Long.parseLong(sequenceValue);
-        if (newSequence <= producerSequence) {
-          return Optional.of(
-              Response.ok(
-                  responseMapper.writeValueAsString(
-                      ImmutableMap.of("eventCount", 0, "skipped", true)
-                  ),
-                  responseContentType
-              ).build()
-          );
-        }
 
-        producerSequences.put(producerId, newSequence);
+        while (true) {
+          if (newSequence <= producerSequence) {
+            return Response.ok(
+                responseMapper.writeValueAsString(ImmutableMap.of("eventCount", 0, "skipped", true)),
+                responseContentType
+            ).build();
+          }
+          if (producerSequences.replace(producerId, producerSequence, newSequence)) {
+            return null;
+          }
+          producerSequence = producerSequences.get(producerId);
+        }
       }
       catch (JsonProcessingException ex) {
-        throw Throwables.propagate(ex);
+        throw new RuntimeException(ex);
       }
       catch (NumberFormatException ex) {
-        return Optional.of(
-            Response.status(Response.Status.BAD_REQUEST)
-                    .entity(ImmutableMap.<String, Object>of("error", "Producer sequence must be a number"))
-                    .build()
-        );
+        return Response
+            .status(Response.Status.BAD_REQUEST)
+            .entity(ImmutableMap.<String, Object>of("error", "Producer sequence must be a number"))
+            .build();
       }
-
-      return Optional.empty();
     }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
index d37265b..705b90e 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
@@ -21,12 +21,14 @@ package org.apache.druid.segment.realtime.firehose;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.utils.CloseableUtils;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -37,10 +39,14 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * Creates firehoses that shut off at a particular time. Useful for limiting the lifespan of a realtime job.
+ *
+ * Each {@link Firehose} created by this factory spins up and manages one thread for calling {@link Firehose#close()}
+ * asynchronously at the specified {@link #shutoffTime}.
  */
 public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowParser>
 {
   private static final EmittingLogger log = new EmittingLogger(FirehoseFactory.class);
+
   private final FirehoseFactory delegateFactory;
   private final DateTime shutoffTime;
 
@@ -63,31 +69,25 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
   class TimedShutoffFirehose implements Firehose
   {
     private final Firehose firehose;
-    private final ScheduledExecutorService exec;
-    private final Object shutdownLock = new Object();
-    private volatile boolean shutdown = false;
+    private final ScheduledExecutorService shutdownExec;
+    @GuardedBy("this")
+    private boolean closed = false;
 
     TimedShutoffFirehose(InputRowParser parser, File temporaryDirectory) throws IOException
     {
       firehose = delegateFactory.connect(parser, temporaryDirectory);
 
-      exec = Execs.scheduledSingleThreaded("timed-shutoff-firehose-%d");
-
-      exec.schedule(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              log.info("Closing delegate firehose.");
-
-              shutdown = true;
-              try {
-                firehose.close();
-              }
-              catch (IOException e) {
-                log.warn(e, "Failed to close delegate firehose, ignoring.");
-              }
+      shutdownExec = Execs.scheduledSingleThreaded("timed-shutoff-firehose-%d");
+
+      shutdownExec.schedule(
+          () -> {
+            log.info("Closing delegate firehose.");
+
+            try {
+              TimedShutoffFirehose.this.close();
+            }
+            catch (IOException e) {
+              log.warn(e, "Failed to close delegate firehose, ignoring.");
             }
           },
           shutoffTime.getMillis() - System.currentTimeMillis(),
@@ -116,14 +116,16 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
       return firehose.commit();
     }
 
+    /**
+     * This method is synchronized because it might be called concurrently from multiple threads: from {@link
+     * #shutdownExec}, and explicitly on this Firehose object.
+     */
     @Override
-    public void close() throws IOException
+    public synchronized void close() throws IOException
     {
-      synchronized (shutdownLock) {
-        if (!shutdown) {
-          shutdown = true;
-          firehose.close();
-        }
+      if (!closed) {
+        closed = true;
+        CloseableUtils.closeBoth(firehose, shutdownExec::shutdownNow);
       }
     }
   }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java
index 8d2232a..b625433 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java
@@ -87,8 +87,22 @@ public class EventReceiverFirehoseIdleTest
   @Test(timeout = 40_000L)
   public void testIdle() throws Exception
   {
-    Thread.sleep(8_000L);
-    Assert.assertTrue(firehose.isClosed());
+    awaitFirehoseClosed();
+    awaitDelayedExecutorThreadTerminated();
+  }
+
+  private void awaitFirehoseClosed() throws InterruptedException
+  {
+    while (!firehose.isClosed()) {
+      Thread.sleep(50);
+    }
+  }
+
+  private void awaitDelayedExecutorThreadTerminated() throws InterruptedException
+  {
+    while (firehose.getDelayedCloseExecutor().getState() != Thread.State.TERMINATED) {
+      Thread.sleep(50);
+    }
   }
 
   @Test(timeout = 40_000L)
@@ -117,7 +131,7 @@ public class EventReceiverFirehoseIdleTest
       Thread.sleep(3_000L);
     }
 
-    Thread.sleep(5_000L);
-    Assert.assertTrue(firehose.isClosed());
+    awaitFirehoseClosed();
+    awaitDelayedExecutorThreadTerminated();
   }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
index 45b45bbe..e9c7ee3 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
@@ -57,7 +57,7 @@ public class EventReceiverFirehoseTest
 {
   private static final int CAPACITY = 300;
   private static final int NUM_EVENTS = 100;
-  private static final long MAX_IDLE_TIME = Long.MAX_VALUE;
+  private static final long MAX_IDLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(20);
   private static final String SERVICE_NAME = "test_firehose";
 
   private final String inputRow = "[{\n"
@@ -77,7 +77,7 @@ public class EventReceiverFirehoseTest
     eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
         SERVICE_NAME,
         CAPACITY,
-        MAX_IDLE_TIME,
+        MAX_IDLE_TIME_MILLIS,
         null,
         new DefaultObjectMapper(),
         new DefaultObjectMapper(),
@@ -100,8 +100,8 @@ public class EventReceiverFirehoseTest
     );
   }
 
-  @Test
-  public void testSingleThread() throws IOException
+  @Test(timeout = 60_000L)
+  public void testSingleThread() throws IOException, InterruptedException
   {
     for (int i = 0; i < NUM_EVENTS; ++i) {
       setUpRequestExpectations(null, null);
@@ -138,9 +138,10 @@ public class EventReceiverFirehoseTest
     Assert.assertFalse(firehose.hasMore());
     Assert.assertEquals(0, Iterables.size(register.getMetrics()));
 
+    awaitDelayedExecutorThreadTerminated();
   }
 
-  @Test
+  @Test(timeout = 60_000L)
   public void testMultipleThreads() throws InterruptedException, IOException, TimeoutException, ExecutionException
   {
     EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
@@ -210,6 +211,8 @@ public class EventReceiverFirehoseTest
     Assert.assertFalse(firehose.hasMore());
     Assert.assertEquals(0, Iterables.size(register.getMetrics()));
 
+    awaitDelayedExecutorThreadTerminated();
+
     executorService.shutdownNow();
   }
 
@@ -219,7 +222,7 @@ public class EventReceiverFirehoseTest
     EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new EventReceiverFirehoseFactory(
         SERVICE_NAME,
         CAPACITY,
-        MAX_IDLE_TIME,
+        MAX_IDLE_TIME_MILLIS,
         null,
         new DefaultObjectMapper(),
         new DefaultObjectMapper(),
@@ -259,11 +262,24 @@ public class EventReceiverFirehoseTest
     EasyMock.replay(req);
 
     firehose.shutdown(DateTimes.nowUtc().minusMinutes(2).toString(), req);
+    awaitFirehoseClosed();
+    awaitDelayedExecutorThreadTerminated();
+  }
+
+  private void awaitFirehoseClosed() throws InterruptedException
+  {
     while (!firehose.isClosed()) {
       Thread.sleep(50);
     }
   }
 
+  private void awaitDelayedExecutorThreadTerminated() throws InterruptedException
+  {
+    while (firehose.getDelayedCloseExecutor().getState() != Thread.State.TERMINATED) {
+      Thread.sleep(50);
+    }
+  }
+
   @Test(timeout = 60_000L)
   public void testShutdown() throws Exception
   {
@@ -279,9 +295,8 @@ public class EventReceiverFirehoseTest
     EasyMock.replay(req);
 
     firehose.shutdown(DateTimes.nowUtc().plusMillis(100).toString(), req);
-    while (!firehose.isClosed()) {
-      Thread.sleep(50);
-    }
+    awaitFirehoseClosed();
+    awaitDelayedExecutorThreadTerminated();
   }
 
   @Test
@@ -322,7 +337,6 @@ public class EventReceiverFirehoseTest
     firehose.close();
     Assert.assertFalse(firehose.hasMore());
     Assert.assertEquals(0, Iterables.size(register.getMetrics()));
-
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org