You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2022/03/02 07:38:18 UTC

[pinot] branch master updated: Adding NoopPinotMetricFactory and corresponding changes (#8270)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fd9c58a  Adding NoopPinotMetricFactory and corresponding changes (#8270)
fd9c58a is described below

commit fd9c58a11ed16d27109baefcee138eea30132ad3
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Tue Mar 1 23:37:41 2022 -0800

    Adding NoopPinotMetricFactory and corresponding changes (#8270)
---
 .../presto/PinotScatterGatherQueryClient.java      |  49 ++--
 .../presto/grpc/PinotStreamingQueryClient.java     |  13 -
 .../plugin/metrics/NoopPinotMetricFactory.java     | 292 +++++++++++++++++++++
 3 files changed, 317 insertions(+), 37 deletions(-)

diff --git a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java
index 8846be4..6cb45ec 100644
--- a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java
+++ b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.connector.presto;
 
+import com.google.common.collect.ImmutableMap;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -35,15 +36,19 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.connector.presto.plugin.metrics.NoopPinotMetricFactory;
 import org.apache.pinot.core.transport.AsyncQueryResponse;
 import org.apache.pinot.core.transport.QueryRouter;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.transport.ServerResponse;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 
+import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_METRICS_FACTORY_CLASS_NAME;
+
 
 public class PinotScatterGatherQueryClient {
   private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();
@@ -72,8 +77,7 @@ public class PinotScatterGatherQueryClient {
     }
   }
 
-  public static class PinotException
-      extends RuntimeException {
+  public static class PinotException extends RuntimeException {
     private final ErrorCode _errorCode;
 
     public PinotException(ErrorCode errorCode, String message, Throwable t) {
@@ -188,6 +192,8 @@ public class PinotScatterGatherQueryClient {
 
   public PinotScatterGatherQueryClient(Config pinotConfig) {
     _prestoHostId = getDefaultPrestoId();
+    PinotMetricUtils.init(new PinotConfiguration(
+        ImmutableMap.of(CONFIG_OF_METRICS_FACTORY_CLASS_NAME, NoopPinotMetricFactory.class.getName())));
     _brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
     _brokerMetrics.initializeGlobalMeters();
     TlsConfig tlsConfig = getTlsConfig(pinotConfig);
@@ -240,13 +246,8 @@ public class PinotScatterGatherQueryClient {
     return defaultBrokerId;
   }
 
-  public Map<ServerInstance, DataTable> queryPinotServerForDataTable(
-      String pql,
-      String serverHost,
-      List<String> segments,
-      long connectionTimeoutInMillis,
-      boolean ignoreEmptyResponses,
-      int pinotRetryCount) {
+  public Map<ServerInstance, DataTable> queryPinotServerForDataTable(String pql, String serverHost,
+      List<String> segments, long connectionTimeoutInMillis, boolean ignoreEmptyResponses, int pinotRetryCount) {
     BrokerRequest brokerRequest;
     try {
       brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(pql);
@@ -260,7 +261,7 @@ public class PinotScatterGatherQueryClient {
         new ArrayList<>(segments));
 
     // Unfortunately the retries will all hit the same server because the routing decision has already been made by
-      // the pinot broker
+    // the pinot broker
     Map<ServerInstance, DataTable> serverResponseMap = doWithRetries(pinotRetryCount, (requestId) -> {
       String rawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
       if (!_concurrentQueriesCountMap.containsKey(serverHost)) {
@@ -276,17 +277,17 @@ public class PinotScatterGatherQueryClient {
       QueryRouter nextAvailableQueryRouter = getNextAvailableQueryRouter();
       if (TableNameBuilder.getTableTypeFromTableName(brokerRequest.getQuerySource().getTableName())
           == TableType.REALTIME) {
-        asyncQueryResponse = nextAvailableQueryRouter.submitQuery(requestId, rawTableName, null, null, brokerRequest,
-            routingTable, connectionTimeoutInMillis);
+        asyncQueryResponse =
+            nextAvailableQueryRouter.submitQuery(requestId, rawTableName, null, null, brokerRequest, routingTable,
+                connectionTimeoutInMillis);
       } else {
-        asyncQueryResponse = nextAvailableQueryRouter
-            .submitQuery(requestId, rawTableName, brokerRequest, routingTable, null, null, connectionTimeoutInMillis);
+        asyncQueryResponse =
+            nextAvailableQueryRouter.submitQuery(requestId, rawTableName, brokerRequest, routingTable, null, null,
+                connectionTimeoutInMillis);
       }
-      Map<ServerInstance, DataTable> serverInstanceDataTableMap = gatherServerResponses(
-          ignoreEmptyResponses,
-          routingTable,
-          asyncQueryResponse,
-          brokerRequest.getQuerySource().getTableName());
+      Map<ServerInstance, DataTable> serverInstanceDataTableMap =
+          gatherServerResponses(ignoreEmptyResponses, routingTable, asyncQueryResponse,
+              brokerRequest.getQuerySource().getTableName());
       _queryRouters.offer(nextAvailableQueryRouter);
       _concurrentQueriesCountMap.get(serverHost).decrementAndGet();
       return serverInstanceDataTableMap;
@@ -320,15 +321,15 @@ public class PinotScatterGatherQueryClient {
                 : entry.getValue().toString();
             routingTableForLogging.put(entry.getKey().toString(), valueToPrint);
           });
-          throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE, String
-              .format("%d of %d servers responded with routing table servers: %s, query stats: %s",
+          throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE,
+              String.format("%d of %d servers responded with routing table servers: %s, query stats: %s",
                   queryResponses.size(), routingTable.size(), routingTableForLogging, asyncQueryResponse.getStats()));
         }
       }
       Map<ServerInstance, DataTable> serverResponseMap = new HashMap<>();
-      queryResponses.entrySet().forEach(entry -> serverResponseMap.put(
-          new ServerInstance(new InstanceConfig(
-              String.format("Server_%s_%d", entry.getKey().getHostname(), entry.getKey().getPort()))),
+      queryResponses.entrySet().forEach(entry -> serverResponseMap.put(new ServerInstance(
+              new InstanceConfig(String.format("Server_%s_%d", entry.getKey().getHostname(),
+                  entry.getKey().getPort()))),
           entry.getValue().getDataTable()));
       return serverResponseMap;
     } catch (InterruptedException e) {
diff --git a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
index cbdb820..a5658bb 100644
--- a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
+++ b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
@@ -16,19 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.pinot.connector.presto.grpc;
 
 import java.util.HashMap;
diff --git a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/plugin/metrics/NoopPinotMetricFactory.java b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/plugin/metrics/NoopPinotMetricFactory.java
new file mode 100644
index 0000000..b789913
--- /dev/null
+++ b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/plugin/metrics/NoopPinotMetricFactory.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.presto.plugin.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.pinot.spi.annotations.metrics.MetricsFactory;
+import org.apache.pinot.spi.annotations.metrics.PinotMetricsFactory;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotCounter;
+import org.apache.pinot.spi.metrics.PinotGauge;
+import org.apache.pinot.spi.metrics.PinotHistogram;
+import org.apache.pinot.spi.metrics.PinotJmxReporter;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.metrics.PinotMetric;
+import org.apache.pinot.spi.metrics.PinotMetricName;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistryListener;
+import org.apache.pinot.spi.metrics.PinotTimer;
+
+
+/**
+ * {@link PinotMetricsFactory} whose registry hands out metrics that ignore every update.
+ *
+ * This package name has to match the regex pattern: ".*\\.plugin\\.metrics\\..*"
+ */
+@MetricsFactory
+public class NoopPinotMetricFactory implements PinotMetricsFactory {
+  // Created eagerly and never reassigned: the registry has no fields, so a single instance can be shared.
+  private final PinotMetricsRegistry _pinotMetricsRegistry = new NoopPinotMetricsRegistry();
+
+  @Override
+  public void init(PinotConfiguration pinotConfiguration) {
+  }
+
+  @Override
+  public PinotMetricsRegistry getPinotMetricsRegistry() {
+    return _pinotMetricsRegistry;
+  }
+
+  @Override
+  public PinotMetricName makePinotMetricName(Class<?> aClass, String s) {
+    return new NoopPinotMetricName();
+  }
+
+  @Override
+  public <T> PinotGauge<T> makePinotGauge(Function<Void, T> function) {
+    return new NoopPinotGauge<T>();  // the value function is never invoked
+  }
+
+  @Override
+  public PinotJmxReporter makePinotJmxReporter(PinotMetricsRegistry pinotMetricsRegistry) {
+    return new NoopPinotJmxReporter();
+  }
+
+  @Override
+  public String getMetricsFactoryName() {
+    return "noop";
+  }
+
+  public static class NoopPinotMetricsRegistry implements PinotMetricsRegistry {
+    @Override
+    public void removeMetric(PinotMetricName pinotMetricName) {
+    }
+
+    @Override
+    public <T> PinotGauge<T> newGauge(PinotMetricName pinotMetricName, PinotGauge<T> pinotGauge) {
+      return new NoopPinotGauge<T>();  // the supplied gauge is not retained
+    }
+
+    @Override
+    public PinotMeter newMeter(PinotMetricName pinotMetricName, String s, TimeUnit timeUnit) {
+      return new NoopPinotMeter();
+    }
+
+    @Override
+    public PinotCounter newCounter(PinotMetricName pinotMetricName) {
+      return new NoopPinotCounter();
+    }
+
+    @Override
+    public PinotTimer newTimer(PinotMetricName pinotMetricName, TimeUnit timeUnit, TimeUnit timeUnit1) {
+      return new NoopPinotTimer();
+    }
+
+    @Override
+    public PinotHistogram newHistogram(PinotMetricName pinotMetricName, boolean b) {
+      return new NoopPinotHistogram();
+    }
+
+    @Override
+    public Map<PinotMetricName, PinotMetric> allMetrics() {
+      return ImmutableMap.of();
+    }
+
+    @Override
+    public void addListener(PinotMetricsRegistryListener pinotMetricsRegistryListener) {
+    }
+
+    @Override
+    public Object getMetricsRegistry() {
+      return this;  // there is no wrapped registry object to expose
+    }
+
+    @Override
+    public void shutdown() {
+    }
+  }
+
+  private static class NoopPinotJmxReporter implements PinotJmxReporter {
+    @Override
+    public void start() {
+    }
+  }
+
+  private static class NoopPinotMeter implements PinotMeter {
+    @Override
+    public void mark() {
+    }
+
+    @Override
+    public void mark(long l) {
+    }
+
+    @Override
+    public Object getMetered() {
+      return null;
+    }
+
+    @Override
+    public TimeUnit rateUnit() {
+      return null;
+    }
+
+    @Override
+    public String eventType() {
+      return null;
+    }
+
+    @Override
+    public long count() {
+      return 0;
+    }
+
+    @Override
+    public double fifteenMinuteRate() {
+      return 0;
+    }
+
+    @Override
+    public double fiveMinuteRate() {
+      return 0;
+    }
+
+    @Override
+    public double meanRate() {
+      return 0;
+    }
+
+    @Override
+    public double oneMinuteRate() {
+      return 0;
+    }
+
+    @Override
+    public Object getMetric() {
+      return null;
+    }
+  }
+
+  private static class NoopPinotMetricName implements PinotMetricName {
+    @Override
+    public Object getMetricName() {
+      return null;
+    }
+  }
+
+  private static class NoopPinotGauge<T> implements PinotGauge<T> {
+    @Override
+    public Object getGauge() {
+      return null;
+    }
+
+    @Override
+    public T value() {
+      return null;
+    }
+
+    @Override
+    public Object getMetric() {
+      return null;
+    }
+  }
+
+  private static class NoopPinotCounter implements PinotCounter {
+    @Override
+    public Object getCounter() {
+      return null;
+    }
+
+    @Override
+    public Object getMetric() {
+      return null;
+    }
+  }
+
+  private static class NoopPinotTimer implements PinotTimer {
+    @Override
+    public Object getMetered() {
+      return null;
+    }
+
+    @Override
+    public TimeUnit rateUnit() {
+      return null;
+    }
+
+    @Override
+    public String eventType() {
+      return null;
+    }
+
+    @Override
+    public long count() {
+      return 0;
+    }
+
+    @Override
+    public double fifteenMinuteRate() {
+      return 0;
+    }
+
+    @Override
+    public double fiveMinuteRate() {
+      return 0;
+    }
+
+    @Override
+    public double meanRate() {
+      return 0;
+    }
+
+    @Override
+    public double oneMinuteRate() {
+      return 0;
+    }
+
+    @Override
+    public Object getMetric() {
+      return null;
+    }
+
+    @Override
+    public void update(long duration, TimeUnit unit) {
+    }
+
+    @Override
+    public Object getTimer() {
+      return null;
+    }
+  }
+
+  private static class NoopPinotHistogram implements PinotHistogram {
+    @Override
+    public Object getHistogram() {
+      return null;
+    }
+
+    @Override
+    public Object getMetric() {
+      return null;
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org