You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2022/03/02 07:38:18 UTC
[pinot] branch master updated: Adding NoopPinotMetricFactory and corresponding changes (#8270)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fd9c58a Adding NoopPinotMetricFactory and corresponding changes (#8270)
fd9c58a is described below
commit fd9c58a11ed16d27109baefcee138eea30132ad3
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Tue Mar 1 23:37:41 2022 -0800
Adding NoopPinotMetricFactory and corresponding changes (#8270)
---
.../presto/PinotScatterGatherQueryClient.java | 49 ++--
.../presto/grpc/PinotStreamingQueryClient.java | 13 -
.../plugin/metrics/NoopPinotMetricFactory.java | 292 +++++++++++++++++++++
3 files changed, 317 insertions(+), 37 deletions(-)
diff --git a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java
index 8846be4..6cb45ec 100644
--- a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java
+++ b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.connector.presto;
+import com.google.common.collect.ImmutableMap;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -35,15 +36,19 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.connector.presto.plugin.metrics.NoopPinotMetricFactory;
import org.apache.pinot.core.transport.AsyncQueryResponse;
import org.apache.pinot.core.transport.QueryRouter;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_METRICS_FACTORY_CLASS_NAME;
+
public class PinotScatterGatherQueryClient {
private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();
@@ -72,8 +77,7 @@ public class PinotScatterGatherQueryClient {
}
}
- public static class PinotException
- extends RuntimeException {
+ public static class PinotException extends RuntimeException {
private final ErrorCode _errorCode;
public PinotException(ErrorCode errorCode, String message, Throwable t) {
@@ -188,6 +192,8 @@ public class PinotScatterGatherQueryClient {
public PinotScatterGatherQueryClient(Config pinotConfig) {
_prestoHostId = getDefaultPrestoId();
+ PinotMetricUtils.init(new PinotConfiguration(
+ ImmutableMap.of(CONFIG_OF_METRICS_FACTORY_CLASS_NAME, NoopPinotMetricFactory.class.getName())));
_brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
_brokerMetrics.initializeGlobalMeters();
TlsConfig tlsConfig = getTlsConfig(pinotConfig);
@@ -240,13 +246,8 @@ public class PinotScatterGatherQueryClient {
return defaultBrokerId;
}
- public Map<ServerInstance, DataTable> queryPinotServerForDataTable(
- String pql,
- String serverHost,
- List<String> segments,
- long connectionTimeoutInMillis,
- boolean ignoreEmptyResponses,
- int pinotRetryCount) {
+ public Map<ServerInstance, DataTable> queryPinotServerForDataTable(String pql, String serverHost,
+ List<String> segments, long connectionTimeoutInMillis, boolean ignoreEmptyResponses, int pinotRetryCount) {
BrokerRequest brokerRequest;
try {
brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(pql);
@@ -260,7 +261,7 @@ public class PinotScatterGatherQueryClient {
new ArrayList<>(segments));
// Unfortunately the retries will all hit the same server because the routing decision has already been made by
- // the pinot broker
+ // the pinot broker
Map<ServerInstance, DataTable> serverResponseMap = doWithRetries(pinotRetryCount, (requestId) -> {
String rawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
if (!_concurrentQueriesCountMap.containsKey(serverHost)) {
@@ -276,17 +277,17 @@ public class PinotScatterGatherQueryClient {
QueryRouter nextAvailableQueryRouter = getNextAvailableQueryRouter();
if (TableNameBuilder.getTableTypeFromTableName(brokerRequest.getQuerySource().getTableName())
== TableType.REALTIME) {
- asyncQueryResponse = nextAvailableQueryRouter.submitQuery(requestId, rawTableName, null, null, brokerRequest,
- routingTable, connectionTimeoutInMillis);
+ asyncQueryResponse =
+ nextAvailableQueryRouter.submitQuery(requestId, rawTableName, null, null, brokerRequest, routingTable,
+ connectionTimeoutInMillis);
} else {
- asyncQueryResponse = nextAvailableQueryRouter
- .submitQuery(requestId, rawTableName, brokerRequest, routingTable, null, null, connectionTimeoutInMillis);
+ asyncQueryResponse =
+ nextAvailableQueryRouter.submitQuery(requestId, rawTableName, brokerRequest, routingTable, null, null,
+ connectionTimeoutInMillis);
}
- Map<ServerInstance, DataTable> serverInstanceDataTableMap = gatherServerResponses(
- ignoreEmptyResponses,
- routingTable,
- asyncQueryResponse,
- brokerRequest.getQuerySource().getTableName());
+ Map<ServerInstance, DataTable> serverInstanceDataTableMap =
+ gatherServerResponses(ignoreEmptyResponses, routingTable, asyncQueryResponse,
+ brokerRequest.getQuerySource().getTableName());
_queryRouters.offer(nextAvailableQueryRouter);
_concurrentQueriesCountMap.get(serverHost).decrementAndGet();
return serverInstanceDataTableMap;
@@ -320,15 +321,15 @@ public class PinotScatterGatherQueryClient {
: entry.getValue().toString();
routingTableForLogging.put(entry.getKey().toString(), valueToPrint);
});
- throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE, String
- .format("%d of %d servers responded with routing table servers: %s, query stats: %s",
+ throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE,
+ String.format("%d of %d servers responded with routing table servers: %s, query stats: %s",
queryResponses.size(), routingTable.size(), routingTableForLogging, asyncQueryResponse.getStats()));
}
}
Map<ServerInstance, DataTable> serverResponseMap = new HashMap<>();
- queryResponses.entrySet().forEach(entry -> serverResponseMap.put(
- new ServerInstance(new InstanceConfig(
- String.format("Server_%s_%d", entry.getKey().getHostname(), entry.getKey().getPort()))),
+ queryResponses.entrySet().forEach(entry -> serverResponseMap.put(new ServerInstance(
+ new InstanceConfig(String.format("Server_%s_%d", entry.getKey().getHostname(),
+ entry.getKey().getPort()))),
entry.getValue().getDataTable()));
return serverResponseMap;
} catch (InterruptedException e) {
diff --git a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
index cbdb820..a5658bb 100644
--- a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
+++ b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java
@@ -16,19 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.pinot.connector.presto.grpc;
import java.util.HashMap;
diff --git a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/plugin/metrics/NoopPinotMetricFactory.java b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/plugin/metrics/NoopPinotMetricFactory.java
new file mode 100644
index 0000000..b789913
--- /dev/null
+++ b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/plugin/metrics/NoopPinotMetricFactory.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.presto.plugin.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.pinot.spi.annotations.metrics.MetricsFactory;
+import org.apache.pinot.spi.annotations.metrics.PinotMetricsFactory;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotCounter;
+import org.apache.pinot.spi.metrics.PinotGauge;
+import org.apache.pinot.spi.metrics.PinotHistogram;
+import org.apache.pinot.spi.metrics.PinotJmxReporter;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.metrics.PinotMetric;
+import org.apache.pinot.spi.metrics.PinotMetricName;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistryListener;
+import org.apache.pinot.spi.metrics.PinotTimer;
+
+
+/**
+ * A {@link PinotMetricsFactory} whose registry and metric objects do nothing.
+ *
+ * <p>Every factory method returns a new no-op object and ignores its arguments. The registry discards
+ * registrations, removals and listeners and reports an empty metric map. The meters, counters, timers,
+ * histograms and gauges it creates have empty mutators; their numeric accessors return {@code 0} and their
+ * object accessors (including the gauge's {@code value()}) return {@code null}.
+ *
+ * <p>The registry is created on the first call to {@link #getPinotMetricsRegistry()} and the same instance is
+ * returned afterwards. That lazy initialization is not synchronized.
+ *
+ * <p>This package name has to match the regex pattern: ".*\\.plugin\\.metrics\\..*"
+ */
+@MetricsFactory
+public class NoopPinotMetricFactory implements PinotMetricsFactory {
+ private PinotMetricsRegistry _pinotMetricsRegistry; // created lazily by getPinotMetricsRegistry()
+
+ @Override
+ public void init(PinotConfiguration pinotConfiguration) { // the configuration is ignored
+ }
+
+ @Override
+ public PinotMetricsRegistry getPinotMetricsRegistry() {
+ if (_pinotMetricsRegistry == null) { // first call: create the registry that is reused afterwards
+ _pinotMetricsRegistry = new NoopPinotMetricsRegistry();
+ }
+ return _pinotMetricsRegistry;
+ }
+
+ @Override
+ public PinotMetricName makePinotMetricName(Class<?> aClass, String s) {
+ return new NoopPinotMetricName(); // class and name are not recorded
+ }
+
+ @Override
+ public <T> PinotGauge<T> makePinotGauge(Function<Void, T> function) {
+ return new NoopPinotGauge<T>(); // the supplied function is never invoked
+ }
+
+ @Override
+ public PinotJmxReporter makePinotJmxReporter(PinotMetricsRegistry pinotMetricsRegistry) {
+ return new NoopPinotJmxReporter(); // the registry argument is ignored
+ }
+
+ @Override
+ public String getMetricsFactoryName() {
+ return "noop";
+ }
+
+ public static class NoopPinotMetricsRegistry implements PinotMetricsRegistry { // registry that stores nothing
+ @Override
+ public void removeMetric(PinotMetricName pinotMetricName) {
+ }
+
+ @Override
+ public <T> PinotGauge<T> newGauge(PinotMetricName pinotMetricName, PinotGauge<T> pinotGauge) {
+ return new NoopPinotGauge<T>(); // the gauge passed in is discarded, not returned
+ }
+
+ @Override
+ public PinotMeter newMeter(PinotMetricName pinotMetricName, String s, TimeUnit timeUnit) {
+ return new NoopPinotMeter();
+ }
+
+ @Override
+ public PinotCounter newCounter(PinotMetricName pinotMetricName) {
+ return new NoopPinotCounter();
+ }
+
+ @Override
+ public PinotTimer newTimer(PinotMetricName pinotMetricName, TimeUnit timeUnit, TimeUnit timeUnit1) {
+ return new NoopPinotTimer();
+ }
+
+ @Override
+ public PinotHistogram newHistogram(PinotMetricName pinotMetricName, boolean b) {
+ return new NoopPinotHistogram();
+ }
+
+ @Override
+ public Map<PinotMetricName, PinotMetric> allMetrics() {
+ return ImmutableMap.of(); // always empty: created metrics are never stored
+ }
+
+ @Override
+ public void addListener(PinotMetricsRegistryListener pinotMetricsRegistryListener) {
+ }
+
+ @Override
+ public Object getMetricsRegistry() {
+ return this; // there is no wrapped registry object, so the wrapper exposes itself
+ }
+
+ @Override
+ public void shutdown() {
+ }
+ }
+
+ private static class NoopPinotJmxReporter implements PinotJmxReporter { // reporter whose start() does nothing
+ @Override
+ public void start() {
+ }
+ }
+
+ private static class NoopPinotMeter implements PinotMeter { // meter: marks are dropped, rates and count are 0
+ @Override
+ public void mark() {
+ }
+
+ @Override
+ public void mark(long l) {
+ }
+
+ @Override
+ public Object getMetered() {
+ return null;
+ }
+
+ @Override
+ public TimeUnit rateUnit() {
+ return null;
+ }
+
+ @Override
+ public String eventType() {
+ return null;
+ }
+
+ @Override
+ public long count() {
+ return 0;
+ }
+
+ @Override
+ public double fifteenMinuteRate() {
+ return 0;
+ }
+
+ @Override
+ public double fiveMinuteRate() {
+ return 0;
+ }
+
+ @Override
+ public double meanRate() {
+ return 0;
+ }
+
+ @Override
+ public double oneMinuteRate() {
+ return 0;
+ }
+
+ @Override
+ public Object getMetric() {
+ return null;
+ }
+ }
+
+ private static class NoopPinotMetricName implements PinotMetricName { // carries no name
+ @Override
+ public Object getMetricName() {
+ return null;
+ }
+ }
+
+ private static class NoopPinotGauge<T> implements PinotGauge<T> { // gauge with no backing value
+ @Override
+ public Object getGauge() {
+ return null;
+ }
+
+ @Override
+ public T value() {
+ return null; // NOTE(review): a caller that unboxes this to a primitive would NPE; confirm none do
+ }
+
+ @Override
+ public Object getMetric() {
+ return null;
+ }
+ }
+
+ private static class NoopPinotCounter implements PinotCounter { // counter with no underlying metric object
+ @Override
+ public Object getCounter() {
+ return null;
+ }
+
+ @Override
+ public Object getMetric() {
+ return null;
+ }
+ }
+
+ private static class NoopPinotTimer implements PinotTimer { // timer: updates are dropped, rates and count are 0
+ @Override
+ public Object getMetered() {
+ return null;
+ }
+
+ @Override
+ public TimeUnit rateUnit() {
+ return null;
+ }
+
+ @Override
+ public String eventType() {
+ return null;
+ }
+
+ @Override
+ public long count() {
+ return 0;
+ }
+
+ @Override
+ public double fifteenMinuteRate() {
+ return 0;
+ }
+
+ @Override
+ public double fiveMinuteRate() {
+ return 0;
+ }
+
+ @Override
+ public double meanRate() {
+ return 0;
+ }
+
+ @Override
+ public double oneMinuteRate() {
+ return 0;
+ }
+
+ @Override
+ public Object getMetric() {
+ return null;
+ }
+
+ @Override
+ public void update(long duration, TimeUnit unit) {
+ }
+
+ @Override
+ public Object getTimer() {
+ return null;
+ }
+ }
+
+ private static class NoopPinotHistogram implements PinotHistogram { // histogram with no underlying metric object
+ @Override
+ public Object getHistogram() {
+ return null;
+ }
+
+ @Override
+ public Object getMetric() {
+ return null;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org