Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/21 22:29:42 UTC

[GitHub] jon-wei closed pull request #6158: Make time-related variables more readable

URL: https://github.com/apache/incubator-druid/pull/6158
This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request on merge, it is reproduced below
for the sake of provenance.
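
Most hunks below apply one pattern: a bare numeric timeout such as 60000 is
replaced by an equivalent java.util.concurrent.TimeUnit conversion, and the
constant's name gains a unit suffix. As a minimal sketch of the before/after
shape, using the Log4jShutdown constant from the first hunk (the surrounding
class is illustrative, not the actual Druid source):

    import java.util.concurrent.TimeUnit;

    public class TimeoutReadabilityExample
    {
      // Before: the unit is implicit and the magnitude is easy to misread.
      private static final long SHUTDOWN_WAIT_TIMEOUT = 60000;

      // After: the unit is part of the name and the value is derived via
      // TimeUnit, so readers no longer have to count zeros.
      private static final long SHUTDOWN_WAIT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);

      public static void main(String[] args)
      {
        // Both constants hold the same value: 60000 milliseconds.
        System.out.println(SHUTDOWN_WAIT_TIMEOUT == SHUTDOWN_WAIT_TIMEOUT_MILLIS); // true
      }
    }

The full diff, as merged, follows.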

diff --git a/common/src/main/java/io/druid/common/config/Log4jShutdown.java b/common/src/main/java/io/druid/common/config/Log4jShutdown.java
index d910b184fbb..7b81ada90e0 100644
--- a/common/src/main/java/io/druid/common/config/Log4jShutdown.java
+++ b/common/src/main/java/io/druid/common/config/Log4jShutdown.java
@@ -28,11 +28,12 @@
 import javax.annotation.concurrent.GuardedBy;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class Log4jShutdown implements ShutdownCallbackRegistry, LifeCycle
 {
-  private static final long SHUTDOWN_WAIT_TIMEOUT = 60000;
+  private static final long SHUTDOWN_WAIT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
 
   private final SynchronizedStateHolder state = new SynchronizedStateHolder(State.INITIALIZED);
   private final Queue<Runnable> shutdownCallbacks = new ConcurrentLinkedQueue<>();
@@ -100,7 +101,7 @@ public void start()
   public void stop()
   {
     if (!state.compareAndSet(State.STARTED, State.STOPPING)) {
-      State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT);
+      State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT_MILLIS);
       if (current != State.STOPPED) {
         throw new ISE("Expected state [%s] found [%s]", State.STARTED, current);
       }
diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java b/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
index a7df750cbf2..fdd35958ab8 100644
--- a/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
+++ b/extensions-contrib/ambari-metrics-emitter/src/main/java/io/druid/emitter/ambari/metrics/AmbariMetricsEmitterConfig.java
@@ -25,13 +25,13 @@
 
 import java.util.Collections;
 import java.util.List;
-
+import java.util.concurrent.TimeUnit;
 
 public class AmbariMetricsEmitterConfig
 {
   private static final int DEFAULT_BATCH_SIZE = 100;
-  private static final Long DEFAULT_FLUSH_PERIOD_MILLIS = (long) (60 * 1000); // flush every one minute
-  private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
+  private static final long DEFAULT_FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1); // flush every one minute
+  private static final long DEFAULT_GET_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1 sec
   private static final String DEFAULT_PROTOCOL = "http";
 
   @JsonProperty
@@ -106,7 +106,7 @@ public AmbariMetricsEmitterConfig(
     );
     this.alertEmitters = alertEmitters == null ? Collections.emptyList() : alertEmitters;
     this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
-    this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
+    this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
   }
 
   @JsonProperty
diff --git a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
index 531e90efcc4..e29b0cb8109 100644
--- a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
+++ b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java
@@ -56,7 +56,7 @@
   private final List<Emitter> requestLogEmitters;
   private final AtomicBoolean started = new AtomicBoolean(false);
   private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
-  private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
+  private static final long FLUSH_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1); // default flush wait 1 min
   private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
       .setDaemon(true)
       .setNameFormat("GraphiteEmitter-%s")
@@ -222,7 +222,7 @@ public void flush()
     if (started.get()) {
       Future future = exec.schedule(new ConsumerRunnable(), 0, TimeUnit.MILLISECONDS);
       try {
-        future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
+        future.get(FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
       }
       catch (InterruptedException | ExecutionException | TimeoutException e) {
         if (e instanceof InterruptedException) {
diff --git a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
index 8013027d809..ee426a0b66d 100644
--- a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
+++ b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java
@@ -25,15 +25,15 @@
 
 import java.util.Collections;
 import java.util.List;
-
+import java.util.concurrent.TimeUnit;
 
 public class GraphiteEmitterConfig
 {
   public static final String PLAINTEXT_PROTOCOL = "plaintext";
   public static final String PICKLE_PROTOCOL = "pickle";
   private static final int DEFAULT_BATCH_SIZE = 100;
-  private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute
-  private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
+  private static final long DEFAULT_FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1); // flush every one minute
+  private static final long DEFAULT_GET_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1 sec
 
   @JsonProperty
   private final String hostname;
@@ -142,7 +142,7 @@ public GraphiteEmitterConfig(
       @JsonProperty("waitForEventTime") Long waitForEventTime
   )
   {
-    this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
+    this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
     this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
     this.alertEmitters = alertEmitters == null ? Collections.emptyList() : alertEmitters;
     this.requestLogEmitters = requestLogEmitters == null ? Collections.emptyList() : requestLogEmitters;
@@ -150,7 +150,7 @@ public GraphiteEmitterConfig(
         druidToGraphiteEventConverter,
         "Event converter can not ne null dude"
     );
-    this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
+    this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD_MILLIS : flushPeriod;
     this.maxQueueSize = maxQueueSize == null ? Integer.MAX_VALUE : maxQueueSize;
     this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
     this.port = Preconditions.checkNotNull(port, "port can not be null");
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
index 8461c9bd16e..4109a43a98d 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
+++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
@@ -43,6 +43,7 @@
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class KafkaEightSimpleConsumerFirehoseFactory implements
@@ -72,7 +73,7 @@
 
   private final List<PartitionConsumerWorker> consumerWorkers = new CopyOnWriteArrayList<>();
   private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000;
-  private static final int CONSUMER_FETCH_TIMEOUT = 10000;
+  private static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(10);
 
   @JsonCreator
   public KafkaEightSimpleConsumerFirehoseFactory(
@@ -307,7 +308,7 @@ public void run()
           try {
             while (!stopped.get()) {
               try {
-                Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT);
+                Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT_MILLIS);
                 int count = 0;
                 for (BytesMessageWithOffset msgWithOffset : msgs) {
                   offset = msgWithOffset.offset();
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
index aabef05f6a8..21a366463e3 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
+++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
@@ -49,6 +49,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * refer @{link https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example}
@@ -74,10 +75,10 @@
   private List<HostAndPort> replicaBrokers;
   private SimpleConsumer consumer = null;
 
-  private static final int SO_TIMEOUT = 30000;
+  private static final int SO_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30);
   private static final int BUFFER_SIZE = 65536;
-  private static final long RETRY_INTERVAL = 1000L;
-  private static final int FETCH_SIZE = 100000000;
+  private static final long RETRY_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(1);
+  private static final int FETCH_SIZE = 100_000_000;
 
   public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List<String> brokers, boolean earliest)
   {
@@ -121,7 +122,7 @@ private void ensureConsumer(Broker leader) throws InterruptedException
       );
 
       consumer = new SimpleConsumer(
-          leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT, BUFFER_SIZE, clientId
+          leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, clientId
       );
     }
   }
@@ -306,7 +307,7 @@ private PartitionMetadata findLeader() throws InterruptedException
       SimpleConsumer consumer = null;
       try {
         log.info("Finding new leader from Kafka brokers, try broker [%s]", broker.toString());
-        consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT, BUFFER_SIZE, leaderLookupClientId);
+        consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, leaderLookupClientId);
         TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic)));
 
         List<TopicMetadata> metaData = resp.topicsMetadata();
@@ -361,7 +362,7 @@ private Broker findNewLeader(Broker oldLeader) throws InterruptedException
         }
       }
 
-      Thread.sleep(RETRY_INTERVAL);
+      Thread.sleep(RETRY_INTERVAL_MILLIS);
       retryCnt++;
       // if could not find the leader for current replicaBrokers, let's try to
       // find one via allBrokers
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
index a2f14aa0fa7..e72cbfe1c08 100644
--- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java
@@ -243,7 +243,7 @@ private static Sketch deserializeFromMemory(Memory mem)
   {
     UNION,
     INTERSECT,
-    NOT;
+    NOT
   }
 
   public static SketchHolder sketchSetOperation(Func func, int sketchSize, Object... holders)
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
index a0006796a5b..4fcf9176bd1 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -61,6 +61,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -149,7 +150,7 @@ public long milliseconds()
           @Override
           public long nanoseconds()
           {
-            return milliseconds() * 1_000_000;
+            return TimeUnit.MILLISECONDS.toNanos(milliseconds());
           }
 
           @Override
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index a93fde611c5..a2749ef05fe 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -448,7 +448,7 @@ public void run()
           // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
           ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
           try {
-            records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
+            records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
           }
           catch (OffsetOutOfRangeException e) {
             log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 8cb63239757..0e7ebcb0816 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -65,6 +65,7 @@
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class KafkaIndexTask extends AbstractTask implements ChatHandler
@@ -83,7 +84,7 @@
   private static final EmittingLogger log = new EmittingLogger(KafkaIndexTask.class);
   private static final String TYPE = "index_kafka";
   private static final Random RANDOM = new Random();
-  static final long POLL_TIMEOUT = 100;
+  static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);
   static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
 
   private final DataSchema dataSchema;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index b7cdd3e96e4..6e3f9c8c52f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -371,7 +371,7 @@ public void run()
           // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
           ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
           try {
-            records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
+            records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
           }
           catch (OffsetOutOfRangeException e) {
             log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 4a72114be82..3982806cc7f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -327,7 +327,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
               indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
           )
       );
-      final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
+      final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
       // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
       final TaskLock lock = Preconditions.checkNotNull(
           toolbox.getTaskActionClient().submit(
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index a496182b717..c3746049e14 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -220,7 +220,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
     // which will typically be either the main data processing loop or the persist thread.
 
     // Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments
-    final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
+    final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
     // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, http timeout error can occur while waiting for a
     // lock to be acquired.
     final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
index e15d6e5e710..c9bf0aa5c1b 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java
@@ -35,6 +35,7 @@
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 public class Tasks
 {
@@ -42,7 +43,7 @@
   public static final int DEFAULT_BATCH_INDEX_TASK_PRIORITY = 50;
   public static final int DEFAULT_MERGE_TASK_PRIORITY = 25;
   public static final int DEFAULT_TASK_PRIORITY = 0;
-  public static final long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min
+  public static final long DEFAULT_LOCK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
 
   public static final String PRIORITY_KEY = "priority";
   public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
index 3946248d6c9..a14b7ad0c19 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
@@ -54,6 +54,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -68,6 +69,8 @@
  */
 public class TaskQueue
 {
+  private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60);
+
   private final List<Task> tasks = Lists.newArrayList();
   private final Map<String, ListenableFuture<TaskStatus>> taskFutures = Maps.newHashMap();
 
@@ -290,7 +293,7 @@ public String apply(Task task)
         }
         // awaitNanos because management may become necessary without this condition signalling,
         // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
-        managementMayBeNecessary.awaitNanos(60000000000L /* 60 seconds */);
+        managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
       }
       finally {
         giant.unlock();
diff --git a/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java b/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
index 04124b2b914..be04e722576 100644
--- a/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
+++ b/java-util/src/main/java/io/druid/java/util/http/client/pool/ChannelResourceFactory.java
@@ -41,6 +41,7 @@
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.concurrent.TimeUnit;
 
 /**
  */
@@ -48,7 +49,7 @@
 {
   private static final Logger log = new Logger(ChannelResourceFactory.class);
 
-  private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT = 10000L; /* 10 seconds */
+  private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
 
   private final ClientBootstrap bootstrap;
   private final SSLContext sslContext;
@@ -65,7 +66,7 @@ public ChannelResourceFactory(
     this.bootstrap = Preconditions.checkNotNull(bootstrap, "bootstrap");
     this.sslContext = sslContext;
     this.timer = timer;
-    this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT;
+    this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS;
 
     if (sslContext != null) {
       Preconditions.checkNotNull(timer, "timer is required when sslContext is present");
diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java
index 4252ad4537d..fe5c7625e5e 100644
--- a/processing/src/main/java/io/druid/query/QueryContexts.java
+++ b/processing/src/main/java/io/druid/query/QueryContexts.java
@@ -25,6 +25,8 @@
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.Numbers;
 
+import java.util.concurrent.TimeUnit;
+
 @PublicApi
 public class QueryContexts
 {
@@ -41,7 +43,7 @@
   public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
   public static final int DEFAULT_PRIORITY = 0;
   public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
-  public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes
+  public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
   public static final long NO_TIMEOUT = 0;
 
   public static <T> boolean isBySegment(Query<T> query)
diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
index b9f18a64558..0ea8fc875c1 100644
--- a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java
@@ -38,7 +38,7 @@
 public class AsyncQueryRunnerTest
 {
 
-  private static final long TEST_TIMEOUT = 60000;
+  private static final long TEST_TIMEOUT_MILLIS = 60_000;
   
   private final ExecutorService executor;
   private final Query query;
@@ -53,7 +53,7 @@ public AsyncQueryRunnerTest()
               .build();
   }
   
-  @Test(timeout = TEST_TIMEOUT)
+  @Test(timeout = TEST_TIMEOUT_MILLIS)
   public void testAsyncNature()
   {
     final CountDownLatch latch = new CountDownLatch(1);
@@ -83,7 +83,7 @@ public Sequence run(QueryPlus queryPlus, Map responseContext)
     Assert.assertEquals(Collections.singletonList(1), lazy.toList());
   }
   
-  @Test(timeout = TEST_TIMEOUT)
+  @Test(timeout = TEST_TIMEOUT_MILLIS)
   public void testQueryTimeoutHonored()
   {
     QueryRunner baseRunner = new QueryRunner()
diff --git a/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java b/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
index 83098484ee8..80f167e66e0 100644
--- a/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
+++ b/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java
@@ -31,12 +31,13 @@
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 
 import java.lang.annotation.Annotation;
+import java.util.concurrent.TimeUnit;
 
 /**
  */
 public class JettyHttpClientModule implements Module
 {
-  private static final long CLIENT_CONNECT_TIMEOUT = 500;
+  private static final long CLIENT_CONNECT_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(500);
 
   public static JettyHttpClientModule global()
   {
@@ -120,7 +121,7 @@ public HttpClient get()
       httpClient.setIdleTimeout(config.getReadTimeout().getMillis());
       httpClient.setMaxConnectionsPerDestination(config.getNumConnections());
       httpClient.setMaxRequestsQueuedPerDestination(config.getNumRequestsQueued());
-      httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT);
+      httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT_MILLIS);
       httpClient.setRequestBufferSize(config.getRequestBuffersize());
       final QueuedThreadPool pool = new QueuedThreadPool(config.getNumMaxThreads());
       pool.setName(JettyHttpClientModule.class.getSimpleName() + "-threadPool-" + pool.hashCode());
diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
index 4b2eddc9154..306d6c88a74 100644
--- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
@@ -26,6 +26,7 @@
 
 import java.io.File;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  */
@@ -39,7 +40,7 @@
   private boolean deleteOnRemove = true;
 
   @JsonProperty("dropSegmentDelayMillis")
-  private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds
+  private int dropSegmentDelayMillis = (int) TimeUnit.SECONDS.toMillis(30);
 
   @JsonProperty("announceIntervalMillis")
   private int announceIntervalMillis = 0; // do not background announce
diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java
index 3827df969d8..0ef91cce1b0 100644
--- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java
+++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java
@@ -26,6 +26,7 @@
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.Deflater;
 
 /**
@@ -52,7 +53,7 @@
 
   @JsonProperty
   @Min(0)
-  private long defaultQueryTimeout = 300_000; // 5 minutes
+  private long defaultQueryTimeout = TimeUnit.MINUTES.toMillis(5);
 
   @JsonProperty
   @Min(1)
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 4d512e11080..b4005c37699 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -73,7 +73,7 @@
 {
   private static final String DATA_SOURCE = "foo";
   private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
-  private static final long PUBLISH_TIMEOUT = 5000;
+  private static final long PUBLISH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5);
 
   private static final List<InputRow> ROWS = ImmutableList.of(
       new MapBasedInputRow(
@@ -153,7 +153,7 @@ public void testFailDuringPersist() throws IOException, InterruptedException, Ti
         StreamAppenderatorDriverTest.makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
   }
 
   @Test
@@ -191,7 +191,7 @@ public void testFailDuringPush() throws IOException, InterruptedException, Timeo
         StreamAppenderatorDriverTest.makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
   }
 
   @Test
@@ -229,7 +229,7 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo
         StreamAppenderatorDriverTest.makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     driver.registerHandoff(published).get();
   }
@@ -312,7 +312,7 @@ private void testFailDuringPublishInternal(boolean failWithException) throws Exc
           StreamAppenderatorDriverTest.makeFailingPublisher(failWithException),
           committerSupplier.get(),
           ImmutableList.of("dummy")
-      ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+      ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
     }
     catch (Exception e) {
       throw e;
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index acdbed52d69..04a6833c968 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -70,8 +70,8 @@
   private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
   private static final int MAX_ROWS_IN_MEMORY = 100;
   private static final int MAX_ROWS_PER_SEGMENT = 3;
-  private static final long PUBLISH_TIMEOUT = 10000;
-  private static final long HANDOFF_CONDITION_TIMEOUT = 1000;
+  private static final long PUBLISH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
+  private static final long HANDOFF_CONDITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1);
 
   private static final List<InputRow> ROWS = Arrays.asList(
       new MapBasedInputRow(
@@ -142,14 +142,14 @@ public void testSimple() throws Exception
         makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     while (driver.getSegments().containsKey("dummy")) {
       Thread.sleep(100);
     }
 
     final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
-                                                          .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+                                                          .get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     Assert.assertEquals(
         ImmutableSet.of(
@@ -192,14 +192,14 @@ public void testMaxRowsPerSegment() throws Exception
         makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     while (driver.getSegments().containsKey("dummy")) {
       Thread.sleep(100);
     }
 
     final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
-                                                          .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+                                                          .get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
     Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size());
     Assert.assertEquals(numSegments * MAX_ROWS_PER_SEGMENT, segmentsAndMetadata.getCommitMetadata());
   }
@@ -221,13 +221,13 @@ public void testHandoffTimeout() throws Exception
         makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     while (driver.getSegments().containsKey("dummy")) {
       Thread.sleep(100);
     }
 
-    driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
+    driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
   }
 
   @Test
@@ -246,7 +246,7 @@ public void testPublishPerRow() throws IOException, InterruptedException, Timeou
           makeOkPublisher(),
           committerSupplier.get(),
           ImmutableList.of("dummy")
-      ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+      ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
       Assert.assertEquals(
           ImmutableSet.of(
@@ -267,7 +267,7 @@ public void testPublishPerRow() throws IOException, InterruptedException, Timeou
           makeOkPublisher(),
           committerSupplier.get(),
           ImmutableList.of("dummy")
-      ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+      ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
       Assert.assertEquals(
           ImmutableSet.of(
@@ -288,7 +288,7 @@ public void testPublishPerRow() throws IOException, InterruptedException, Timeou
         makeOkPublisher(),
         committerSupplier.get(),
         ImmutableList.of("dummy")
-    ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
     Assert.assertEquals(
         ImmutableSet.of(),
@@ -326,11 +326,11 @@ public void testIncrementalHandoff() throws Exception
     );
 
     final SegmentsAndMetadata handedoffFromSequence0 = futureForSequence0.get(
-        HANDOFF_CONDITION_TIMEOUT,
+        HANDOFF_CONDITION_TIMEOUT_MILLIS,
         TimeUnit.MILLISECONDS
     );
     final SegmentsAndMetadata handedoffFromSequence1 = futureForSequence1.get(
-        HANDOFF_CONDITION_TIMEOUT,
+        HANDOFF_CONDITION_TIMEOUT_MILLIS,
         TimeUnit.MILLISECONDS
     );
 
diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
index 1556b2fe3ca..bbf92b199c2 100644
--- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
@@ -134,7 +134,7 @@ public void testReadFromIndexAndWriteAnotherIndex() throws Exception
                     .build()
             )
         .setMaxRowCount(5000)
-        .buildOnheap();
+        .buildOnheap()
     ) {
       final StorageAdapter sa = new QueryableIndexStorageAdapter(qi);
       final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval());
@@ -224,7 +224,7 @@ private void createTestIndex(File segmentDir) throws Exception
                     .build()
             )
         .setMaxRowCount(5000)
-        .buildOnheap();
+        .buildOnheap()
     ) {
       for (String line : rows) {
         index.add(parser.parse(line));
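
One caveat with this kind of refactor: a TimeUnit conversion silently changes
a constant's magnitude if the wrong unit is picked (for example, MINUTES where
SECONDS was meant). A minimal, hypothetical JUnit sketch, not part of this PR,
that pins the converted values to the literals they replace:

    import static org.junit.Assert.assertEquals;

    import java.util.concurrent.TimeUnit;
    import org.junit.Test;

    public class TimeConstantsTest
    {
      @Test
      public void convertedConstantsKeepTheirOriginalValues()
      {
        // Expected values are the numeric literals replaced in the diff above.
        assertEquals(60_000L, TimeUnit.MINUTES.toMillis(1));  // shutdown wait, flush timeout
        assertEquals(1_000L, TimeUnit.SECONDS.toMillis(1));   // queue get timeout, retry interval
        assertEquals(10_000L, TimeUnit.SECONDS.toMillis(10)); // consumer fetch, SSL handshake
        assertEquals(300_000L, TimeUnit.MINUTES.toMillis(5)); // lock and query timeouts
      }
    }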


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org