Posted to commits@druid.apache.org by jo...@apache.org on 2018/08/21 22:29:44 UTC

[incubator-druid] branch master updated: Make time-related variables more readable (#6158)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3647d4c  Make time-related variables more readable (#6158)
3647d4c is described below

commit 3647d4c94ad52ba16ae9ab9ee583dad355f21fd2
Author: Benedict Jin <15...@qq.com>
AuthorDate: Wed Aug 22 06:29:40 2018 +0800

    Make time-related variables more readable (#6158)
    
    * Make time-related variables more readable
    
    * Apply improvements suggested by the code reviewer
    
    * Remove unnecessary boxing of Long type variables
---
 .../java/io/druid/common/config/Log4jShutdown.java |  5 +++--
 .../ambari/metrics/AmbariMetricsEmitterConfig.java |  8 +++----
 .../io/druid/emitter/graphite/GraphiteEmitter.java |  4 ++--
 .../emitter/graphite/GraphiteEmitterConfig.java    | 10 ++++-----
 .../KafkaEightSimpleConsumerFirehoseFactory.java   |  5 +++--
 .../druid/firehose/kafka/KafkaSimpleConsumer.java  | 13 ++++++-----
 .../datasketches/theta/SketchHolder.java           |  2 +-
 .../query/lookup/TestKafkaExtractionCluster.java   |  3 ++-
 .../IncrementalPublishingKafkaIndexTaskRunner.java |  2 +-
 .../io/druid/indexing/kafka/KafkaIndexTask.java    |  3 ++-
 .../indexing/kafka/LegacyKafkaIndexTaskRunner.java |  2 +-
 .../indexing/common/task/HadoopIndexTask.java      |  2 +-
 .../indexing/common/task/RealtimeIndexTask.java    |  2 +-
 .../java/io/druid/indexing/common/task/Tasks.java  |  3 ++-
 .../java/io/druid/indexing/overlord/TaskQueue.java |  5 ++++-
 .../http/client/pool/ChannelResourceFactory.java   |  5 +++--
 .../main/java/io/druid/query/QueryContexts.java    |  4 +++-
 .../java/io/druid/query/AsyncQueryRunnerTest.java  |  6 ++---
 .../io/druid/guice/http/JettyHttpClientModule.java |  5 +++--
 .../druid/segment/loading/SegmentLoaderConfig.java |  3 ++-
 .../druid/server/initialization/ServerConfig.java  |  3 ++-
 .../StreamAppenderatorDriverFailTest.java          | 10 ++++-----
 .../appenderator/StreamAppenderatorDriverTest.java | 26 +++++++++++-----------
 .../firehose/IngestSegmentFirehoseTest.java        |  4 ++--
 24 files changed, 75 insertions(+), 60 deletions(-)
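
The whole patch applies one pattern: magic-number timeout constants become TimeUnit conversions, and the unit moves into the constant's name. A minimal before/after sketch (class and field names here are illustrative, not taken from the commit):

    import java.util.concurrent.TimeUnit;

    class TimeoutStyleSketch
    {
      // Before: the unit is only implied by the magic number, and assigning
      // (long) (60 * 1000) to a boxed Long adds a needless cast and box.
      // private static final Long FLUSH_PERIOD = (long) (60 * 1000);

      // After: a primitive long, the unit in the name, and an expression
      // that reads as "one minute, in milliseconds".
      private static final long FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1);
    }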

diff --git a/common/src/main/java/io/druid/common/config/Log4jShutdown.java b/common/src/main/java/io/druid/common/config/Log4jShutdown.java
index d910b18..7b81ada 100644
--- a/common/src/main/java/io/druid/common/config/Log4jShutdown.java
+++ b/common/src/main/java/io/druid/common/config/Log4jShutdown.java
@@ -28,11 +28,12 @@ import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
 import javax.annotation.concurrent.GuardedBy;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class Log4jShutdown implements ShutdownCallbackRegistry, LifeCycle
 {
-  private static final long SHUTDOWN_WAIT_TIMEOUT = 60000;
+  private static final long SHUTDOWN_WAIT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
 
   private final SynchronizedStateHolder state = new SynchronizedStateHolder(State.INITIALIZED);
   private final Queue<Runnable> shutdownCallbacks = new ConcurrentLinkedQueue<>();
@@ -100,7 +101,7 @@ public class Log4jShutdown implements ShutdownCallbackRegistry, LifeCycle
   public void stop()
   {
     if (!state.compareAndSet(State.STARTED, State.STOPPING)) {
-      State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT);
+      State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT_MILLIS);
       if (current != State.STOPPED) {
         throw new ISE("Expected state [%s] found [%s]", State.STARTED, current);
       }
diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java b/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
index a7df750..fdd3595 100644
--- a/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
+++ b/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
@@ -25,13 +25,13 @@ import com.google.common.base.Preconditions;
 
 import java.util.Collections;
 import java.util.List;
-
+import java.util.concurrent.TimeUnit;
 
 public class AmbariMetricsEmitterConfig
 {
   private static final int DEFAULT_BATCH_SIZE = 100;
-  private static final Long DEFAULT_FLUSH_PERIOD_MILLIS = (long) (60 * 1000); // flush every one minute
-  private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
+  private static final long DEFAULT_FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1); // flush every one minute
+  private static final long DEFAULT_GET_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1 sec
   private static final String DEFAULT_PROTOCOL = "http";
 
   @JsonProperty
@@ -106,7 +106,7 @@ public class AmbariMetricsEmitterConfig
     );
     this.alertEmitters = alertEmitters == null ? Collections.emptyList() : alertEmitters;
     this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
-    this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
+    this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
   }
 
   @JsonProperty
diff --git a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
index 531e90e..e29b0cb 100644
--- a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
+++ b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
@@ -56,7 +56,7 @@ public class GraphiteEmitter implements Emitter
   private final List<Emitter> requestLogEmitters;
   private final AtomicBoolean started = new AtomicBoolean(false);
   private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
-  private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
+  private static final long FLUSH_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1); // default flush wait 1 min
   private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
       .setDaemon(true)
       .setNameFormat("GraphiteEmitter-%s")
@@ -222,7 +222,7 @@ public class GraphiteEmitter implements Emitter
     if (started.get()) {
       Future future = exec.schedule(new ConsumerRunnable(), 0, TimeUnit.MILLISECONDS);
       try {
-        future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
+        future.get(FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
       }
       catch (InterruptedException | ExecutionException | TimeoutException e) {
         if (e instanceof InterruptedException) {
diff --git a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
index 8013027..ee426a0 100644
--- a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
+++ b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
@@ -25,15 +25,15 @@ import com.google.common.base.Preconditions;
 
 import java.util.Collections;
 import java.util.List;
-
+import java.util.concurrent.TimeUnit;
 
 public class GraphiteEmitterConfig
 {
   public static final String PLAINTEXT_PROTOCOL = "plaintext";
   public static final String PICKLE_PROTOCOL = "pickle";
   private static final int DEFAULT_BATCH_SIZE = 100;
-  private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute
-  private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
+  private static final long DEFAULT_FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1); // flush every one minute
+  private static final long DEFAULT_GET_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1 sec
 
   @JsonProperty
   private final String hostname;
@@ -142,7 +142,7 @@ public class GraphiteEmitterConfig
       @JsonProperty("waitForEventTime") Long waitForEventTime
   )
   {
-    this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
+    this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
     this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
     this.alertEmitters = alertEmitters == null ? Collections.emptyList() : alertEmitters;
     this.requestLogEmitters = requestLogEmitters == null ? Collections.emptyList() : requestLogEmitters;
@@ -150,7 +150,7 @@ public class GraphiteEmitterConfig
         druidToGraphiteEventConverter,
         "Event converter can not ne null dude"
     );
-    this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
+    this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD_MILLIS : flushPeriod;
     this.maxQueueSize = maxQueueSize == null ? Integer.MAX_VALUE : maxQueueSize;
     this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
     this.port = Preconditions.checkNotNull(port, "port can not be null");
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
index 8461c9b..4109a43 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
+++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class KafkaEightSimpleConsumerFirehoseFactory implements
@@ -72,7 +73,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements
 
   private final List<PartitionConsumerWorker> consumerWorkers = new CopyOnWriteArrayList<>();
   private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000;
-  private static final int CONSUMER_FETCH_TIMEOUT = 10000;
+  private static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(10);
 
   @JsonCreator
   public KafkaEightSimpleConsumerFirehoseFactory(
@@ -307,7 +308,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements
           try {
             while (!stopped.get()) {
               try {
-                Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT);
+                Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT_MILLIS);
                 int count = 0;
                 for (BytesMessageWithOffset msgWithOffset : msgs) {
                   offset = msgWithOffset.offset();
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
index aabef05..21a3664 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
+++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
@@ -49,6 +49,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * refer @{link https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example}
@@ -74,10 +75,10 @@ public class KafkaSimpleConsumer
   private List<HostAndPort> replicaBrokers;
   private SimpleConsumer consumer = null;
 
-  private static final int SO_TIMEOUT = 30000;
+  private static final int SO_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30);
   private static final int BUFFER_SIZE = 65536;
-  private static final long RETRY_INTERVAL = 1000L;
-  private static final int FETCH_SIZE = 100000000;
+  private static final long RETRY_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(1);
+  private static final int FETCH_SIZE = 100_000_000;
 
   public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List<String> brokers, boolean earliest)
   {
@@ -121,7 +122,7 @@ public class KafkaSimpleConsumer
       );
 
       consumer = new SimpleConsumer(
-          leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT, BUFFER_SIZE, clientId
+          leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, clientId
       );
     }
   }
@@ -306,7 +307,7 @@ public class KafkaSimpleConsumer
       SimpleConsumer consumer = null;
       try {
         log.info("Finding new leader from Kafka brokers, try broker [%s]", broker.toString());
-        consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT, BUFFER_SIZE, leaderLookupClientId);
+        consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, leaderLookupClientId);
         TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic)));
 
         List<TopicMetadata> metaData = resp.topicsMetadata();
@@ -361,7 +362,7 @@ public class KafkaSimpleConsumer
         }
       }
 
-      Thread.sleep(RETRY_INTERVAL);
+      Thread.sleep(RETRY_INTERVAL_MILLIS);
       retryCnt++;
       // if could not find the leader for current replicaBrokers, let's try to
       // find one via allBrokers
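
Note that the KafkaSimpleConsumer hunk above is not value-preserving: the old RETRY_INTERVAL was 1000L (one second), while TimeUnit.MINUTES.toMillis(1) evaluates to 60000. When doing this kind of rename, a quick equivalence check catches the slip; a hypothetical sketch, not part of this commit:

    import java.util.concurrent.TimeUnit;

    class RetryIntervalCheck
    {
      public static void main(String[] args)
      {
        final long oldValue = 1000L;                          // original constant
        final long newValue = TimeUnit.MINUTES.toMillis(1);   // 60_000, not 1_000
        final long preserving = TimeUnit.SECONDS.toMillis(1); // value-preserving rewrite
        if (preserving != oldValue) {
          throw new AssertionError("rewrite changed the constant's value");
        }
        System.out.println("old=" + oldValue + "ms, new=" + newValue + "ms");
      }
    }
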
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
index a2f14aa..e72cbfe 100644
--- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
@@ -243,7 +243,7 @@ public class SketchHolder
   {
     UNION,
     INTERSECT,
-    NOT;
+    NOT
   }
 
   public static SketchHolder sketchSetOperation(Func func, int sketchSize, Object... holders)
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
index a000679..4fcf917 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -61,6 +61,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -149,7 +150,7 @@ public class TestKafkaExtractionCluster
           @Override
           public long nanoseconds()
           {
-            return milliseconds() * 1_000_000;
+            return TimeUnit.MILLISECONDS.toNanos(milliseconds());
           }
 
           @Override
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 6d42416..434e16e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -448,7 +448,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
           // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
           ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
           try {
-            records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
+            records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
           }
           catch (OffsetOutOfRangeException e) {
             log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 8cb6323..0e7ebcb 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -65,6 +65,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class KafkaIndexTask extends AbstractTask implements ChatHandler
@@ -83,7 +84,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   private static final EmittingLogger log = new EmittingLogger(KafkaIndexTask.class);
   private static final String TYPE = "index_kafka";
   private static final Random RANDOM = new Random();
-  static final long POLL_TIMEOUT = 100;
+  static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);
   static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
 
   private final DataSchema dataSchema;
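
In the KafkaIndexTask hunk above, TimeUnit.MILLISECONDS.toMillis(100) is an identity conversion: it still evaluates to 100 and exists purely to spell out the unit at the definition site. An equivalent alternative (illustrative only) lets the _MILLIS suffix carry the unit alone:

    import java.util.concurrent.TimeUnit;

    class PollTimeoutSketch
    {
      // Identity conversion; the expression documents the unit.
      static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);

      // Same value; only the name documents the unit.
      static final long POLL_TIMEOUT_MILLIS_ALT = 100L;
    }
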
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index a9dff63..a8016a1 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -371,7 +371,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
           // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
           ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
           try {
-            records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
+            records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
           }
           catch (OffsetOutOfRangeException e) {
             log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 4a72114..3982806 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -327,7 +327,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
               indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
           )
       );
-      final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
+      final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
       // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
       final TaskLock lock = Preconditions.checkNotNull(
           toolbox.getTaskActionClient().submit(
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index a496182..c374604 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -220,7 +220,7 @@ public class RealtimeIndexTask extends AbstractTask
     // which will typically be either the main data processing loop or the persist thread.
 
     // Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments
-    final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
+    final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
     // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, http timeout error can occur while waiting for a
     // lock to be acquired.
     final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
index e15d6e5..c9bf0aa 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 public class Tasks
 {
@@ -42,7 +43,7 @@ public class Tasks
   public static final int DEFAULT_BATCH_INDEX_TASK_PRIORITY = 50;
   public static final int DEFAULT_MERGE_TASK_PRIORITY = 25;
   public static final int DEFAULT_TASK_PRIORITY = 0;
-  public static final long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min
+  public static final long DEFAULT_LOCK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
 
   public static final String PRIORITY_KEY = "priority";
   public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
index 3946248..a14b7ad 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
@@ -54,6 +54,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -68,6 +69,8 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 public class TaskQueue
 {
+  private final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60);
+
   private final List<Task> tasks = Lists.newArrayList();
   private final Map<String, ListenableFuture<TaskStatus>> taskFutures = Maps.newHashMap();
 
@@ -290,7 +293,7 @@ public class TaskQueue
         }
         // awaitNanos because management may become necessary without this condition signalling,
         // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
-        managementMayBeNecessary.awaitNanos(60000000000L /* 60 seconds */);
+        managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
       }
       finally {
         giant.unlock();
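
The TaskQueue hunk trades the inline 60000000000L (and its clarifying comment) for a named nanosecond constant built with TimeUnit.SECONDS.toNanos(60). For context, a minimal sketch of the Condition.awaitNanos idiom that loop relies on, with illustrative names and the standard java.util.concurrent.locks API:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    class ManagementLoopSketch
    {
      private static final long WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60);

      private final ReentrantLock lock = new ReentrantLock();
      private final Condition mayBeNecessary = lock.newCondition();

      void awaitWork() throws InterruptedException
      {
        lock.lock();
        try {
          // Returns early if signalled; otherwise the timeout bounds the wait
          // so management still runs even when no signal arrives.
          mayBeNecessary.awaitNanos(WAIT_TIMEOUT_NANOS);
        }
        finally {
          lock.unlock();
        }
      }
    }
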
diff --git a/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java b/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
index 04124b2..be04e72 100644
--- a/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
+++ b/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
@@ -41,6 +41,7 @@ import javax.net.ssl.SSLParameters;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.concurrent.TimeUnit;
 
 /**
  */
@@ -48,7 +49,7 @@ public class ChannelResourceFactory implements ResourceFactory<String, ChannelFu
 {
   private static final Logger log = new Logger(ChannelResourceFactory.class);
 
-  private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT = 10000L; /* 10 seconds */
+  private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
 
   private final ClientBootstrap bootstrap;
   private final SSLContext sslContext;
@@ -65,7 +66,7 @@ public class ChannelResourceFactory implements ResourceFactory<String, ChannelFu
     this.bootstrap = Preconditions.checkNotNull(bootstrap, "bootstrap");
     this.sslContext = sslContext;
     this.timer = timer;
-    this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT;
+    this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS;
 
     if (sslContext != null) {
       Preconditions.checkNotNull(timer, "timer is required when sslContext is present");
diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java
index 4252ad4..fe5c762 100644
--- a/processing/src/main/java/io/druid/query/QueryContexts.java
+++ b/processing/src/main/java/io/druid/query/QueryContexts.java
@@ -25,6 +25,8 @@ import io.druid.guice.annotations.PublicApi;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.Numbers;
 
+import java.util.concurrent.TimeUnit;
+
 @PublicApi
 public class QueryContexts
 {
@@ -41,7 +43,7 @@ public class QueryContexts
   public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
   public static final int DEFAULT_PRIORITY = 0;
   public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
-  public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes
+  public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
   public static final long NO_TIMEOUT = 0;
 
   public static <T> boolean isBySegment(Query<T> query)
diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
index b9f18a6..0ea8fc8 100644
--- a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
@@ -38,7 +38,7 @@ import java.util.concurrent.TimeoutException;
 public class AsyncQueryRunnerTest
 {
 
-  private static final long TEST_TIMEOUT = 60000;
+  private static final long TEST_TIMEOUT_MILLIS = 60_000;
   
   private final ExecutorService executor;
   private final Query query;
@@ -53,7 +53,7 @@ public class AsyncQueryRunnerTest
               .build();
   }
   
-  @Test(timeout = TEST_TIMEOUT)
+  @Test(timeout = TEST_TIMEOUT_MILLIS)
   public void testAsyncNature()
   {
     final CountDownLatch latch = new CountDownLatch(1);
@@ -83,7 +83,7 @@ public class AsyncQueryRunnerTest
     Assert.assertEquals(Collections.singletonList(1), lazy.toList());
   }
   
-  @Test(timeout = TEST_TIMEOUT)
+  @Test(timeout = TEST_TIMEOUT_MILLIS)
   public void testQueryTimeoutHonored()
   {
     QueryRunner baseRunner = new QueryRunner()
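
AsyncQueryRunnerTest is the one spot where the patch keeps a plain literal (60_000, with an underscore separator) rather than a TimeUnit call. That is likely because the constant feeds @Test(timeout = ...): annotation attributes must be compile-time constant expressions, and a method call such as TimeUnit.MINUTES.toMillis(1) is not one. A hypothetical sketch:

    import java.util.concurrent.TimeUnit;
    import org.junit.Test;

    public class TimeoutAnnotationSketch
    {
      // Compile-time constant: legal in an annotation attribute.
      private static final long TEST_TIMEOUT_MILLIS = 60_000;

      // Not a compile-time constant: fine as a field, but
      // @Test(timeout = NOT_A_CONSTANT) would fail to compile.
      private static final long NOT_A_CONSTANT = TimeUnit.MINUTES.toMillis(1);

      @Test(timeout = TEST_TIMEOUT_MILLIS)
      public void finishesWithinAMinute()
      {
        // test body elided
      }
    }
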
diff --git a/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java b/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
index 8309848..80f167e 100644
--- a/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
+++ b/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
@@ -31,12 +31,13 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 
 import java.lang.annotation.Annotation;
+import java.util.concurrent.TimeUnit;
 
 /**
  */
 public class JettyHttpClientModule implements Module
 {
-  private static final long CLIENT_CONNECT_TIMEOUT = 500;
+  private static final long CLIENT_CONNECT_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(500);
 
   public static JettyHttpClientModule global()
   {
@@ -120,7 +121,7 @@ public class JettyHttpClientModule implements Module
       httpClient.setIdleTimeout(config.getReadTimeout().getMillis());
       httpClient.setMaxConnectionsPerDestination(config.getNumConnections());
       httpClient.setMaxRequestsQueuedPerDestination(config.getNumRequestsQueued());
-      httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT);
+      httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT_MILLIS);
       httpClient.setRequestBufferSize(config.getRequestBuffersize());
       final QueuedThreadPool pool = new QueuedThreadPool(config.getNumMaxThreads());
       pool.setName(JettyHttpClientModule.class.getSimpleName() + "-threadPool-" + pool.hashCode());
diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
index 4b2eddc..306d6c8 100644
--- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
@@ -26,6 +26,7 @@ import org.hibernate.validator.constraints.NotEmpty;
 
 import java.io.File;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  */
@@ -39,7 +40,7 @@ public class SegmentLoaderConfig
   private boolean deleteOnRemove = true;
 
   @JsonProperty("dropSegmentDelayMillis")
-  private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds
+  private int dropSegmentDelayMillis = (int) TimeUnit.SECONDS.toMillis(30);
 
   @JsonProperty("announceIntervalMillis")
   private int announceIntervalMillis = 0; // do not background announce
diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java
index 3827df9..0ef91cc 100644
--- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java
+++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java
@@ -26,6 +26,7 @@ import javax.validation.constraints.Max;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.Deflater;
 
 /**
@@ -52,7 +53,7 @@ public class ServerConfig
 
   @JsonProperty
   @Min(0)
-  private long defaultQueryTimeout = 300_000; // 5 minutes
+  private long defaultQueryTimeout = TimeUnit.MINUTES.toMillis(5);
 
   @JsonProperty
   @Min(1)
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 1c7abb1..6eb29d8 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -73,7 +73,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
 {
   private static final String DATA_SOURCE = "foo";
   private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
-  private static final long PUBLISH_TIMEOUT = 5000;
+  private static final long PUBLISH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5);
 
   private static final List<InputRow> ROWS = ImmutableList.of(
       new MapBasedInputRow(
@@ -153,7 +153,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
         StreamAppenderatorDriverTest.makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
   }
 
   @Test
@@ -191,7 +191,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
         StreamAppenderatorDriverTest.makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
   }
 
   @Test
@@ -229,7 +229,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
         StreamAppenderatorDriverTest.makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     driver.registerHandoff(published).get();
   }
@@ -314,7 +314,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
           StreamAppenderatorDriverTest.makeFailingPublisher(failWithException),
           committerSupplier.get(),
           ImmutableList.of("dummy")
-      ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+      ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
     }
     catch (Exception e) {
       throw e;
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index b9c5e22..32f04a1 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -72,8 +72,8 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
   private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
   private static final int MAX_ROWS_IN_MEMORY = 100;
   private static final int MAX_ROWS_PER_SEGMENT = 3;
-  private static final long PUBLISH_TIMEOUT = 10000;
-  private static final long HANDOFF_CONDITION_TIMEOUT = 1000;
+  private static final long PUBLISH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
+  private static final long HANDOFF_CONDITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1);
 
   private static final List<InputRow> ROWS = Arrays.asList(
       new MapBasedInputRow(
@@ -144,14 +144,14 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
         makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     while (driver.getSegments().containsKey("dummy")) {
       Thread.sleep(100);
     }
 
     final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
-                                                          .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+                                                          .get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     Assert.assertEquals(
         ImmutableSet.of(
@@ -194,14 +194,14 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
         makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     while (driver.getSegments().containsKey("dummy")) {
       Thread.sleep(100);
     }
 
     final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
-                                                          .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+                                                          .get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
     Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size());
     Assert.assertEquals(numSegments * MAX_ROWS_PER_SEGMENT, segmentsAndMetadata.getCommitMetadata());
   }
@@ -223,13 +223,13 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
         makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     while (driver.getSegments().containsKey("dummy")) {
       Thread.sleep(100);
     }
 
-    driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+    driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
   }
 
   @Test
@@ -248,7 +248,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
           makeOkPublisher(),
           committerSupplier.get(),
           ImmutableList.of("dummy")
-      ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+      ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
       Assert.assertEquals(
           ImmutableSet.of(
@@ -269,7 +269,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
           makeOkPublisher(),
           committerSupplier.get(),
           ImmutableList.of("dummy")
-      ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+      ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
       Assert.assertEquals(
           ImmutableSet.of(
@@ -290,7 +290,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
         makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     Assert.assertEquals(
         ImmutableSet.of(),
@@ -328,11 +328,11 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
     );
 
     final SegmentsAndMetadata handedoffFromSequence0 = futureForSequence0.get(
-        HANDOFF_CONDITION_TIMEOUT,
+        HANDOFF_CONDITION_TIMEOUT_MILLIS,
         TimeUnit.MILLISECONDS
     );
     final SegmentsAndMetadata handedoffFromSequence1 = futureForSequence1.get(
-        HANDOFF_CONDITION_TIMEOUT,
+        HANDOFF_CONDITION_TIMEOUT_MILLIS,
         TimeUnit.MILLISECONDS
     );
 
diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
index 1556b2f..bbf92b1 100644
--- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
@@ -134,7 +134,7 @@ public class IngestSegmentFirehoseTest
                     .build()
             )
         .setMaxRowCount(5000)
-        .buildOnheap();
+        .buildOnheap()
     ) {
       final StorageAdapter sa = new QueryableIndexStorageAdapter(qi);
       final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval());
@@ -224,7 +224,7 @@ public class IngestSegmentFirehoseTest
                     .build()
             )
         .setMaxRowCount(5000)
-        .buildOnheap();
+        .buildOnheap()
     ) {
       for (String line : rows) {
         index.add(parser.parse(line));

