Posted to commits@druid.apache.org by le...@apache.org on 2019/03/04 21:50:12 UTC
[incubator-druid] branch master updated: Fix and document
concurrency of EventReceiverFirehose and TimedShutoffFirehose;
Refine concurrency specification of Firehose (#7038)
This is an automated email from the ASF dual-hosted git repository.
leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 10c9f6d Fix and document concurrency of EventReceiverFirehose and TimedShutoffFirehose; Refine concurrency specification of Firehose (#7038)
10c9f6d is described below
commit 10c9f6d7086fc2bbccb8673bce55b1e711d131e7
Author: Roman Leventov <le...@gmail.com>
AuthorDate: Mon Mar 4 18:50:03 2019 -0300
Fix and document concurrency of EventReceiverFirehose and TimedShutoffFirehose; Refine concurrency specification of Firehose (#7038)
#### `EventReceiverFirehoseFactory`
Fixed several concurrency bugs in `EventReceiverFirehoseFactory`:
- Race condition over putting an entry into `producerSequences` in `checkProducerSequence()`.
- `Stopwatch` was used to measure time across threads, but it is not a thread-safe class.
- Switched from `System.currentTimeMillis()` to `System.nanoTime()`, because the former is [not suitable](https://stackoverflow.com/a/351571/648955) for measuring time intervals.
- `close()` was not synchronized but could be called from multiple threads concurrently.
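The first bullet describes a check-then-act race: reading the last sequence number for a producer and then writing a new one as two separate map operations lets two concurrent requests both pass the check. The diff below is cut off before the new `checkProducerSequence()` body, so the following is only a minimal sketch of one race-free alternative, assuming a hypothetical `ProducerSequenceGate` class that performs the comparison and the update atomically with `ConcurrentHashMap.compute`:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class for illustration; it is not Druid's checkProducerSequence() code.
final class ProducerSequenceGate
{
  private final ConcurrentHashMap<String, Long> lastAcceptedSeq = new ConcurrentHashMap<>();

  /**
   * Accepts seq only if it is greater than every sequence number previously accepted for producerId.
   * The racy version, {@code Long last = map.get(id); if (last == null || seq > last) map.put(id, seq);},
   * lets two concurrent requests with the same seq both pass the check between get() and put().
   * compute() runs the whole read-compare-write step atomically for the key instead.
   */
  boolean tryAccept(String producerId, long seq)
  {
    boolean[] accepted = {false}; // lambdas cannot assign captured locals, so use a one-element array
    lastAcceptedSeq.compute(producerId, (id, last) -> {
      if (last == null || seq > last) {
        accepted[0] = true;
        return seq;
      }
      return last;
    });
    return accepted[0];
  }

  public static void main(String[] args)
  {
    ProducerSequenceGate gate = new ProducerSequenceGate();
    System.out.println(gate.tryAccept("producer-1", 1)); // true: first request
    System.out.println(gate.tryAccept("producer-1", 1)); // false: duplicate
    System.out.println(gate.tryAccept("producer-1", 2)); // true: newer sequence number
  }
}
```

Keeping the whole decision inside `compute()` is what makes exact duplicate requests, which the `addAll()` Javadoc says may arrive concurrently, resolve to a single winner.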
Removed the unnecessary `readLock` (which protected `hasMore()` and `nextRow()`, methods that are always called from a single thread). Removed unnecessary `volatile` modifiers.
Documented threading model and concurrent control flow of `EventReceiverFirehose` instances.
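The documented control flow for `EventReceiverFirehose` centers on one background thread that repeatedly compares `System.nanoTime()` against volatile deadlines, closes the Firehose once a deadline has passed, and treats interruption purely as a wake-up signal. A minimal sketch of that pattern, assuming a hypothetical `DeadlineCloser` class with a single shutdown deadline and using plain `Thread.sleep` for the periodic re-check (the real class tracks both an idle deadline and a requested shutdown deadline):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of the delayed-close thread pattern; not Druid's EventReceiverFirehose.
final class DeadlineCloser
{
  private final AtomicBoolean closed = new AtomicBoolean(false);
  // Written by shutdown() on request threads, read by the closer thread, hence volatile.
  private volatile long shutdownDeadlineNs;
  private final Thread closer;

  DeadlineCloser(long initialDelayMillis, long pollMillis)
  {
    shutdownDeadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(initialDelayMillis);
    closer = new Thread(() -> {
      while (!closed.get()) {
        // Overflow-aware: compare the difference with zero, not the two raw nanoTime values.
        if (shutdownDeadlineNs - System.nanoTime() <= 0) {
          close();
          return;
        }
        try {
          Thread.sleep(pollMillis); // periodic re-check instead of computing the exact wait
        }
        catch (InterruptedException ignore) {
          // An interrupt is only a wake-up signal: loop around and re-read the state.
        }
      }
    }, "deadline-closer");
    closer.setDaemon(true);
    closer.start();
  }

  /** Moves the deadline in either direction and wakes the closer thread so it re-checks immediately. */
  void shutdown(long delayMillis)
  {
    shutdownDeadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    closer.interrupt();
  }

  /** Idempotent and safe to call from any thread; only the first call does the actual closing. */
  void close()
  {
    if (closed.compareAndSet(false, true)) {
      // ... release resources here ...
      if (Thread.currentThread() != closer) {
        closer.interrupt(); // wake the closer thread so it sees closed == true and exits promptly
      }
    }
  }

  boolean isClosed()
  {
    return closed.get();
  }

  Thread closerThread()
  {
    return closer;
  }
}
```

Because the loop re-reads every deadline on each pass, it behaves the same whether a later `shutdown()` call moves the deadline earlier or later.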
**Important:** please read the updated Javadoc for `EventReceiverFirehose.addAll()`. It allows events from different requests (batches) to be interleaved in the buffer. Is this OK?
#### `TimedShutoffFirehoseFactory`
- Fixed a race condition that was possible because `close()` was not properly synchronized.
Documented threading model and concurrent control flow of `TimedShutoffFirehose` instances.
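The `TimedShutoffFirehoseFactory` diff is not included in this excerpt, so the following does not reproduce the commit's change. It is a minimal sketch, assuming a hypothetical `TimedShutoffCloser` wrapper, of why `close()` must be synchronized when it can be reached both from a scheduled shutoff task and from the Firehose user: a `synchronized` method guarding a `closed` flag ensures the delegate is closed exactly once.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper for illustration; it is not Druid's TimedShutoffFirehose.
final class TimedShutoffCloser implements Closeable
{
  private final Closeable delegate;
  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  private boolean closed = false; // guarded by "this"

  TimedShutoffCloser(Closeable delegate, long shutoffDelayMillis)
  {
    this.delegate = delegate;
    // The timer thread may call close() at the same moment as the user thread does.
    exec.schedule(() -> {
      try {
        close();
      }
      catch (IOException e) {
        // A real implementation would log this.
      }
    }, shutoffDelayMillis, TimeUnit.MILLISECONDS);
  }

  @Override
  public synchronized void close() throws IOException
  {
    if (closed) {
      return; // the other thread already closed the delegate
    }
    closed = true;
    try {
      delegate.close();
    }
    finally {
      exec.shutdownNow(); // cancels the pending timed close if the user got here first
    }
  }
}
```

Without `synchronized`, two threads could both observe `closed == false` and close the delegate twice.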
#### `Firehose`
Refined the concurrency contract of `Firehose` based on the `EventReceiverFirehose` implementation. Importantly, it now states that `close()` doesn't affect `hasMore()` and `nextRow()` and could be called concurrently with them. In other words, it specifies that `close()` is for the "row supply" side rather than the "row consume" side. However, I didn't check that other `Firehose` implementations adhere to this contract.
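Under this contract, closing stops new rows from entering the buffer while the consumer keeps draining what is already there. The new `EventReceiverFirehose` does this with a "poison pill" object placed at the end of its `BlockingQueue`. A minimal sketch of that idea, assuming a hypothetical `PoisonPillBuffer` class that, for simplicity, rejects rows when the buffer is full instead of retrying:

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, simplified model of a Firehose-style buffer; not Druid's EventReceiverFirehose.
final class PoisonPillBuffer<T>
{
  private static final Object CLOSED = new Object(); // the "poison pill"

  private final BlockingQueue<Object> buffer;
  private final Object lock = new Object();
  private boolean closed = false; // guarded by lock

  // Consumer-side state, touched only by the single consumer thread, so no synchronization.
  private T next = null;
  private boolean rowsRunOut = false;

  PoisonPillBuffer(int capacity)
  {
    buffer = new ArrayBlockingQueue<>(capacity);
  }

  /** Supply side; may be called from many threads. Returns false if closed or (for simplicity) full. */
  boolean add(T row)
  {
    synchronized (lock) {
      return !closed && buffer.offer(row);
    }
  }

  /** Stops the supply of rows; rows already buffered stay readable. */
  void close() throws InterruptedException
  {
    synchronized (lock) {
      if (closed) {
        return;
      }
      closed = true;
    }
    // Every row accepted by add() is already in the queue, so the pill lands after all of them.
    buffer.put(CLOSED);
  }

  /** Consumer side: blocks in take() until a row or the poison pill is available. */
  boolean hasMore() throws InterruptedException
  {
    if (next != null) {
      return true;
    }
    if (rowsRunOut) {
      return false;
    }
    Object item = buffer.take();
    if (item == CLOSED) {
      rowsRunOut = true;
      return false;
    }
    @SuppressWarnings("unchecked")
    T row = (T) item;
    next = row;
    return true;
  }

  T nextRow() throws InterruptedException
  {
    if (!hasMore()) {
      throw new NoSuchElementException();
    }
    T row = next;
    next = null;
    return row;
  }
}
```

The pill lets the consumer use a plain blocking `take()` and still notice the end of the stream. Note that `put(CLOSED)` blocks while the buffer is full, until the consumer makes room.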
<hr>
This issue is the result of reviewing `EventReceiverFirehose` and `TimedShutoffFirehose` using [this checklist](https://medium.com/@leventov/code-review-checklist-java-concurrency-49398c326154).
---
.idea/inspectionProfiles/Druid.xml | 4 +-
.../org/apache/druid/concurrent/LifecycleLock.java | 6 +-
.../java/org/apache/druid/concurrent/Threads.java | 59 +++
.../java/org/apache/druid/data/input/Firehose.java | 41 +-
.../org/apache/druid/data/input/FirehoseV2.java | 7 +-
.../org/apache/druid/utils/CloseableUtils.java | 49 +++
docs/content/ingestion/firehose.md | 5 +-
.../firehose/EventReceiverFirehoseFactory.java | 460 ++++++++++++++-------
.../firehose/TimedShutoffFirehoseFactory.java | 54 +--
.../firehose/EventReceiverFirehoseIdleTest.java | 22 +-
.../firehose/EventReceiverFirehoseTest.java | 34 +-
11 files changed, 525 insertions(+), 216 deletions(-)
diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index 70a6a49..b2dcda3 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -46,6 +46,7 @@
<inspection_tool class="EqualsUsesNonFinalVariable" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="EqualsWhichDoesntCheckParameterClass" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="EqualsWithItself" enabled="true" level="ERROR" enabled_by_default="true" />
+ <inspection_tool class="FieldAccessNotGuarded" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="FieldCanBeLocal" enabled="true" level="WARNING" enabled_by_default="true">
<option name="EXCLUDE_ANNOS">
<value>
@@ -119,7 +120,7 @@
<inspection_tool class="NumberEquality" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="ObjectEquality" enabled="true" level="WARNING" enabled_by_default="true">
<option name="m_ignoreEnums" value="true" />
- <option name="m_ignoreClassObjects" value="false" />
+ <option name="m_ignoreClassObjects" value="true" />
<option name="m_ignorePrivateConstructors" value="false" />
</inspection_tool>
<inspection_tool class="ObjectEqualsNull" enabled="true" level="ERROR" enabled_by_default="true" />
@@ -356,6 +357,5 @@
<option name="ADD_SERVLET_TO_ENTRIES" value="true" />
<option name="ADD_NONJAVA_TO_ENTRIES" value="true" />
</inspection_tool>
- <inspection_tool class="FieldAccessNotGuarded" enabled="true" level="ERROR" enabled_by_default="true" />
</profile>
</component>
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java b/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java
index ed6fea2..021c413 100644
--- a/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java
+++ b/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java
@@ -23,9 +23,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
- * A synchronization tool for lifecycled objects (see {@link org.apache.druid.java.util.common.lifecycle.Lifecycle}, that need
- * happens-before between start() and other methods and/or to check that the object was successfully started in other
- * methods.
+ * A synchronization tool for lifecycled objects (see {@link org.apache.druid.java.util.common.lifecycle.Lifecycle},
+ * that need happens-before between start() and other methods and/or to check that the object was successfully started
+ * in other methods.
*
* Guarantees in terms of JMM: happens-before between {@link #exitStart()} and {@link #awaitStarted()},
* exitStart() and {@link #canStop()}, if it returns {@code true}.
diff --git a/core/src/main/java/org/apache/druid/concurrent/Threads.java b/core/src/main/java/org/apache/druid/concurrent/Threads.java
new file mode 100644
index 0000000..fe8e79f
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/concurrent/Threads.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+public final class Threads
+{
+
+ /**
+ * Equivalent of {@link Thread#sleep(long)} with arguments and semantics of timed wait methods in classes from {@link
+ * java.util.concurrent} (like {@link java.util.concurrent.Semaphore#tryAcquire(long, TimeUnit)},
+ * {@link java.util.concurrent.locks.Lock#tryLock(long, TimeUnit)}, etc.): if the sleepTime argument is negative or
+ * zero, the method returns immediately. {@link Thread#sleep}, on the contrary, throws an IllegalArgumentException if
+ * the argument is negative and attempts to unschedule the thread if the argument is zero.
+ *
+ * @throws InterruptedException if the current thread is interrupted when this method is called or during sleeping.
+ */
+ public static void sleepFor(long sleepTime, TimeUnit unit) throws InterruptedException
+ {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ if (sleepTime <= 0) {
+ return;
+ }
+ long sleepTimeLimitNanos = System.nanoTime() + unit.toNanos(sleepTime);
+ while (true) {
+ long sleepTimeoutNanos = sleepTimeLimitNanos - System.nanoTime();
+ if (sleepTimeoutNanos <= 0) {
+ return;
+ }
+ LockSupport.parkNanos(sleepTimeoutNanos);
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ }
+ }
+
+ private Threads() {}
+}
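The `sleepFor()` Javadoc contrasts its handling of non-positive timeouts and pending interrupts with `Thread.sleep`. The difference can be checked with a small program, assuming a hypothetical `SleepForDemo` class that carries a local copy of the method above so it compiles without Druid on the classpath:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

final class SleepForDemo
{
  // Local copy of Threads.sleepFor() shown in the diff above.
  static void sleepFor(long sleepTime, TimeUnit unit) throws InterruptedException
  {
    if (Thread.interrupted()) {
      throw new InterruptedException(); // a pending interrupt is reported even for zero/negative timeouts
    }
    if (sleepTime <= 0) {
      return;
    }
    long sleepTimeLimitNanos = System.nanoTime() + unit.toNanos(sleepTime);
    while (true) {
      long sleepTimeoutNanos = sleepTimeLimitNanos - System.nanoTime();
      if (sleepTimeoutNanos <= 0) {
        return;
      }
      LockSupport.parkNanos(sleepTimeoutNanos); // may return early (spuriously), hence the loop
      if (Thread.interrupted()) {
        throw new InterruptedException();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    sleepFor(-5, TimeUnit.SECONDS); // returns immediately
    try {
      Thread.sleep(-5);
    }
    catch (IllegalArgumentException expected) {
      System.out.println("Thread.sleep rejects negative values");
    }
    Thread.currentThread().interrupt();
    try {
      sleepFor(0, TimeUnit.SECONDS);
    }
    catch (InterruptedException expected) {
      System.out.println("sleepFor reports a pending interrupt even for a zero timeout");
    }
  }
}
```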
diff --git a/core/src/main/java/org/apache/druid/data/input/Firehose.java b/core/src/main/java/org/apache/druid/data/input/Firehose.java
index 7886cdc..39556a8 100644
--- a/core/src/main/java/org/apache/druid/data/input/Firehose.java
+++ b/core/src/main/java/org/apache/druid/data/input/Firehose.java
@@ -23,19 +23,24 @@ import org.apache.druid.guice.annotations.ExtensionPoint;
import javax.annotation.Nullable;
import java.io.Closeable;
+import java.io.IOException;
/**
* This is an interface that holds onto the stream of incoming data. Realtime data ingestion is built around this
- * abstraction. In order to add a new type of source for realtime data ingestion, all you need to do is implement
- * one of these and register it with the Main.
+ * abstraction.
*
* This object acts a lot like an Iterator, but it doesn't extend the Iterator interface because it extends
- * Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this
- * gets passed around as an Iterator.
- * <p>
- * The implementation of this interface only needs to be minimally thread-safe. The three methods ##hasMore(),
- * ##nextRow() and ##commit() are all called from the same thread. ##commit(), however, returns a callback
- * which will be called on another thread, so the operations inside of that callback must be thread-safe.
+ * Closeable and it is very important that the {@link #close()} method doesn't get forgotten, which is easy to do if
+ * this gets passed around as an Iterator. Note that {@link #close()} doesn't cut the stream of rows for Firehose users
+ * immediately, but rather stops the supply of new rows into internal buffers. {@link #hasMore()} and {@link #nextRow()}
+ * are expected to operate for some time after (or concurrently with) {@link #close()} until the buffered events (if
+ * any) run out.
+ *
+ * Concurrency:
+ * The three methods {@link #hasMore()}, {@link #nextRow()} and {@link #commit()} are all called from the same thread.
+ * {@link #commit()}, however, returns a callback which will be called on another thread. {@link #close()} might be
+ * called concurrently from a thread different from the thread calling {@link #hasMore()}, {@link #nextRow()} and {@link
+ * #commit()}.
* </p>
*/
@ExtensionPoint
@@ -43,8 +48,8 @@ public interface Firehose extends Closeable
{
/**
* Returns whether there are more rows to process. This is used to indicate that another item is immediately
- * available via ##nextRow(). Thus, if the stream is still available but there are no new messages on it, this call
- * should block until a new message is available.
+ * available via {@link #nextRow()}. Thus, if the stream is still available but there are no new messages on it, this
+ * call should block until a new message is available.
*
* If something happens such that the stream is no longer available, this should return false.
*
@@ -77,8 +82,22 @@ public interface Firehose extends Closeable
* A simple implementation of this interface might do nothing when run() is called
* (in which case the same do-nothing instance can be returned every time), or
* a more complex implementation might clean up temporary resources that are no longer needed
- * because of InputRows delivered by prior calls to ##nextRow().
+ * because of InputRows delivered by prior calls to {@link #nextRow()}.
* </p>
*/
Runnable commit();
+
+ /**
+ * Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()}, {@link
+ * #nextRow()} and {@link #commit()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
+ * continue to work after close(), but since the ingestion side is closed, rows will eventually run out.
+ *
+ * The effects of calling run() on the {@link Runnable} object returned from {@link #commit()} (in other words,
+ * doing the commit) concurrently with or after close() are unspecified: the commit may silently not be performed
+ * (that is, the run() call completes without an Exception, but the commit is not actually done), or an error may
+ * result. Note that the {@link #commit()} method itself can be called concurrently with close(), but doing so makes
+ * little sense, because run() on the returned Runnable then can't be called.
+ */
+ @Override
+ void close() throws IOException;
}
diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseV2.java b/core/src/main/java/org/apache/druid/data/input/FirehoseV2.java
index 6c2ddae..c6aa33f 100644
--- a/core/src/main/java/org/apache/druid/data/input/FirehoseV2.java
+++ b/core/src/main/java/org/apache/druid/data/input/FirehoseV2.java
@@ -42,9 +42,10 @@ import java.io.Closeable;
* Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this
* gets passed around as an Iterator.
*
- * The implementation of this interface only needs to be minimally thread-safe. The methods ##start(), ##advance(),
- * ##currRow() and ##makeCommitter() are all called from the same thread. ##makeCommitter(), however, returns a callback
- * which will be called on another thread, so the operations inside of that callback must be thread-safe.
+ * The implementation of this interface only needs to be minimally thread-safe. The methods {@link #start()}, {@link
+ * #advance()}, {@link #currRow()} and {@link #makeCommitter()} are all called from the same thread. {@link
+ * #makeCommitter()}, however, returns a callback which will be called on another thread, so the operations inside of
+ * that callback must be thread-safe.
*/
@ExtensionPoint
public interface FirehoseV2 extends Closeable
diff --git a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java
new file mode 100644
index 0000000..03a4f2c
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Methods in this class could have belonged to {@link org.apache.druid.java.util.common.io.Closer}, but that class is
+ * left unedited to keep its source close to the Guava source.
+ */
+public final class CloseableUtils
+{
+ /**
+ * Call this method instead of code like
+ *
+ * first.close();
+ * second.close();
+ *
+ * to get the safety of {@link org.apache.druid.java.util.common.io.Closer}, but without the associated boilerplate
+ * of creating a Closer and registering objects in it.
+ */
+ public static void closeBoth(Closeable first, Closeable second) throws IOException
+ {
+ //noinspection EmptyTryBlock
+ try (Closeable ignore1 = second; Closeable ignore2 = first) {
+ // piggy-back try-with-resources semantics
+ }
+ }
+
+ private CloseableUtils() {}
+}
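`closeBoth()` relies on two try-with-resources rules: resources are closed in reverse declaration order (so `first` is closed before `second`), and if closing one resource throws, the remaining ones are still closed, with later exceptions attached as suppressed. A small demonstration, assuming a hypothetical `CloseBothDemo` class that carries a local copy of the method so it compiles on its own:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class CloseBothDemo
{
  // Local copy of CloseableUtils.closeBoth() shown in the diff above.
  static void closeBoth(Closeable first, Closeable second) throws IOException
  {
    // Closed in reverse declaration order: `first` (ignore2) before `second` (ignore1).
    // Null resources are skipped, and `second` is still closed if closing `first` throws.
    try (Closeable ignore1 = second; Closeable ignore2 = first) {
      // empty body: only the closing semantics of try-with-resources are wanted
    }
  }

  public static void main(String[] args)
  {
    List<String> log = new ArrayList<>();
    Closeable failing = () -> {
      log.add("first");
      throw new IOException("first failed");
    };
    Closeable second = () -> log.add("second");
    try {
      closeBoth(failing, second);
    }
    catch (IOException e) {
      // The exception from `first` propagates; one from `second` would appear in e.getSuppressed().
      System.out.println(log + " " + e.getMessage()); // prints "[first, second] first failed"
    }
  }
}
```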
diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md
index faa83f3..322a1f2 100644
--- a/docs/content/ingestion/firehose.md
+++ b/docs/content/ingestion/firehose.md
@@ -191,8 +191,9 @@ When using this firehose, events can be sent by submitting a POST request to the
|property|description|required?|
|--------|-----------|---------|
|type|This should be "receiver"|yes|
-|serviceName|name used to announce the event receiver service endpoint|yes|
-|bufferSize| size of buffer used by firehose to store events|no default(100000)|
+|serviceName|Name used to announce the event receiver service endpoint|yes|
+|maxIdleTime|A firehose is automatically shut down after not receiving any events for this period of time, in milliseconds. If not specified, a firehose is never shut down due to being idle. Zero and negative values have the same effect.|no|
+|bufferSize|Size of buffer used by firehose to store events|no, default is 100000|
Shut down time for EventReceiverFirehose can be specified by submitting a POST request to
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
index 09751f9..b8ed0ad 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
@@ -28,10 +28,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CountingInputStream;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.concurrent.Threads;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
@@ -39,7 +40,6 @@ import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.metrics.EventReceiverFirehoseMetric;
import org.apache.druid.server.metrics.EventReceiverFirehoseRegister;
@@ -50,6 +50,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.utils.Runnables;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -70,12 +71,9 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -85,16 +83,29 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowParser<Map<String, Object>>>
{
+ private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
+
public static final int MAX_FIREHOSE_PRODUCERS = 10_000;
- private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
private static final int DEFAULT_BUFFER_SIZE = 100_000;
- private static final long DEFAULT_MAX_IDLE_TIME = Long.MAX_VALUE;
+
+ /**
+ * A "poison pill" object for {@link EventReceiverFirehose}'s internal buffer.
+ */
+ private static final Object FIREHOSE_CLOSED = new Object();
private final String serviceName;
private final int bufferSize;
- private final long maxIdleTime;
- private final Optional<ChatHandlerProvider> chatHandlerProvider;
+
+ /**
+ * Doesn't really support max idle times finer than 1 second due to how {@link
+ * EventReceiverFirehose#delayedCloseExecutor} is implemented, see a comment inside {@link
+ * EventReceiverFirehose#createDelayedCloseExecutor()}. This aspect is not reflected in docs because it's unlikely
+ * that anybody configures or cares about finer max idle times, and also because this is an implementation detail of
+ * {@link EventReceiverFirehose} that may change in the future.
+ */
+ private final long maxIdleTimeMillis;
+ private final @Nullable ChatHandlerProvider chatHandlerProvider;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister;
@@ -104,7 +115,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
public EventReceiverFirehoseFactory(
@JsonProperty("serviceName") String serviceName,
@JsonProperty("bufferSize") Integer bufferSize,
- @JsonProperty("maxIdleTime") Long maxIdleTime,
+ // Keeping the legacy 'maxIdleTime' property name for backward compatibility. When the project is updated to
+ // Jackson 2.9 it could be changed, see https://github.com/apache/incubator-druid/issues/7152
+ @JsonProperty("maxIdleTime") @Nullable Long maxIdleTimeMillis,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject @Json ObjectMapper jsonMapper,
@JacksonInject @Smile ObjectMapper smileMapper,
@@ -116,9 +129,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
this.serviceName = serviceName;
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
- this.maxIdleTime = maxIdleTime == null || maxIdleTime <= 0 ?
- DEFAULT_MAX_IDLE_TIME : maxIdleTime;
- this.chatHandlerProvider = Optional.ofNullable(chatHandlerProvider);
+ this.maxIdleTimeMillis = (maxIdleTimeMillis == null || maxIdleTimeMillis <= 0) ? Long.MAX_VALUE : maxIdleTimeMillis;
+ this.chatHandlerProvider = chatHandlerProvider;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.eventReceiverFirehoseRegister = eventReceiverFirehoseRegister;
@@ -134,12 +146,12 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
log.info("Connecting firehose: %s", serviceName);
final EventReceiverFirehose firehose = new EventReceiverFirehose(firehoseParser);
- if (chatHandlerProvider.isPresent()) {
- log.info("Found chathandler of class[%s]", chatHandlerProvider.get().getClass().getName());
- chatHandlerProvider.get().register(serviceName, firehose);
+ if (chatHandlerProvider != null) {
+ log.info("Found chathandler of class[%s]", chatHandlerProvider.getClass().getName());
+ chatHandlerProvider.register(serviceName, firehose);
int lastIndexOfColon = serviceName.lastIndexOf(':');
if (lastIndexOfColon > 0) {
- chatHandlerProvider.get().register(serviceName.substring(lastIndexOfColon + 1), firehose);
+ chatHandlerProvider.register(serviceName.substring(lastIndexOfColon + 1), firehose);
}
} else {
log.warn("No chathandler detected");
@@ -162,62 +174,178 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
return bufferSize;
}
- @JsonProperty
- public long getMaxIdleTime()
+ /**
+ * Keeping the legacy 'maxIdleTime' property name for backward compatibility. When the project is updated to Jackson
+ * 2.9 it could be changed, see https://github.com/apache/incubator-druid/issues/7152
+ */
+ @JsonProperty("maxIdleTime")
+ public long getMaxIdleTimeMillis()
{
- return maxIdleTime;
+ return maxIdleTimeMillis;
}
+ /**
+ * Apart from adhering to the {@link Firehose} contract regarding concurrency, this class has two methods that might be
+ * called concurrently with any other methods and with each other, from an arbitrary number of threads:
+ * {@link #addAll} and {@link #shutdown}.
+ *
+ * Concurrent data flow: in {@link #addAll} (can be called concurrently with any other methods and other calls to
+ * {@link #addAll}) rows are pushed into {@link #buffer}. The single Firehose "consumer" thread calls {@link #hasMore}
+ * and {@link #nextRow()}, where rows are taken out from the other end of the {@link #buffer} queue.
+ *
+ * This class creates and manages one thread ({@link #delayedCloseExecutor}) for calling {@link #close()}
+ * asynchronously in response to a {@link #shutdown} request, or after this Firehose has been idle (no calls to {@link
+ * #addAll}) for {@link #maxIdleTimeMillis}.
+ */
+ @VisibleForTesting
public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric
{
- private final ScheduledExecutorService exec;
- private final ExecutorService idleDetector;
- private final BlockingQueue<InputRow> buffer;
- private final InputRowParser<Map<String, Object>> parser;
+ /**
+ * How this thread works (and its interruption policy) is described in the comment for {@link
+ * #createDelayedCloseExecutor}.
+ */
+ @GuardedBy("this")
+ private @Nullable Thread delayedCloseExecutor;
- private final Object readLock = new Object();
+ /**
+ * Contains {@link InputRow} objects, the last of which is {@link #FIREHOSE_CLOSED}, a "poison pill". The poison pill
+ * is used to notify the thread that calls {@link #hasMore()} and {@link #nextRow()} that the EventReceiverFirehose
+ * is closed, without resorting to heuristic 500 ms timed blocking in a loop instead of a simple
+ * {@link BlockingQueue#take()} call (see {@link #hasMore} code).
+ */
+ private final BlockingQueue<Object> buffer;
+ private final InputRowParser<Map<String, Object>> parser;
- private volatile InputRow nextRow = null;
+ /**
+ * This field needs to be volatile to ensure progress in {@link #addRows} method where it is read in a loop, and
+ * also in testing code calling {@link #isClosed()}.
+ */
private volatile boolean closed = false;
+
+ /**
+ * This field and {@link #rowsRunOut} are not volatile because they are accessed only from {@link #hasMore()} and
+ * {@link #nextRow()} methods that are called from a single thread according to {@link Firehose} spec.
+ */
+ private InputRow nextRow = null;
+ private boolean rowsRunOut = false;
+
private final AtomicLong bytesReceived = new AtomicLong(0);
- private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
+ private final AtomicLong lastBufferAddFailLoggingTimeNs = new AtomicLong(System.nanoTime());
private final ConcurrentHashMap<String, Long> producerSequences = new ConcurrentHashMap<>();
- private final Stopwatch idleWatch = Stopwatch.createUnstarted();
- public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
+ /**
+ * This field and {@link #requestedShutdownTimeNs} use nanoseconds instead of milliseconds to avoid dealing with the
+ * fact that {@link System#currentTimeMillis()} can "go backward", e.g. due to time correction on the server.
+ *
+ * This field and {@link #requestedShutdownTimeNs} must be volatile because they are de facto lazily initialized
+ * fields that are used concurrently in {@link #delayedCloseExecutor} (see {@link #createDelayedCloseExecutor()}).
+ * If they were not volatile, NPE would be possible in {@link #delayedCloseExecutor}. See
+ * https://shipilev.net/blog/2016/close-encounters-of-jmm-kind/#wishful-hb-actual for explanations.
+ */
+ private volatile Long idleCloseTimeNs = null;
+ private volatile Long requestedShutdownTimeNs = null;
+
+ EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
{
this.buffer = new ArrayBlockingQueue<>(bufferSize);
this.parser = parser;
- exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");
- idleDetector = Execs.singleThreaded("event-receiver-firehose-idle-detector-%d");
- idleDetector.submit(() -> {
- long idled;
- try {
- while ((idled = idleWatch.elapsed(TimeUnit.MILLISECONDS)) < maxIdleTime) {
- Thread.sleep(maxIdleTime - idled);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
+
+ if (maxIdleTimeMillis != Long.MAX_VALUE) {
+ idleCloseTimeNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis);
+ synchronized (this) {
+ createDelayedCloseExecutor();
}
- log.info("Firehose has been idle for %d ms, closing.", idled);
- close();
- });
- idleWatch.start();
+ }
}
+ @VisibleForTesting
+ synchronized @Nullable Thread getDelayedCloseExecutor()
+ {
+ return delayedCloseExecutor;
+ }
+
+ /**
+ * Creates and starts a {@link #delayedCloseExecutor} thread, either right from the EventReceiverFirehose's
+ * constructor if {@link #maxIdleTimeMillis} is specified, or otherwise lazily from {@link #shutdown}.
+ *
+ * The thread waits until the time when the Firehose should be closed because either {@link #addAll} was not called
+ * for the specified max idle time (see {@link #idleCloseTimeNs}), or until the shutoff time requested last
+ * via {@link #shutdown} (see {@link #requestedShutdownTimeNs}), whichever is sooner. Then the thread does
+ * two things:
+ * 1. If the Firehose is already closed (or in the process of closing, but the {@link #closed} flag is already set),
+ * it silently exits.
+ * 2. It checks both deadlines again:
+ * a) if either of them has arrived, it calls {@link #close()} and exits.
+ * b) otherwise, it waits until the nearest deadline again, and so on in a loop.
+ *
+ * This way the thread works predictably and robustly regardless of how both deadlines change (for example, shutoff
+ * time specified via {@link #shutdown} may jump in both directions).
+ *
+ * Other methods notify {@link #delayedCloseExecutor} that the Firehose state has changed in some way that is important
+ * for this thread (that is, when {@link #close()} is called, {@link #delayedCloseExecutor} is no longer needed and
+ * should exit as soon as possible to release system resources; when {@link #shutdown} is called, the thread may need
+ * to wake up sooner if the shutoff time has been moved sooner) by simply interrupting it. The thread wakes up and
+ * continues its loop.
+ */
+ @GuardedBy("this")
+ private Thread createDelayedCloseExecutor()
+ {
+ Thread delayedCloseExecutor = new Thread(
+ () -> {
+ // The closed = true is visible after close() because there is a happens-before edge between
+ // delayedCloseExecutor.interrupt() call in close() and catching InterruptedException below in this loop.
+ while (!closed) {
+ if (idleCloseTimeNs == null && requestedShutdownTimeNs == null) {
+ // This is not possible unless there are bugs in the code of EventReceiverFirehose. AssertionError could
+ // have been thrown instead, but it doesn't seem to make a lot of sense in a background thread. Instead,
+ // we log the error and continue the loop after some pause.
+ log.error(
+ "Either idleCloseTimeNs or requestedShutdownTimeNs must be non-null. "
+ + "Please file a bug at https://github.com/apache/incubator-druid/issues"
+ );
+ }
+ if (idleCloseTimeNs != null && idleCloseTimeNs - System.nanoTime() <= 0) { // overflow-aware comparison
+ log.info("Firehose has been idle for %d ms, closing.", maxIdleTimeMillis);
+ close();
+ } else if (requestedShutdownTimeNs != null &&
+ requestedShutdownTimeNs - System.nanoTime() <= 0) { // overflow-aware comparison
+ log.info("Closing Firehose after a shutdown request");
+ close();
+ }
+ try {
+ // It is possible to write code that sleeps until the next idleCloseTimeNs or
+ // requestedShutdownTimeNs, whichever is non-null and sooner, but that's fairly complicated code. That
+ // complexity perhaps outweighs the minor inefficiency of simply waking up every second.
+ Threads.sleepFor(1, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException ignore) {
+ // Interruption is a wakeup, continue the loop
+ }
+ }
+ },
+ "event-receiver-firehose-closer"
+ );
+ delayedCloseExecutor.setDaemon(true);
+ this.delayedCloseExecutor = delayedCloseExecutor;
+ delayedCloseExecutor.start();
+ return delayedCloseExecutor;
+ }
+
+ /**
+ * This method might be called concurrently from multiple threads if multiple requests arrive at the server at the
+ * same time (possibly exact duplicates). Concurrency is controlled in {@link #checkProducerSequence}, where only
+ * requests whose "X-Firehose-Producer-Seq" number is greater than the max "X-Firehose-Producer-Seq" of previously
+ * arrived requests with the same "X-Firehose-Producer-Id" are allowed to proceed (requests without a producer ID
+ * skip this check). After that check, requests don't synchronize with each other, so if two large batches are sent
+ * in quick succession, the events from the two batches might be interleaved in {@link #buffer} (if two
+ * {@link #addRows(Iterable)} calls are executed concurrently).
+ */
@POST
@Path("/push-events")
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
- public Response addAll(
- InputStream in,
- @Context final HttpServletRequest req
- )
+ public Response addAll(InputStream in, @Context final HttpServletRequest req) throws JsonProcessingException
{
- idleWatch.reset();
- idleWatch.start();
+ idleCloseTimeNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis);
Access accessResult = AuthorizationUtils.authorizeResourceAction(
req,
new ResourceAction(
@@ -236,9 +364,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
- Optional<Response> producerSequenceResponse = checkProducerSequence(req, reqContentType, objectMapper);
- if (producerSequenceResponse.isPresent()) {
- return producerSequenceResponse.get();
+ Response producerSequenceResponse = checkProducerSequence(req, reqContentType, objectMapper);
+ if (producerSequenceResponse != null) {
+ return producerSequenceResponse;
}
CountingInputStream countingInputStream = new CountingInputStream(in);
@@ -274,67 +402,59 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw Throwables.propagate(e);
- }
- catch (JsonProcessingException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@Override
public boolean hasMore()
{
- synchronized (readLock) {
- try {
- while (nextRow == null) {
- nextRow = buffer.poll(500, TimeUnit.MILLISECONDS);
- if (closed) {
- break;
- }
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw Throwables.propagate(e);
- }
-
- return nextRow != null;
+ if (rowsRunOut) {
+ return false;
+ }
+ if (nextRow != null) {
+ return true;
+ }
+ Object next;
+ try {
+ next = buffer.take();
}
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ //noinspection ObjectEquality
+ if (next == FIREHOSE_CLOSED) {
+ rowsRunOut = true;
+ return false;
+ }
+ nextRow = (InputRow) next;
+ return true;
}
@Nullable
@Override
public InputRow nextRow()
{
- synchronized (readLock) {
- final InputRow row = nextRow;
-
- if (row == null) {
- throw new NoSuchElementException();
- } else {
- nextRow = null;
- return row;
- }
+ final InputRow row = nextRow;
+
+ if (row == null) {
+ throw new NoSuchElementException();
+ } else {
+ nextRow = null;
+ return row;
}
}
@Override
public Runnable commit()
{
- return new Runnable()
- {
- @Override
- public void run()
- {
- // Nothing
- }
- };
+ return Runnables.getNoopRunnable();
}
@Override
public int getCurrentBufferSize()
{
- // ArrayBlockingQueue's implementation of size() is thread-safe, so we can use that
return buffer.size();
}
@@ -350,34 +470,44 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
return bytesReceived.get();
}
+ /**
+ * This method is synchronized because it might be called concurrently from multiple threads: from {@link
+ * #delayedCloseExecutor}, and from the thread that creates and uses the Firehose object.
+ */
@Override
- public void close()
+ public synchronized void close()
{
- if (!closed) {
- log.info("Firehose closing.");
- closed = true;
+ if (closed) {
+ return;
+ }
+ closed = true;
+ log.info("Firehose closing.");
- eventReceiverFirehoseRegister.unregister(serviceName);
- if (chatHandlerProvider.isPresent()) {
- chatHandlerProvider.get().unregister(serviceName);
- }
- exec.shutdown();
- idleDetector.shutdown();
- idleWatch.stop();
+ // It is critical to add the poison pill to the queue, so don't allow interruption.
+ Uninterruptibles.putUninterruptibly(buffer, FIREHOSE_CLOSED);
+
+ eventReceiverFirehoseRegister.unregister(serviceName);
+ if (chatHandlerProvider != null) {
+ chatHandlerProvider.unregister(serviceName);
+ }
+ if (delayedCloseExecutor != null && !delayedCloseExecutor.equals(Thread.currentThread())) {
+ // Interrupt delayedCloseExecutor to let it discover that the closed flag is already set, and exit.
+ delayedCloseExecutor.interrupt();
}
}
- // public for tests
- public void addRows(Iterable<InputRow> rows) throws InterruptedException
+ @VisibleForTesting
+ void addRows(Iterable<InputRow> rows) throws InterruptedException
{
for (final InputRow row : rows) {
boolean added = false;
while (!closed && !added) {
added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
if (!added) {
- long currTime = System.currentTimeMillis();
- long lastTime = lastBufferAddFailMsgTime.get();
- if (currTime - lastTime > 10000 && lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {
+ long currTimeNs = System.nanoTime();
+ long lastTimeNs = lastBufferAddFailLoggingTimeNs.get();
+ if (currTimeNs - lastTimeNs > TimeUnit.SECONDS.toNanos(10) &&
+ lastBufferAddFailLoggingTimeNs.compareAndSet(lastTimeNs, currTimeNs)) {
log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());
}
}
@@ -389,12 +519,19 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
}
}
+ /**
+ * This method might be called concurrently from multiple threads if multiple shutdown requests arrive at the same
+ * time. No attempt is made to synchronize such requests or to prioritize them à la "latest shutdown time wins" or
+ * "soonest shutdown time wins". {@link #delayedCloseExecutor}'s logic (see {@link #createDelayedCloseExecutor()})
+ * is indifferent to shutdown times jumping in arbitrary directions. However, once a shutdown request is made, it
+ * can't be cancelled entirely; the shutdown time can only be rescheduled with a new request.
+ */
@POST
@Path("/shutdown")
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response shutdown(
- @QueryParam("shutoffTime") final String shutoffTime,
+ @QueryParam("shutoffTime") final String shutoffTimeMillis,
@Context final HttpServletRequest req
)
{
@@ -411,13 +548,27 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
}
try {
- DateTime shutoffAt = shutoffTime == null ? DateTimes.nowUtc() : DateTimes.of(shutoffTime);
- log.info("Setting Firehose shutoffTime to %s", shutoffTime);
- exec.schedule(
- this::close,
- shutoffAt.getMillis() - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS
- );
+ DateTime shutoffAt = shutoffTimeMillis == null ? DateTimes.nowUtc() : DateTimes.of(shutoffTimeMillis);
+ log.info("Setting Firehose shutoffTime to %s", shutoffTimeMillis);
+ long shutoffTimeoutMillis = Math.max(shutoffAt.getMillis() - System.currentTimeMillis(), 0);
+
+ requestedShutdownTimeNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(shutoffTimeoutMillis);
+ Thread delayedCloseExecutor;
+ // Need to interrupt delayedCloseExecutor because a newly specified shutdown time might be sooner than the idle
+ // timeout or a previously specified shutdown time. Interrupting delayedCloseExecutor makes it re-check both
+ // deadlines right away instead of after its next one-second sleep (see createDelayedCloseExecutor()).
+ boolean needToInterruptDelayedCloseExecutor = true;
+ synchronized (this) {
+ delayedCloseExecutor = this.delayedCloseExecutor;
+ if (delayedCloseExecutor == null) {
+ delayedCloseExecutor = createDelayedCloseExecutor();
+ // Don't need to interrupt a freshly created thread
+ needToInterruptDelayedCloseExecutor = false;
+ }
+ }
+ if (needToInterruptDelayedCloseExecutor) {
+ delayedCloseExecutor.interrupt();
+ }
return Response.ok().build();
}
catch (IllegalArgumentException e) {
@@ -429,7 +580,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
}
@VisibleForTesting
- public boolean isClosed()
+ boolean isClosed()
{
return closed;
}
@@ -437,14 +588,17 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
/**
* Checks the request for a producer ID and sequence value. If the producer ID is specified, a corresponding
* sequence value must be specified as well. If the incoming sequence is less than or equal to the last seen
- * sequence for that producer ID, the request is ignored
+ * sequence for that producer ID, the request is ignored.
+ *
+ * This method might be called concurrently from multiple threads.
*
* @param req Http request
* @param responseContentType Response content type
* @param responseMapper Response object mapper
- * @return Optional of a response to return of an empty optional if the request can proceed
+ * @return an error response to return or null if the request can proceed
*/
- private Optional<Response> checkProducerSequence(
+ @Nullable
+ private Response checkProducerSequence(
final HttpServletRequest req,
final String responseContentType,
final ObjectMapper responseMapper
@@ -453,61 +607,57 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
final String producerId = req.getHeader("X-Firehose-Producer-Id");
if (producerId == null) {
- return Optional.empty();
+ return null;
}
final String sequenceValue = req.getHeader("X-Firehose-Producer-Seq");
if (sequenceValue == null) {
- return Optional.of(
- Response.status(Response.Status.BAD_REQUEST)
- .entity(ImmutableMap.<String, Object>of("error", "Producer sequence value is missing"))
- .build()
- );
+ return Response
+ .status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.<String, Object>of("error", "Producer sequence value is missing"))
+ .build();
}
Long producerSequence = producerSequences.computeIfAbsent(producerId, key -> Long.MIN_VALUE);
if (producerSequences.size() >= MAX_FIREHOSE_PRODUCERS) {
- return Optional.of(
- Response.status(Response.Status.FORBIDDEN)
- .entity(
- ImmutableMap.<String, Object>of(
- "error",
- "Too many individual producer IDs for this firehose. Max is " + MAX_FIREHOSE_PRODUCERS
- )
- )
- .build()
- );
+ return Response
+ .status(Response.Status.FORBIDDEN)
+ .entity(
+ ImmutableMap.of(
+ "error",
+ "Too many individual producer IDs for this firehose. Max is " + MAX_FIREHOSE_PRODUCERS
+ )
+ )
+ .build();
}
try {
Long newSequence = Long.parseLong(sequenceValue);
- if (newSequence <= producerSequence) {
- return Optional.of(
- Response.ok(
- responseMapper.writeValueAsString(
- ImmutableMap.of("eventCount", 0, "skipped", true)
- ),
- responseContentType
- ).build()
- );
- }
- producerSequences.put(producerId, newSequence);
+ while (true) {
+ if (newSequence <= producerSequence) {
+ return Response.ok(
+ responseMapper.writeValueAsString(ImmutableMap.of("eventCount", 0, "skipped", true)),
+ responseContentType
+ ).build();
+ }
+ if (producerSequences.replace(producerId, producerSequence, newSequence)) {
+ return null;
+ }
+ producerSequence = producerSequences.get(producerId);
+ }
}
catch (JsonProcessingException ex) {
- throw Throwables.propagate(ex);
+ throw new RuntimeException(ex);
}
catch (NumberFormatException ex) {
- return Optional.of(
- Response.status(Response.Status.BAD_REQUEST)
- .entity(ImmutableMap.<String, Object>of("error", "Producer sequence must be a number"))
- .build()
- );
+ return Response
+ .status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.<String, Object>of("error", "Producer sequence must be a number"))
+ .build();
}
-
- return Optional.empty();
}
}
}
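The `checkProducerSequence()` change above replaces a check-then-`put()` race with a compare-and-set retry loop built on `ConcurrentMap.replace(key, oldValue, newValue)`. Below is a minimal, hypothetical sketch of that pattern in isolation. It assumes sequences are plain `long`s and that entries are never removed, so `get()` never returns null inside the loop; `ProducerSequenceTracker` and `tryAdvance` are illustrative names, not Druid APIs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration of the lock-free per-producer sequence check; not a Druid class.
class ProducerSequenceTracker
{
  private final ConcurrentMap<String, Long> lastSequences = new ConcurrentHashMap<>();

  /**
   * Returns true if a request with this sequence may proceed, or false if it is a duplicate or arrived
   * out of order (its sequence is not greater than the last accepted one for the same producer).
   */
  boolean tryAdvance(String producerId, long newSequence)
  {
    // Atomically seed the entry; Long.MIN_VALUE means "nothing accepted yet".
    Long current = lastSequences.computeIfAbsent(producerId, key -> Long.MIN_VALUE);
    while (true) {
      if (newSequence <= current) {
        return false; // stale or duplicate request: skip it
      }
      // Compare-and-set: succeeds only if no other thread has changed the value since we read it.
      if (lastSequences.replace(producerId, current, newSequence)) {
        return true;
      }
      // Another thread won the race; re-read the value it installed and re-check.
      current = lastSequences.get(producerId);
    }
  }
}
```

Under contention, at most one of several concurrent requests carrying the same sequence number gets `true`. The losers re-read the installed value, find `newSequence <= current`, and are skipped, which corresponds to the `"skipped": true` response in the diff.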
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
index d37265b..705b90e 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
@@ -21,12 +21,14 @@ package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.utils.CloseableUtils;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -37,10 +39,14 @@ import java.util.concurrent.TimeUnit;
/**
* Creates firehoses that shut off at a particular time. Useful for limiting the lifespan of a realtime job.
+ *
+ * Each {@link Firehose} created by this factory spins up and manages one thread for calling {@link Firehose#close()}
+ * asynchronously at the specified {@link #shutoffTime}.
*/
public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private static final EmittingLogger log = new EmittingLogger(FirehoseFactory.class);
+
private final FirehoseFactory delegateFactory;
private final DateTime shutoffTime;
@@ -63,31 +69,25 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
class TimedShutoffFirehose implements Firehose
{
private final Firehose firehose;
- private final ScheduledExecutorService exec;
- private final Object shutdownLock = new Object();
- private volatile boolean shutdown = false;
+ private final ScheduledExecutorService shutdownExec;
+ @GuardedBy("this")
+ private boolean closed = false;
TimedShutoffFirehose(InputRowParser parser, File temporaryDirectory) throws IOException
{
firehose = delegateFactory.connect(parser, temporaryDirectory);
- exec = Execs.scheduledSingleThreaded("timed-shutoff-firehose-%d");
-
- exec.schedule(
- new Runnable()
- {
- @Override
- public void run()
- {
- log.info("Closing delegate firehose.");
-
- shutdown = true;
- try {
- firehose.close();
- }
- catch (IOException e) {
- log.warn(e, "Failed to close delegate firehose, ignoring.");
- }
+ shutdownExec = Execs.scheduledSingleThreaded("timed-shutoff-firehose-%d");
+
+ shutdownExec.schedule(
+ () -> {
+ log.info("Closing delegate firehose.");
+
+ try {
+ TimedShutoffFirehose.this.close();
+ }
+ catch (IOException e) {
+ log.warn(e, "Failed to close delegate firehose, ignoring.");
}
},
shutoffTime.getMillis() - System.currentTimeMillis(),
@@ -116,14 +116,16 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
return firehose.commit();
}
+ /**
+ * This method is synchronized because it might be called concurrently from multiple threads: from {@link
+ * #shutdownExec}, and explicitly on this Firehose object.
+ */
@Override
- public void close() throws IOException
+ public synchronized void close() throws IOException
{
- synchronized (shutdownLock) {
- if (!shutdown) {
- shutdown = true;
- firehose.close();
- }
+ if (!closed) {
+ closed = true;
+ CloseableUtils.closeBoth(firehose, shutdownExec::shutdownNow);
}
}
}
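`TimedShutoffFirehose.close()` can now be reached from two threads, the scheduled timer task and the owner of the Firehose, so it is made `synchronized` and idempotent, and it also stops the timer executor. Below is a rough standalone sketch of that shape. `TimedShutoffResource` is a hypothetical name, a plain `Closeable` stands in for the delegate `Firehose`, and `try`/`finally` approximates what `CloseableUtils.closeBoth()` is assumed to do here (close the second resource even if closing the first throws).

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TimedShutoffFirehose: wraps a delegate and closes it once a delay has elapsed.
class TimedShutoffResource implements Closeable
{
  private final Closeable delegate;
  private final ScheduledExecutorService shutdownExec = Executors.newSingleThreadScheduledExecutor(runnable -> {
    Thread thread = new Thread(runnable, "timed-shutoff");
    thread.setDaemon(true); // don't keep the JVM alive just for the timer
    return thread;
  });
  private boolean closed = false; // guarded by "this"

  TimedShutoffResource(Closeable delegate, long delayMillis)
  {
    this.delegate = delegate;
    // A negative delay (deadline already passed) is treated as zero by schedule().
    shutdownExec.schedule(
        () -> {
          try {
            close();
          }
          catch (IOException e) {
            System.err.println("Failed to close delegate, ignoring: " + e);
          }
        },
        delayMillis,
        TimeUnit.MILLISECONDS
    );
  }

  /** Synchronized and idempotent: the timer thread and the owner may both call it, in any order. */
  @Override
  public synchronized void close() throws IOException
  {
    if (closed) {
      return;
    }
    closed = true;
    try {
      delegate.close();
    }
    finally {
      // Release the timer thread even if the delegate fails to close.
      shutdownExec.shutdownNow();
    }
  }
}
```

When the timer task itself runs `close()`, `shutdownNow()` interrupts the timer thread; in this sketch that is harmless because the task returns right afterwards.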
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java
index 8d2232a..b625433 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java
@@ -87,8 +87,22 @@ public class EventReceiverFirehoseIdleTest
@Test(timeout = 40_000L)
public void testIdle() throws Exception
{
- Thread.sleep(8_000L);
- Assert.assertTrue(firehose.isClosed());
+ awaitFirehoseClosed();
+ awaitDelayedExecutorThreadTerminated();
+ }
+
+ private void awaitFirehoseClosed() throws InterruptedException
+ {
+ while (!firehose.isClosed()) {
+ Thread.sleep(50);
+ }
+ }
+
+ private void awaitDelayedExecutorThreadTerminated() throws InterruptedException
+ {
+ while (firehose.getDelayedCloseExecutor().getState() != Thread.State.TERMINATED) {
+ Thread.sleep(50);
+ }
}
@Test(timeout = 40_000L)
@@ -117,7 +131,7 @@ public class EventReceiverFirehoseIdleTest
Thread.sleep(3_000L);
}
- Thread.sleep(5_000L);
- Assert.assertTrue(firehose.isClosed());
+ awaitFirehoseClosed();
+ awaitDelayedExecutorThreadTerminated();
}
}
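The idle test now polls for the closed state and for the termination of the closer thread instead of sleeping for a fixed 8 seconds, relying on `@Test(timeout = 40_000L)` as the upper bound. A hypothetical general-purpose variant that carries its own deadline is sketched below; `Await.until` is an illustrative helper, not part of Druid or JUnit. It uses the same overflow-aware `deadline - System.nanoTime() <= 0` comparison as the firehose code.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical test helper; not part of Druid or JUnit.
final class Await
{
  private Await()
  {
  }

  /**
   * Polls {@code condition} every {@code pollMillis} ms until it holds. Returns true if it did, or false if
   * {@code timeout} elapsed first.
   */
  static boolean until(BooleanSupplier condition, long timeout, TimeUnit unit, long pollMillis)
      throws InterruptedException
  {
    final long deadlineNs = System.nanoTime() + unit.toNanos(timeout);
    while (!condition.getAsBoolean()) {
      // Compare the difference, not the raw values, so the check survives nanoTime() overflow.
      if (deadlineNs - System.nanoTime() <= 0) {
        return false;
      }
      Thread.sleep(pollMillis);
    }
    return true;
  }
}
```

In the test above this could be used as `Assert.assertTrue(Await.until(firehose::isClosed, 30, TimeUnit.SECONDS, 50))`, which fails with an assertion message rather than a JUnit timeout.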
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
index 45b45bbe..e9c7ee3 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
@@ -57,7 +57,7 @@ public class EventReceiverFirehoseTest
{
private static final int CAPACITY = 300;
private static final int NUM_EVENTS = 100;
- private static final long MAX_IDLE_TIME = Long.MAX_VALUE;
+ private static final long MAX_IDLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(20);
private static final String SERVICE_NAME = "test_firehose";
private final String inputRow = "[{\n"
@@ -77,7 +77,7 @@ public class EventReceiverFirehoseTest
eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
SERVICE_NAME,
CAPACITY,
- MAX_IDLE_TIME,
+ MAX_IDLE_TIME_MILLIS,
null,
new DefaultObjectMapper(),
new DefaultObjectMapper(),
@@ -100,8 +100,8 @@ public class EventReceiverFirehoseTest
);
}
- @Test
- public void testSingleThread() throws IOException
+ @Test(timeout = 60_000L)
+ public void testSingleThread() throws IOException, InterruptedException
{
for (int i = 0; i < NUM_EVENTS; ++i) {
setUpRequestExpectations(null, null);
@@ -138,9 +138,10 @@ public class EventReceiverFirehoseTest
Assert.assertFalse(firehose.hasMore());
Assert.assertEquals(0, Iterables.size(register.getMetrics()));
+ awaitDelayedExecutorThreadTerminated();
}
- @Test
+ @Test(timeout = 60_000L)
public void testMultipleThreads() throws InterruptedException, IOException, TimeoutException, ExecutionException
{
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
@@ -210,6 +211,8 @@ public class EventReceiverFirehoseTest
Assert.assertFalse(firehose.hasMore());
Assert.assertEquals(0, Iterables.size(register.getMetrics()));
+ awaitDelayedExecutorThreadTerminated();
+
executorService.shutdownNow();
}
@@ -219,7 +222,7 @@ public class EventReceiverFirehoseTest
EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new EventReceiverFirehoseFactory(
SERVICE_NAME,
CAPACITY,
- MAX_IDLE_TIME,
+ MAX_IDLE_TIME_MILLIS,
null,
new DefaultObjectMapper(),
new DefaultObjectMapper(),
@@ -259,11 +262,24 @@ public class EventReceiverFirehoseTest
EasyMock.replay(req);
firehose.shutdown(DateTimes.nowUtc().minusMinutes(2).toString(), req);
+ awaitFirehoseClosed();
+ awaitDelayedExecutorThreadTerminated();
+ }
+
+ private void awaitFirehoseClosed() throws InterruptedException
+ {
while (!firehose.isClosed()) {
Thread.sleep(50);
}
}
+ private void awaitDelayedExecutorThreadTerminated() throws InterruptedException
+ {
+ while (firehose.getDelayedCloseExecutor().getState() != Thread.State.TERMINATED) {
+ Thread.sleep(50);
+ }
+ }
+
@Test(timeout = 60_000L)
public void testShutdown() throws Exception
{
@@ -279,9 +295,8 @@ public class EventReceiverFirehoseTest
EasyMock.replay(req);
firehose.shutdown(DateTimes.nowUtc().plusMillis(100).toString(), req);
- while (!firehose.isClosed()) {
- Thread.sleep(50);
- }
+ awaitFirehoseClosed();
+ awaitDelayedExecutorThreadTerminated();
}
@Test
@@ -322,7 +337,6 @@ public class EventReceiverFirehoseTest
firehose.close();
Assert.assertFalse(firehose.hasMore());
Assert.assertEquals(0, Iterables.size(register.getMetrics()));
-
}
@Test
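The last test closes the firehose and then expects `hasMore()` to return `false`. With the `readLock` and the 500 ms polling removed, this works because `close()` enqueues a `FIREHOSE_CLOSED` poison pill that the single reading thread eventually `take()`s. The sketch below isolates that pattern under simplifying assumptions: rows are `String`s instead of `InputRow`, `PoisonPillQueue` and its methods are hypothetical names, and a hand-written retry loop stands in for Guava's `Uninterruptibles.putUninterruptibly`.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, simplified model of EventReceiverFirehose's buffer: many producer threads, one consumer thread.
class PoisonPillQueue
{
  // Unique sentinel ("poison pill"), compared by identity so it can't collide with a real row.
  private static final Object CLOSED = new Object();

  private final BlockingQueue<Object> buffer;
  private volatile boolean closed = false;
  // Touched only by the single consumer thread, so no locking is needed.
  private String nextRow = null;
  private boolean rowsRunOut = false;

  PoisonPillQueue(int capacity)
  {
    buffer = new ArrayBlockingQueue<>(capacity);
  }

  /** Producer side: non-blocking for brevity; returns false if closed or the buffer is full. */
  boolean offer(String row)
  {
    return !closed && buffer.offer(row);
  }

  /** Consumer side: blocks until a row or the poison pill arrives. */
  boolean hasMore()
  {
    if (rowsRunOut) {
      return false;
    }
    if (nextRow != null) {
      return true;
    }
    Object next;
    try {
      next = buffer.take();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    if (next == CLOSED) {
      rowsRunOut = true; // later calls return immediately instead of blocking forever
      return false;
    }
    nextRow = (String) next;
    return true;
  }

  String next()
  {
    String row = nextRow;
    if (row == null) {
      throw new NoSuchElementException();
    }
    nextRow = null;
    return row;
  }

  /** May be called from any thread; enqueues the poison pill exactly once. */
  synchronized void close()
  {
    if (closed) {
      return;
    }
    closed = true;
    // Keep retrying if interrupted: losing the pill would leave the consumer blocked in take() forever.
    boolean interrupted = false;
    while (true) {
      try {
        buffer.put(CLOSED);
        break;
      }
      catch (InterruptedException e) {
        interrupted = true;
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt(); // restore the interrupt status for the caller
    }
  }
}
```

As in the diff, the pill is queued behind rows already in the buffer, so the consumer drains them before `hasMore()` turns `false`; and if the bounded buffer is full while nobody is reading, `close()` blocks until space frees up.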