You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/01/08 21:42:52 UTC

[GitHub] gianm closed pull request #5541: Added merge/buffer metric to print buffer pool size every minute

gianm closed pull request #5541: Added merge/buffer metric to print buffer pool size every minute
URL: https://github.com/apache/incubator-druid/pull/5541
 
 
   

This is a PR merged from a forked repository.
Because GitHub does not show the original diff of a pull request from a
fork once it has been merged, the diff is reproduced below for the sake
of provenance:

diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java
index 99662946e1a..282c9241f97 100644
--- a/common/src/main/java/io/druid/collections/BlockingPool.java
+++ b/common/src/main/java/io/druid/collections/BlockingPool.java
@@ -26,6 +26,11 @@
 {
   int maxSize();
 
+  /**
+   * @return the number of buffers currently taken from the pool and not yet returned.
+   */
+  int getUsedBufferCount();
+
   /**
    * Take a resource from the pool, waiting up to the
    * specified wait time if necessary for an element to become available.
diff --git a/common/src/main/java/io/druid/collections/DefaultBlockingPool.java b/common/src/main/java/io/druid/collections/DefaultBlockingPool.java
index e5efc70eacf..00a0294d036 100644
--- a/common/src/main/java/io/druid/collections/DefaultBlockingPool.java
+++ b/common/src/main/java/io/druid/collections/DefaultBlockingPool.java
@@ -74,6 +74,12 @@ public int getPoolSize()
     return objects.size();
   }
 
+  @Override
+  public int getUsedBufferCount()
+  {
+    return maxSize - objects.size();
+  }
+
   @Override
   @Nullable
   public ReferenceCountingResourceHolder<T> take(final long timeoutMs)
diff --git a/common/src/main/java/io/druid/collections/DummyBlockingPool.java b/common/src/main/java/io/druid/collections/DummyBlockingPool.java
index e128d5713f7..6891596750f 100644
--- a/common/src/main/java/io/druid/collections/DummyBlockingPool.java
+++ b/common/src/main/java/io/druid/collections/DummyBlockingPool.java
@@ -44,6 +44,12 @@ public int maxSize()
     return 0;
   }
 
+  @Override
+  public int getUsedBufferCount()
+  {
+    return 0;
+  }
+
   @Override
   public ReferenceCountingResourceHolder<T> take(long timeoutMs)
   {
diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md
index d2c802d0444..a304ba2a977 100644
--- a/docs/content/operations/metrics.md
+++ b/docs/content/operations/metrics.md
@@ -37,6 +37,7 @@ Available Metrics
 |`query/success/count`|number of queries successfully processed|This metric is only available if the QueryCountStatsMonitor module is included.||
 |`query/failed/count`|number of failed queries|This metric is only available if the QueryCountStatsMonitor module is included.||
 |`query/interrupted/count`|number of queries interrupted due to cancellation or timeout|This metric is only available if the QueryCountStatsMonitor module is included.||
+|`query/merge/buffersUsed`|number of merge buffers allocated to broker while performing groupBy merge queries|||
 
 ### Historical
 
diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java b/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java
index da39026fa62..e0cefe53f74 100644
--- a/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java
+++ b/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java
@@ -28,18 +28,18 @@
 import java.util.concurrent.Callable;
 
 public class MetricsEmittingExecutorService extends ForwardingListeningExecutorService
-    implements ExecutorServiceMonitor.MetricEmitter
+    implements ProcessingMonitor.MetricEmitter
 {
   private final ListeningExecutorService delegate;
 
   public MetricsEmittingExecutorService(
       ListeningExecutorService delegate,
-      ExecutorServiceMonitor executorServiceMonitor
+      ProcessingMonitor processingMonitor
   )
   {
     super();
     this.delegate = delegate;
-    executorServiceMonitor.add(this);
+    processingMonitor.add(this);
   }
 
   @Override
diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingMergingBlockingPool.java b/processing/src/main/java/io/druid/query/MetricsEmittingMergingBlockingPool.java
new file mode 100644
index 00000000000..df6f1b7634e
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/MetricsEmittingMergingBlockingPool.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query;
+
+import io.druid.collections.DefaultBlockingPool;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.emitter.service.ServiceMetricEvent;
+import com.google.common.base.Supplier;
+
+public class MetricsEmittingMergingBlockingPool<T> extends DefaultBlockingPool<T>
+    implements ProcessingMonitor.MetricEmitter
+{
+  public MetricsEmittingMergingBlockingPool(
+          Supplier<T> generator,
+          int limit,
+          ProcessingMonitor processingMonitor)
+  {
+    super(generator, limit);
+    processingMonitor.add(this);
+  }
+
+  @Override
+  public void emitMetrics(ServiceEmitter emitter, ServiceMetricEvent.Builder metricBuilder)
+  {
+    emitter.emit(metricBuilder.build("query/merge/buffersUsed", getUsedBufferCount()));
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/ExecutorServiceMonitor.java b/processing/src/main/java/io/druid/query/ProcessingMonitor.java
similarity index 94%
rename from processing/src/main/java/io/druid/query/ExecutorServiceMonitor.java
rename to processing/src/main/java/io/druid/query/ProcessingMonitor.java
index 2c227884e73..701aa86f777 100644
--- a/processing/src/main/java/io/druid/query/ExecutorServiceMonitor.java
+++ b/processing/src/main/java/io/druid/query/ProcessingMonitor.java
@@ -27,14 +27,14 @@
 
 import java.util.List;
 
-public class ExecutorServiceMonitor extends AbstractMonitor
+public class ProcessingMonitor extends AbstractMonitor
 {
 
   private final List<MetricEmitter> metricEmitters;
   private final ServiceMetricEvent.Builder metricBuilder;
 
   @Inject
-  public ExecutorServiceMonitor()
+  public ProcessingMonitor()
   {
     this.metricEmitters = Lists.newArrayList();
     this.metricBuilder = new ServiceMetricEvent.Builder();
diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
index 5d8f369652d..32b2c1ea124 100644
--- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
@@ -27,7 +27,6 @@
 import com.google.inject.ProvisionException;
 import io.druid.client.cache.CacheConfig;
 import io.druid.collections.BlockingPool;
-import io.druid.collections.DefaultBlockingPool;
 import io.druid.collections.NonBlockingPool;
 import io.druid.collections.StupidPool;
 import io.druid.common.utils.VMUtils;
@@ -41,9 +40,10 @@
 import io.druid.java.util.common.logger.Logger;
 import io.druid.offheap.OffheapBufferGenerator;
 import io.druid.query.DruidProcessingConfig;
-import io.druid.query.ExecutorServiceMonitor;
 import io.druid.query.MetricsEmittingExecutorService;
 import io.druid.query.PrioritizedExecutorService;
+import io.druid.query.ProcessingMonitor;
+import io.druid.query.MetricsEmittingMergingBlockingPool;
 import io.druid.server.metrics.MetricsModule;
 
 import java.nio.ByteBuffer;
@@ -60,7 +60,7 @@
   public void configure(Binder binder)
   {
     binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
-    MetricsModule.register(binder, ExecutorServiceMonitor.class);
+    MetricsModule.register(binder, ProcessingMonitor.class);
   }
 
   @Provides
@@ -89,7 +89,7 @@ public ExecutorService getBackgroundExecutorService(
   @ManageLifecycle
   public ExecutorService getProcessingExecutorService(
       DruidProcessingConfig config,
-      ExecutorServiceMonitor executorServiceMonitor,
+      ProcessingMonitor processingMonitor,
       Lifecycle lifecycle
   )
   {
@@ -98,7 +98,7 @@ public ExecutorService getProcessingExecutorService(
             lifecycle,
             config
         ),
-        executorServiceMonitor
+        processingMonitor
     );
   }
 
@@ -119,12 +119,16 @@ public ExecutorService getProcessingExecutorService(
   @Provides
   @LazySingleton
   @Merging
-  public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig config)
+  public BlockingPool<ByteBuffer> getMergeBufferPool(
+          DruidProcessingConfig config,
+          ProcessingMonitor processingMonitor
+  )
   {
     verifyDirectMemory(config);
-    return new DefaultBlockingPool<>(
+    return new MetricsEmittingMergingBlockingPool(
         new OffheapBufferGenerator("result merging", config.intermediateComputeSizeBytes()),
-        config.getNumMergeBuffers()
+        config.getNumMergeBuffers(),
+        processingMonitor
     );
   }
 
diff --git a/server/src/main/java/io/druid/guice/RouterProcessingModule.java b/server/src/main/java/io/druid/guice/RouterProcessingModule.java
index fbc90714bbc..adc38a46c2f 100644
--- a/server/src/main/java/io/druid/guice/RouterProcessingModule.java
+++ b/server/src/main/java/io/druid/guice/RouterProcessingModule.java
@@ -35,7 +35,7 @@
 import io.druid.java.util.common.concurrent.ExecutorServiceConfig;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.query.DruidProcessingConfig;
-import io.druid.query.ExecutorServiceMonitor;
+import io.druid.query.ProcessingMonitor;
 import io.druid.server.metrics.MetricsModule;
 
 import java.nio.ByteBuffer;
@@ -55,7 +55,7 @@
   public void configure(Binder binder)
   {
     binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
-    MetricsModule.register(binder, ExecutorServiceMonitor.class);
+    MetricsModule.register(binder, ProcessingMonitor.class);
   }
 
   @Provides
diff --git a/server/src/main/java/io/druid/server/metrics/MetricsModule.java b/server/src/main/java/io/druid/server/metrics/MetricsModule.java
index 6ad8ef43b59..546dcf814b3 100644
--- a/server/src/main/java/io/druid/server/metrics/MetricsModule.java
+++ b/server/src/main/java/io/druid/server/metrics/MetricsModule.java
@@ -40,7 +40,7 @@
 import io.druid.guice.ManageLifecycle;
 import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.logger.Logger;
-import io.druid.query.ExecutorServiceMonitor;
+import io.druid.query.ProcessingMonitor;
 
 import java.util.List;
 import java.util.Set;
@@ -69,7 +69,7 @@ public void configure(Binder binder)
     binder.bind(DataSourceTaskIdHolder.class).in(LazySingleton.class);
 
     binder.bind(EventReceiverFirehoseRegister.class).in(LazySingleton.class);
-    binder.bind(ExecutorServiceMonitor.class).in(LazySingleton.class);
+    binder.bind(ProcessingMonitor.class).in(LazySingleton.class);
 
     // Instantiate eagerly so that we get everything registered and put into the Lifecycle
     binder.bind(Key.get(MonitorScheduler.class, Names.named("ForTheEagerness")))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org