You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by su...@apache.org on 2016/05/17 22:05:50 UTC
drill git commit: DRILL-4654: Add new metrics to the MetricRegistry
Repository: drill
Updated Branches:
refs/heads/master 09b262776 -> b075bf610
DRILL-4654: Add new metrics to the MetricRegistry
+ New metrics:
- drill.queries.enqueued
number of queries that have been submitted to the drillbit but have
not started
- drill.queries.running
number of running queries for which this drillbit is the foreman
- drill.queries.completed
number of queries that completed, were cancelled, or failed, and for
which this drillbit was the foreman
- drill.fragments.running
number of query fragments that are running in the drillbit
- drill.allocator.root.used
amount of memory used in bytes by the internal memory allocator
- drill.allocator.root.peak
peak amount of memory used in bytes by the internal memory allocator
- fd.usage
ratio of used to total file descriptors (on *nix systems)
+ Rename "current" to "used" in the RPC allocator memory usage metric
names to follow convention
+ Borrow SystemPropertyUtil class from Netty
+ Configure DrillMetrics through system properties
+ Remove unused methods and imports
closes #495
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b075bf61
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b075bf61
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b075bf61
Branch: refs/heads/master
Commit: b075bf6102e1561ee3755b483e1d1e6018d7e505
Parents: 09b2627
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Fri May 13 18:16:15 2016 -0700
Committer: Sudheesh Katkam <sk...@maprtech.com>
Committed: Tue May 17 14:59:21 2016 -0700
----------------------------------------------------------------------
.../apache/drill/exec/metrics/DrillMetrics.java | 88 +++++----
.../drill/exec/util/SystemPropertyUtil.java | 189 +++++++++++++++++++
common/src/main/resources/drill-module.conf | 14 +-
.../cache/VectorAccessibleSerializable.java | 2 +-
.../apache/drill/exec/ops/FragmentContext.java | 2 +-
.../apache/drill/exec/ops/FragmentStats.java | 3 +-
.../drill/exec/server/BootStrapContext.java | 6 +-
.../drill/exec/service/ServiceEngine.java | 9 +-
.../exec/store/schedule/BlockMapBuilder.java | 2 +-
.../org/apache/drill/exec/work/WorkManager.java | 30 ++-
.../apache/drill/exec/work/foreman/Foreman.java | 11 ++
.../drill/exec/memory/AllocationManager.java | 2 +-
.../apache/drill/exec/memory/RootAllocator.java | 14 ++
.../apache/drill/exec/rpc/TransportCheck.java | 2 +-
14 files changed, 284 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/common/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java b/common/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
index 568a97b..e046ef4 100644
--- a/common/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
+++ b/common/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
@@ -18,77 +18,86 @@
package org.apache.drill.exec.metrics;
import java.lang.management.ManagementFactory;
-import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
-import org.apache.drill.common.config.DrillConfig;
-
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Slf4jReporter;
import com.codahale.metrics.jvm.BufferPoolMetricSet;
+import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+import org.apache.drill.exec.util.SystemPropertyUtil;
-public class DrillMetrics {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillMetrics.class);
-
- public static final String METRICS_JMX_OUTPUT_ENABLED = "drill.metrics.jmx.enabled";
- public static final String METRICS_LOG_OUTPUT_ENABLED = "drill.metrics.log.enabled";
- public static final String METRICS_LOG_OUTPUT_INTERVAL = "drill.metrics.log.interval";
-
- static final DrillConfig config = DrillConfig.create();
+public final class DrillMetrics {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillMetrics.class);
- private DrillMetrics() {
- }
+ public static final boolean METRICS_JMX_OUTPUT_ENABLED =
+ SystemPropertyUtil.getBoolean("drill.metrics.jmx.enabled", true);
+ public static final boolean METRICS_LOG_OUTPUT_ENABLED =
+ SystemPropertyUtil.getBoolean("drill.metrics.log.enabled", false);
+ public static final int METRICS_LOG_OUTPUT_INTERVAL =
+ SystemPropertyUtil.getInt("drill.metrics.log.interval", 60);
private static class RegistryHolder {
- public static final MetricRegistry REGISTRY;
+
+ private static final MetricRegistry REGISTRY;
private static final JmxReporter JMX_REPORTER;
private static final Slf4jReporter LOG_REPORTER;
static {
REGISTRY = new MetricRegistry();
- registerSysStats();
+ registerSystemMetrics();
JMX_REPORTER = getJmxReporter();
LOG_REPORTER = getLogReporter();
}
- private static void registerSysStats(){
+ private static void registerSystemMetrics() {
REGISTRY.registerAll(new GarbageCollectorMetricSet());
REGISTRY.registerAll(new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()));
REGISTRY.registerAll(new MemoryUsageGaugeSet());
REGISTRY.registerAll(new ThreadStatesGaugeSet());
+ register("fd.usage", new FileDescriptorRatioGauge());
}
private static JmxReporter getJmxReporter() {
- if (config.getBoolean(METRICS_JMX_OUTPUT_ENABLED)) {
- JmxReporter reporter = JmxReporter.forRegistry(getInstance()).build();
+ if (METRICS_JMX_OUTPUT_ENABLED) {
+ JmxReporter reporter = JmxReporter.forRegistry(REGISTRY).build();
reporter.start();
return reporter;
- } else {
- return null;
}
+ return null;
}
private static Slf4jReporter getLogReporter() {
- if (config.getBoolean(METRICS_LOG_OUTPUT_ENABLED)) {
- Slf4jReporter reporter = Slf4jReporter.forRegistry(getInstance()).outputTo(logger)
- .convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build();
- reporter.start(config.getInt(METRICS_LOG_OUTPUT_INTERVAL), TimeUnit.SECONDS);
+ if (METRICS_LOG_OUTPUT_ENABLED) {
+ Slf4jReporter reporter = Slf4jReporter.forRegistry(REGISTRY)
+ .outputTo(logger)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ reporter.start(METRICS_LOG_OUTPUT_INTERVAL, TimeUnit.SECONDS);
return reporter;
- } else {
- return null;
}
+ return null;
}
}
+ /**
+ * Note: For counters, histograms, meters and timers, use get or create methods on {@link #getRegistry the
+ * registry} (e.g. {@link MetricRegistry#counter}). For {@link com.codahale.metrics.Gauge gauges} or custom
+ * metric implementations use this method. The registry does not allow registering multiple metrics with
+ * the same name, which is a problem when multiple drillbits are started in the same JVM (e.g. unit tests).
+ *
+ * @param name metric name
+ * @param metric metric instance
+ * @param <T> metric type
+ */
public synchronized static <T extends Metric> void register(String name, T metric) {
boolean removed = RegistryHolder.REGISTRY.remove(name);
if (removed) {
@@ -97,27 +106,16 @@ public class DrillMetrics {
RegistryHolder.REGISTRY.register(name, metric);
}
- private static void registerAll(String prefix, MetricSet metricSet, MetricRegistry registry) {
- for (Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
- if (entry.getValue() instanceof MetricSet) {
- registerAll(prefix + "." + entry.getKey(), (MetricSet) entry.getValue(), registry);
- } else {
- registry.register(prefix + "." + entry.getKey(), entry.getValue());
- }
- }
- }
-
- public static MetricRegistry getInstance() {
+ public static MetricRegistry getRegistry() {
return RegistryHolder.REGISTRY;
}
- public static void resetMetrics(){
- RegistryHolder.REGISTRY.removeMatching(new MetricFilter(){
- @Override
- public boolean matches(String name, Metric metric) {
- return true;
- }});
- RegistryHolder.registerSysStats();
+ public static void resetMetrics() {
+ RegistryHolder.REGISTRY.removeMatching(MetricFilter.ALL);
+ RegistryHolder.registerSystemMetrics();
}
+ // prevents instantiation
+ private DrillMetrics() {
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/common/src/main/java/org/apache/drill/exec/util/SystemPropertyUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/exec/util/SystemPropertyUtil.java b/common/src/main/java/org/apache/drill/exec/util/SystemPropertyUtil.java
new file mode 100644
index 0000000..1b06778
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/exec/util/SystemPropertyUtil.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.regex.Pattern;
+
+/**
+ * A collection of utility methods to retrieve and parse the values of Java system properties.
+ *
+ * This is a modified version of Netty's internal system property utility class.
+ */
+public final class SystemPropertyUtil {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemPropertyUtil.class);
+
+ private static final Pattern INTEGER_PATTERN = Pattern.compile("-?[0-9]+");
+
+ /**
+ * Returns {@code true} if and only if the system property with the specified {@code key}
+ * exists.
+ */
+ public static boolean contains(String key) {
+ return get(key) != null;
+ }
+
+ /**
+ * Returns the value of the Java system property with the specified
+ * {@code key}, while falling back to {@code null} if the property access fails.
+ *
+ * @return the property value or {@code null}
+ */
+ public static String get(String key) {
+ return get(key, null);
+ }
+
+ /**
+ * Returns the value of the Java system property with the specified
+ * {@code key}, while falling back to the specified default value if
+ * the property access fails.
+ *
+ * @return the property value.
+ * {@code def} if there's no such property or if an access to the
+ * specified property is not allowed.
+ */
+ public static String get(final String key, String def) {
+ if (key == null) {
+ throw new NullPointerException("key");
+ }
+ if (key.isEmpty()) {
+ throw new IllegalArgumentException("key must not be empty.");
+ }
+
+ String value = null;
+ try {
+ if (System.getSecurityManager() == null) {
+ value = System.getProperty(key);
+ } else {
+ value = AccessController.doPrivileged(new PrivilegedAction<String>() {
+ @Override
+ public String run() {
+ return System.getProperty(key);
+ }
+ });
+ }
+ } catch (Exception e) {
+ logger.warn("Unable to retrieve a system property '" + key + "'; default values will be used.", e);
+ }
+
+ if (value == null) {
+ return def;
+ }
+
+ return value;
+ }
+
+ /**
+ * Returns the value of the Java system property with the specified
+ * {@code key} parsed as a boolean ("true"/"yes"/"1" or "false"/"no"/"0",
+ * case-insensitive); an empty value is treated as {@code true}.
+ *
+ * @return the parsed property value.
+ * {@code def} if there's no such property, if an access to the
+ * specified property is not allowed, or if the value cannot be parsed.
+ */
+ public static boolean getBoolean(String key, boolean def) {
+ String value = get(key);
+ if (value == null) {
+ return def;
+ }
+
+ value = value.trim().toLowerCase();
+ if (value.isEmpty()) {
+ return true;
+ }
+
+ if ("true".equals(value) || "yes".equals(value) || "1".equals(value)) {
+ return true;
+ }
+
+ if ("false".equals(value) || "no".equals(value) || "0".equals(value)) {
+ return false;
+ }
+
+ logger.warn("Unable to parse the boolean system property '{}':{} - using the default value: {}",
+ key, value, def);
+
+ return def;
+ }
+
+ /**
+ * Returns the value of the Java system property with the specified
+ * {@code key} parsed as a decimal {@code int}, while falling back to the
+ * specified default value if the property is missing or malformed.
+ *
+ * @return the parsed property value.
+ * {@code def} if there's no such property, if an access to the
+ * specified property is not allowed, or if the value is not a valid int.
+ */
+ public static int getInt(String key, int def) {
+ String value = get(key);
+ if (value == null) {
+ return def;
+ }
+
+ value = value.trim().toLowerCase();
+ if (INTEGER_PATTERN.matcher(value).matches()) {
+ try {
+ return Integer.parseInt(value);
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+
+ logger.warn("Unable to parse the integer system property '{}':{} - using the default value: {}",
+ key, value, def);
+
+ return def;
+ }
+
+ /**
+ * Returns the value of the Java system property with the specified
+ * {@code key} parsed as a decimal {@code long}, while falling back to the
+ * specified default value if the property is missing or malformed.
+ *
+ * @return the parsed property value.
+ * {@code def} if there's no such property, if an access to the
+ * specified property is not allowed, or if the value is not a valid long.
+ */
+ public static long getLong(String key, long def) {
+ String value = get(key);
+ if (value == null) {
+ return def;
+ }
+
+ value = value.trim().toLowerCase();
+ if (INTEGER_PATTERN.matcher(value).matches()) {
+ try {
+ return Long.parseLong(value);
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+
+ logger.warn("Unable to parse the long integer system property '{}':{} - using the default value: {}",
+ key, value, def);
+
+ return def;
+ }
+
+ // prevent instantiation
+ private SystemPropertyUtil() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/common/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/common/src/main/resources/drill-module.conf b/common/src/main/resources/drill-module.conf
index 0f0e4f7..f8226f8 100644
--- a/common/src/main/resources/drill-module.conf
+++ b/common/src/main/resources/drill-module.conf
@@ -29,17 +29,5 @@ drill {
org.apache.drill.exec.store.mock,
org.apache.drill.common.logical
]
- },
-
- metrics : {
- context: "drillbit",
- jmx: {
- enabled : true
- },
- log: {
- enabled : false,
- interval : 60
- }
- },
-
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 71c904d..e3bf5bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -47,7 +47,7 @@ import com.google.common.collect.Lists;
*/
public class VectorAccessibleSerializable extends AbstractStreamSerializable {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
- static final MetricRegistry metrics = DrillMetrics.getInstance();
+ static final MetricRegistry metrics = DrillMetrics.getRegistry();
static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
private VectorContainer va;
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 304ecf1..8506b91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -171,7 +171,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
}
- stats = new FragmentStats(allocator, dbContext.getMetrics(), fragment.getAssignment());
+ stats = new FragmentStats(allocator, fragment.getAssignment());
bufferManager = new BufferManagerImpl(this.allocator);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
index a5a334f..a173073 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
@@ -23,7 +23,6 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
-import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Lists;
/**
@@ -37,7 +36,7 @@ public class FragmentStats {
private final DrillbitEndpoint endpoint;
private final BufferAllocator allocator;
- public FragmentStats(BufferAllocator allocator, MetricRegistry metrics, DrillbitEndpoint endpoint) {
+ public FragmentStats(BufferAllocator allocator, DrillbitEndpoint endpoint) {
this.startTime = System.currentTimeMillis();
this.endpoint = endpoint;
this.allocator = allocator;
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 3f6814e..6554e33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.server;
+import com.codahale.metrics.MetricRegistry;
import io.netty.channel.EventLoopGroup;
import java.util.concurrent.ExecutorService;
@@ -34,8 +35,6 @@ import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.TransportCheck;
-import com.codahale.metrics.MetricRegistry;
-
public class BootStrapContext implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BootStrapContext.class);
@@ -52,7 +51,8 @@ public class BootStrapContext implements AutoCloseable {
this.classpathScan = classpathScan;
this.loop = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitServer-");
this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitClient-");
- this.metrics = DrillMetrics.getInstance();
+ // Note that metrics are stored in a static instance
+ this.metrics = DrillMetrics.getRegistry();
this.allocator = RootAllocatorFactory.newRoot(config);
this.executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index 17edbc2..d505546 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
@@ -92,9 +91,9 @@ public class ServiceEngine implements AutoCloseable {
}
- private final void registerMetrics(final MetricRegistry registry) {
+ private void registerMetrics(final MetricRegistry registry) {
final String prefix = PooledByteBufAllocatorL.METRIC_PREFIX + "rpc.";
- DrillMetrics.register(prefix + "user.current", new Gauge<Long>() {
+ DrillMetrics.register(prefix + "user.used", new Gauge<Long>() {
@Override
public Long getValue() {
return userAllocator.getAllocatedMemory();
@@ -106,7 +105,7 @@ public class ServiceEngine implements AutoCloseable {
return userAllocator.getPeakMemoryAllocation();
}
});
- DrillMetrics.register(prefix + "bit.control.current", new Gauge<Long>() {
+ DrillMetrics.register(prefix + "bit.control.used", new Gauge<Long>() {
@Override
public Long getValue() {
return controlAllocator.getAllocatedMemory();
@@ -119,7 +118,7 @@ public class ServiceEngine implements AutoCloseable {
}
});
- DrillMetrics.register(prefix + "bit.data.current", new Gauge<Long>() {
+ DrillMetrics.register(prefix + "bit.data.used", new Gauge<Long>() {
@Override
public Long getValue() {
return dataAllocator.getAllocatedMemory();
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
index 7a6825f..829bcd8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -48,7 +48,7 @@ import com.google.common.collect.Range;
public class BlockMapBuilder {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BlockMapBuilder.class);
- static final MetricRegistry metrics = DrillMetrics.getInstance();
+ static final MetricRegistry metrics = DrillMetrics.getRegistry();
static final String BLOCK_MAP_BUILDER_TIMER = MetricRegistry.name(BlockMapBuilder.class, "blockMapBuilderTimer");
private final Map<Path,ImmutableRangeMap<Long,BlockLocation>> blockMapMap = Maps.newConcurrentMap();
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index f2305c4..e910150 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -23,9 +23,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import com.codahale.metrics.Counter;
import org.apache.drill.common.SelfCleaningRunnable;
import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -48,7 +50,6 @@ import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.user.UserWorker;
import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -105,19 +106,13 @@ public class WorkManager implements AutoCloseable {
dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider);
statusThread.start();
- // TODO remove try block once metrics moved from singleton, For now catch to avoid unit test failures
- try {
- dContext.getMetrics().register(
- MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()),
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return runningFragments.size();
- }
- });
- } catch (final IllegalArgumentException e) {
- logger.warn("Exception while registering metrics", e);
- }
+ DrillMetrics.register("drill.fragments.running",
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return runningFragments.size();
+ }
+ });
}
public Executor getExecutor() {
@@ -144,8 +139,9 @@ public class WorkManager implements AutoCloseable {
public void close() throws Exception {
statusThread.interrupt();
- if (!runningFragments.isEmpty()) {
- logger.warn("Closing WorkManager but there are {} running fragments.", runningFragments.size());
+ final long numRunningFragments = runningFragments.size();
+ if (numRunningFragments != 0) {
+ logger.warn("Closing WorkManager but there are {} running fragments.", numRunningFragments);
if (logger.isDebugEnabled()) {
for (final FragmentHandle handle : runningFragments.keySet()) {
logger.debug("Fragment still running: {} status: {}", QueryIdHelper.getQueryIdentifier(handle),
@@ -256,7 +252,7 @@ public class WorkManager implements AutoCloseable {
/**
* Currently used to start a root fragment that is blocked on data, and intermediate fragments. This method is
- * called, when the first batch arrives, by {@link org.apache.drill.exec.rpc.data.DataResponseHandlerImpl#handle}
+ * called, when the first batch arrives, by {@link org.apache.drill.exec.rpc.data.DataServer#handle}
* @param fragmentManager the manager for the fragment
*/
public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 5137cde..e7defec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.work.foreman;
+import com.codahale.metrics.Counter;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
@@ -44,6 +45,7 @@ import org.apache.drill.exec.coord.DistributedSemaphore;
import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.opt.BasicOptimizer;
@@ -115,6 +117,10 @@ public class Foreman implements Runnable {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
+ private static final Counter enqueuedQueries = DrillMetrics.getRegistry().counter("drill.queries.enqueued");
+ private static final Counter runningQueries = DrillMetrics.getRegistry().counter("drill.queries.running");
+ private static final Counter completedQueries = DrillMetrics.getRegistry().counter("drill.queries.completed");
+
private final QueryId queryId;
private final String queryIdString;
private final RunQuery queryRequest;
@@ -170,6 +176,7 @@ public class Foreman implements Runnable {
final QueryState initialState = queuingEnabled ? QueryState.ENQUEUED : QueryState.STARTING;
recordNewState(initialState);
+ enqueuedQueries.inc();
}
private class ConnectionClosedListener implements GenericFutureListener<Future<Void>> {
@@ -233,6 +240,8 @@ public class Foreman implements Runnable {
// track how long the query takes
queryManager.markStartTime();
+ enqueuedQueries.dec();
+ runningQueries.inc();
try {
injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class);
@@ -809,6 +818,8 @@ public class Foreman implements Runnable {
logger.warn("unable to close query manager", e);
}
+ runningQueries.dec();
+ completedQueries.inc();
try {
releaseLease();
} finally {
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
index f63aade..34647f9 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
@@ -57,7 +57,7 @@ public class AllocationManager {
private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
- static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(DrillMetrics.getInstance());
+ static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(DrillMetrics.getRegistry());
private final RootAllocator root;
private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/exec/memory/base/src/main/java/org/apache/drill/exec/memory/RootAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/RootAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/RootAllocator.java
index 5ab4130..0671702 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/RootAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/RootAllocator.java
@@ -17,7 +17,9 @@
*/
package org.apache.drill.exec.memory;
+import com.codahale.metrics.Gauge;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.exec.metrics.DrillMetrics;
/**
* The root allocator for using direct memory inside a Drillbit. Supports creating a
@@ -27,6 +29,18 @@ public class RootAllocator extends BaseAllocator {
public RootAllocator(final long limit) {
super(null, "ROOT", 0, limit);
+ DrillMetrics.register("drill.allocator.root.used", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return getAllocatedMemory();
+ }
+ });
+ DrillMetrics.register("drill.allocator.root.peak", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return getPeakMemoryAllocation();
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/b075bf61/exec/rpc/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java
index cdc441d..c789af8 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java
@@ -26,7 +26,7 @@ import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.internal.SystemPropertyUtil;
+import org.apache.drill.exec.util.SystemPropertyUtil;
import java.util.Locale;