You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2018/08/21 22:29:44 UTC
[incubator-druid] branch master updated: Make time-related
variables more readable (#6158)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3647d4c Make time-related variables more readable (#6158)
3647d4c is described below
commit 3647d4c94ad52ba16ae9ab9ee583dad355f21fd2
Author: Benedict Jin <15...@qq.com>
AuthorDate: Wed Aug 22 06:29:40 2018 +0800
Make time-related variables more readable (#6158)
* Make time-related variables more readable
* Patch some improvements from the code reviewer
* Remove unnecessary boxing of Long type variables
---
.../java/io/druid/common/config/Log4jShutdown.java | 5 +++--
.../ambari/metrics/AmbariMetricsEmitterConfig.java | 8 +++----
.../io/druid/emitter/graphite/GraphiteEmitter.java | 4 ++--
.../emitter/graphite/GraphiteEmitterConfig.java | 10 ++++-----
.../KafkaEightSimpleConsumerFirehoseFactory.java | 5 +++--
.../druid/firehose/kafka/KafkaSimpleConsumer.java | 13 ++++++-----
.../datasketches/theta/SketchHolder.java | 2 +-
.../query/lookup/TestKafkaExtractionCluster.java | 3 ++-
.../IncrementalPublishingKafkaIndexTaskRunner.java | 2 +-
.../io/druid/indexing/kafka/KafkaIndexTask.java | 3 ++-
.../indexing/kafka/LegacyKafkaIndexTaskRunner.java | 2 +-
.../indexing/common/task/HadoopIndexTask.java | 2 +-
.../indexing/common/task/RealtimeIndexTask.java | 2 +-
.../java/io/druid/indexing/common/task/Tasks.java | 3 ++-
.../java/io/druid/indexing/overlord/TaskQueue.java | 5 ++++-
.../http/client/pool/ChannelResourceFactory.java | 5 +++--
.../main/java/io/druid/query/QueryContexts.java | 4 +++-
.../java/io/druid/query/AsyncQueryRunnerTest.java | 6 ++---
.../io/druid/guice/http/JettyHttpClientModule.java | 5 +++--
.../druid/segment/loading/SegmentLoaderConfig.java | 3 ++-
.../druid/server/initialization/ServerConfig.java | 3 ++-
.../StreamAppenderatorDriverFailTest.java | 10 ++++-----
.../appenderator/StreamAppenderatorDriverTest.java | 26 +++++++++++-----------
.../firehose/IngestSegmentFirehoseTest.java | 4 ++--
24 files changed, 75 insertions(+), 60 deletions(-)
diff --git a/common/src/main/java/io/druid/common/config/Log4jShutdown.java b/common/src/main/java/io/druid/common/config/Log4jShutdown.java
index d910b18..7b81ada 100644
--- a/common/src/main/java/io/druid/common/config/Log4jShutdown.java
+++ b/common/src/main/java/io/druid/common/config/Log4jShutdown.java
@@ -28,11 +28,12 @@ import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
import javax.annotation.concurrent.GuardedBy;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class Log4jShutdown implements ShutdownCallbackRegistry, LifeCycle
{
- private static final long SHUTDOWN_WAIT_TIMEOUT = 60000;
+ private static final long SHUTDOWN_WAIT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
private final SynchronizedStateHolder state = new SynchronizedStateHolder(State.INITIALIZED);
private final Queue<Runnable> shutdownCallbacks = new ConcurrentLinkedQueue<>();
@@ -100,7 +101,7 @@ public class Log4jShutdown implements ShutdownCallbackRegistry, LifeCycle
public void stop()
{
if (!state.compareAndSet(State.STARTED, State.STOPPING)) {
- State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT);
+ State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT_MILLIS);
if (current != State.STOPPED) {
throw new ISE("Expected state [%s] found [%s]", State.STARTED, current);
}
diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java b/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
index a7df750..fdd3595 100644
--- a/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
+++ b/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
@@ -25,13 +25,13 @@ import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
-
+import java.util.concurrent.TimeUnit;
public class AmbariMetricsEmitterConfig
{
private static final int DEFAULT_BATCH_SIZE = 100;
- private static final Long DEFAULT_FLUSH_PERIOD_MILLIS = (long) (60 * 1000); // flush every one minute
- private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
+ private static final long DEFAULT_FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1); // flush every one minute
+ private static final long DEFAULT_GET_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1 sec
private static final String DEFAULT_PROTOCOL = "http";
@JsonProperty
@@ -106,7 +106,7 @@ public class AmbariMetricsEmitterConfig
);
this.alertEmitters = alertEmitters == null ? Collections.emptyList() : alertEmitters;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
- this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
+ this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
}
@JsonProperty
diff --git a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
index 531e90e..e29b0cb 100644
--- a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
+++ b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
@@ -56,7 +56,7 @@ public class GraphiteEmitter implements Emitter
private final List<Emitter> requestLogEmitters;
private final AtomicBoolean started = new AtomicBoolean(false);
private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
- private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
+ private static final long FLUSH_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1); // default flush wait 1 min
private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("GraphiteEmitter-%s")
@@ -222,7 +222,7 @@ public class GraphiteEmitter implements Emitter
if (started.get()) {
Future future = exec.schedule(new ConsumerRunnable(), 0, TimeUnit.MILLISECONDS);
try {
- future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
+ future.get(FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
if (e instanceof InterruptedException) {
diff --git a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
index 8013027..ee426a0 100644
--- a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
+++ b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
@@ -25,15 +25,15 @@ import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
-
+import java.util.concurrent.TimeUnit;
public class GraphiteEmitterConfig
{
public static final String PLAINTEXT_PROTOCOL = "plaintext";
public static final String PICKLE_PROTOCOL = "pickle";
private static final int DEFAULT_BATCH_SIZE = 100;
- private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute
- private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
+ private static final long DEFAULT_FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1); // flush every one minute
+ private static final long DEFAULT_GET_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1 sec
@JsonProperty
private final String hostname;
@@ -142,7 +142,7 @@ public class GraphiteEmitterConfig
@JsonProperty("waitForEventTime") Long waitForEventTime
)
{
- this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
+ this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
this.alertEmitters = alertEmitters == null ? Collections.emptyList() : alertEmitters;
this.requestLogEmitters = requestLogEmitters == null ? Collections.emptyList() : requestLogEmitters;
@@ -150,7 +150,7 @@ public class GraphiteEmitterConfig
druidToGraphiteEventConverter,
"Event converter can not ne null dude"
);
- this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
+ this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD_MILLIS : flushPeriod;
this.maxQueueSize = maxQueueSize == null ? Integer.MAX_VALUE : maxQueueSize;
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
this.port = Preconditions.checkNotNull(port, "port can not be null");
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
index 8461c9b..4109a43 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
+++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
@@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class KafkaEightSimpleConsumerFirehoseFactory implements
@@ -72,7 +73,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements
private final List<PartitionConsumerWorker> consumerWorkers = new CopyOnWriteArrayList<>();
private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000;
- private static final int CONSUMER_FETCH_TIMEOUT = 10000;
+ private static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(10);
@JsonCreator
public KafkaEightSimpleConsumerFirehoseFactory(
@@ -307,7 +308,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements
try {
while (!stopped.get()) {
try {
- Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT);
+ Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT_MILLIS);
int count = 0;
for (BytesMessageWithOffset msgWithOffset : msgs) {
offset = msgWithOffset.offset();
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
index aabef05..21a3664 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
+++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
@@ -49,6 +49,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
* refer @{link https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example}
@@ -74,10 +75,10 @@ public class KafkaSimpleConsumer
private List<HostAndPort> replicaBrokers;
private SimpleConsumer consumer = null;
- private static final int SO_TIMEOUT = 30000;
+ private static final int SO_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30);
private static final int BUFFER_SIZE = 65536;
- private static final long RETRY_INTERVAL = 1000L;
- private static final int FETCH_SIZE = 100000000;
+ private static final long RETRY_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(1); // 1 second between retries (same value as the old 1000L)
+ private static final int FETCH_SIZE = 100_000_000;
public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List<String> brokers, boolean earliest)
{
@@ -121,7 +122,7 @@ public class KafkaSimpleConsumer
);
consumer = new SimpleConsumer(
- leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT, BUFFER_SIZE, clientId
+ leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, clientId
);
}
}
@@ -306,7 +307,7 @@ public class KafkaSimpleConsumer
SimpleConsumer consumer = null;
try {
log.info("Finding new leader from Kafka brokers, try broker [%s]", broker.toString());
- consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT, BUFFER_SIZE, leaderLookupClientId);
+ consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, leaderLookupClientId);
TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic)));
List<TopicMetadata> metaData = resp.topicsMetadata();
@@ -361,7 +362,7 @@ public class KafkaSimpleConsumer
}
}
- Thread.sleep(RETRY_INTERVAL);
+ Thread.sleep(RETRY_INTERVAL_MILLIS);
retryCnt++;
// if could not find the leader for current replicaBrokers, let's try to
// find one via allBrokers
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
index a2f14aa..e72cbfe 100644
--- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
@@ -243,7 +243,7 @@ public class SketchHolder
{
UNION,
INTERSECT,
- NOT;
+ NOT
}
public static SketchHolder sketchSetOperation(Func func, int sketchSize, Object... holders)
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
index a000679..4fcf917 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -61,6 +61,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
/**
*
@@ -149,7 +150,7 @@ public class TestKafkaExtractionCluster
@Override
public long nanoseconds()
{
- return milliseconds() * 1_000_000;
+ return TimeUnit.MILLISECONDS.toNanos(milliseconds());
}
@Override
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 6d42416..434e16e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -448,7 +448,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
- records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
+ records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 8cb6323..0e7ebcb 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -65,6 +65,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class KafkaIndexTask extends AbstractTask implements ChatHandler
@@ -83,7 +84,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private static final EmittingLogger log = new EmittingLogger(KafkaIndexTask.class);
private static final String TYPE = "index_kafka";
private static final Random RANDOM = new Random();
- static final long POLL_TIMEOUT = 100;
+ static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);
static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
private final DataSchema dataSchema;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index a9dff63..a8016a1 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -371,7 +371,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
- records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
+ records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 4a72114..3982806 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -327,7 +327,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
)
);
- final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
+ final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index a496182..c374604 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -220,7 +220,7 @@ public class RealtimeIndexTask extends AbstractTask
// which will typically be either the main data processing loop or the persist thread.
// Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments
- final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
+ final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, http timeout error can occur while waiting for a
// lock to be acquired.
final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
index e15d6e5..c9bf0aa 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
public class Tasks
{
@@ -42,7 +43,7 @@ public class Tasks
public static final int DEFAULT_BATCH_INDEX_TASK_PRIORITY = 50;
public static final int DEFAULT_MERGE_TASK_PRIORITY = 25;
public static final int DEFAULT_TASK_PRIORITY = 0;
- public static final long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min
+ public static final long DEFAULT_LOCK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
index 3946248..a14b7ad 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
@@ -54,6 +54,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -68,6 +69,8 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class TaskQueue
{
+ private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60); // 60 seconds, shared by all instances
+
private final List<Task> tasks = Lists.newArrayList();
private final Map<String, ListenableFuture<TaskStatus>> taskFutures = Maps.newHashMap();
@@ -290,7 +293,7 @@ public class TaskQueue
}
// awaitNanos because management may become necessary without this condition signalling,
// due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
- managementMayBeNecessary.awaitNanos(60000000000L /* 60 seconds */);
+ managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
}
finally {
giant.unlock();
diff --git a/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java b/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
index 04124b2..be04e72 100644
--- a/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
+++ b/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
@@ -41,6 +41,7 @@ import javax.net.ssl.SSLParameters;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.concurrent.TimeUnit;
/**
*/
@@ -48,7 +49,7 @@ public class ChannelResourceFactory implements ResourceFactory<String, ChannelFu
{
private static final Logger log = new Logger(ChannelResourceFactory.class);
- private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT = 10000L; /* 10 seconds */
+ private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
private final ClientBootstrap bootstrap;
private final SSLContext sslContext;
@@ -65,7 +66,7 @@ public class ChannelResourceFactory implements ResourceFactory<String, ChannelFu
this.bootstrap = Preconditions.checkNotNull(bootstrap, "bootstrap");
this.sslContext = sslContext;
this.timer = timer;
- this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT;
+ this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS;
if (sslContext != null) {
Preconditions.checkNotNull(timer, "timer is required when sslContext is present");
diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java
index 4252ad4..fe5c762 100644
--- a/processing/src/main/java/io/druid/query/QueryContexts.java
+++ b/processing/src/main/java/io/druid/query/QueryContexts.java
@@ -25,6 +25,8 @@ import io.druid.guice.annotations.PublicApi;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Numbers;
+import java.util.concurrent.TimeUnit;
+
@PublicApi
public class QueryContexts
{
@@ -41,7 +43,7 @@ public class QueryContexts
public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
- public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes
+ public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
public static final long NO_TIMEOUT = 0;
public static <T> boolean isBySegment(Query<T> query)
diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
index b9f18a6..0ea8fc8 100644
--- a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
@@ -38,7 +38,7 @@ import java.util.concurrent.TimeoutException;
public class AsyncQueryRunnerTest
{
- private static final long TEST_TIMEOUT = 60000;
+ private static final long TEST_TIMEOUT_MILLIS = 60_000;
private final ExecutorService executor;
private final Query query;
@@ -53,7 +53,7 @@ public class AsyncQueryRunnerTest
.build();
}
- @Test(timeout = TEST_TIMEOUT)
+ @Test(timeout = TEST_TIMEOUT_MILLIS)
public void testAsyncNature()
{
final CountDownLatch latch = new CountDownLatch(1);
@@ -83,7 +83,7 @@ public class AsyncQueryRunnerTest
Assert.assertEquals(Collections.singletonList(1), lazy.toList());
}
- @Test(timeout = TEST_TIMEOUT)
+ @Test(timeout = TEST_TIMEOUT_MILLIS)
public void testQueryTimeoutHonored()
{
QueryRunner baseRunner = new QueryRunner()
diff --git a/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java b/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
index 8309848..80f167e 100644
--- a/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
+++ b/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
@@ -31,12 +31,13 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import java.lang.annotation.Annotation;
+import java.util.concurrent.TimeUnit;
/**
*/
public class JettyHttpClientModule implements Module
{
- private static final long CLIENT_CONNECT_TIMEOUT = 500;
+ private static final long CLIENT_CONNECT_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(500);
public static JettyHttpClientModule global()
{
@@ -120,7 +121,7 @@ public class JettyHttpClientModule implements Module
httpClient.setIdleTimeout(config.getReadTimeout().getMillis());
httpClient.setMaxConnectionsPerDestination(config.getNumConnections());
httpClient.setMaxRequestsQueuedPerDestination(config.getNumRequestsQueued());
- httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT);
+ httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT_MILLIS);
httpClient.setRequestBufferSize(config.getRequestBuffersize());
final QueuedThreadPool pool = new QueuedThreadPool(config.getNumMaxThreads());
pool.setName(JettyHttpClientModule.class.getSimpleName() + "-threadPool-" + pool.hashCode());
diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
index 4b2eddc..306d6c8 100644
--- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
@@ -26,6 +26,7 @@ import org.hibernate.validator.constraints.NotEmpty;
import java.io.File;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
*/
@@ -39,7 +40,7 @@ public class SegmentLoaderConfig
private boolean deleteOnRemove = true;
@JsonProperty("dropSegmentDelayMillis")
- private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds
+ private int dropSegmentDelayMillis = (int) TimeUnit.SECONDS.toMillis(30);
@JsonProperty("announceIntervalMillis")
private int announceIntervalMillis = 0; // do not background announce
diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java
index 3827df9..0ef91cc 100644
--- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java
+++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java
@@ -26,6 +26,7 @@ import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
/**
@@ -52,7 +53,7 @@ public class ServerConfig
@JsonProperty
@Min(0)
- private long defaultQueryTimeout = 300_000; // 5 minutes
+ private long defaultQueryTimeout = TimeUnit.MINUTES.toMillis(5);
@JsonProperty
@Min(1)
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 1c7abb1..6eb29d8 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -73,7 +73,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
{
private static final String DATA_SOURCE = "foo";
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
- private static final long PUBLISH_TIMEOUT = 5000;
+ private static final long PUBLISH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5);
private static final List<InputRow> ROWS = ImmutableList.of(
new MapBasedInputRow(
@@ -153,7 +153,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
StreamAppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
@Test
@@ -191,7 +191,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
StreamAppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
@Test
@@ -229,7 +229,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
StreamAppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
driver.registerHandoff(published).get();
}
@@ -314,7 +314,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
StreamAppenderatorDriverTest.makeFailingPublisher(failWithException),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
catch (Exception e) {
throw e;
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index b9c5e22..32f04a1 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -72,8 +72,8 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
private static final int MAX_ROWS_IN_MEMORY = 100;
private static final int MAX_ROWS_PER_SEGMENT = 3;
- private static final long PUBLISH_TIMEOUT = 10000;
- private static final long HANDOFF_CONDITION_TIMEOUT = 1000;
+ private static final long PUBLISH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
+ private static final long HANDOFF_CONDITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1);
private static final List<InputRow> ROWS = Arrays.asList(
new MapBasedInputRow(
@@ -144,14 +144,14 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
while (driver.getSegments().containsKey("dummy")) {
Thread.sleep(100);
}
final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
- .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+ .get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(
@@ -194,14 +194,14 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
while (driver.getSegments().containsKey("dummy")) {
Thread.sleep(100);
}
final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
- .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+ .get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size());
Assert.assertEquals(numSegments * MAX_ROWS_PER_SEGMENT, segmentsAndMetadata.getCommitMetadata());
}
@@ -223,13 +223,13 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
while (driver.getSegments().containsKey("dummy")) {
Thread.sleep(100);
}
- driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+ driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
@Test
@@ -248,7 +248,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(
@@ -269,7 +269,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(
@@ -290,7 +290,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
- ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(),
@@ -328,11 +328,11 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
);
final SegmentsAndMetadata handedoffFromSequence0 = futureForSequence0.get(
- HANDOFF_CONDITION_TIMEOUT,
+ HANDOFF_CONDITION_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS
);
final SegmentsAndMetadata handedoffFromSequence1 = futureForSequence1.get(
- HANDOFF_CONDITION_TIMEOUT,
+ HANDOFF_CONDITION_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS
);
diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
index 1556b2f..bbf92b1 100644
--- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
@@ -134,7 +134,7 @@ public class IngestSegmentFirehoseTest
.build()
)
.setMaxRowCount(5000)
- .buildOnheap();
+ .buildOnheap()
) {
final StorageAdapter sa = new QueryableIndexStorageAdapter(qi);
final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval());
@@ -224,7 +224,7 @@ public class IngestSegmentFirehoseTest
.build()
)
.setMaxRowCount(5000)
- .buildOnheap();
+ .buildOnheap()
) {
for (String line : rows) {
index.add(parser.parse(line));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org