You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/10/04 00:23:21 UTC
[pulsar] branch master updated: adding metrics to presto pulsar
connector (#2631)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 474645f adding metrics to presto pulsar connector (#2631)
474645f is described below
commit 474645f7b4771e3316ca183ed0a813dd98a25758
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Oct 3 17:23:17 2018 -0700
adding metrics to presto pulsar connector (#2631)
* adding metrics to presto pulsar connector
* rename batch size
* adding comments
* refactoring metrics
* modifying bytes read metric
* fixing tests
* deleting tmp file
* adding jars to LICENSE
---
conf/presto/catalog/pulsar.properties | 2 +-
distribution/server/src/assemble/LICENSE.bin.txt | 5 +
managed-ledger/pom.xml | 12 +
pulsar-sql/presto-distribution/LICENSE | 9 +
.../pulsar/sql/presto/PulsarConnectorCache.java | 17 ++
.../pulsar/sql/presto/PulsarConnectorConfig.java | 37 ++-
.../sql/presto/PulsarConnectorMetricsTracker.java | 321 +++++++++++++++++++++
.../pulsar/sql/presto/PulsarConnectorUtils.java | 39 +++
.../pulsar/sql/presto/PulsarRecordCursor.java | 111 +++++--
.../pulsar/sql/presto/TestPulsarConnector.java | 7 +-
10 files changed, 529 insertions(+), 31 deletions(-)
diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties
index 5f922e5..0693077 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -24,7 +24,7 @@ pulsar.broker-service-url=http://localhost:8080
# URI of Zookeeper cluster
pulsar.zookeeper-uri=localhost:2181
# maximum number of entries to read at a single time
-pulsar.entry-read-batch-size=100
+pulsar.max-entry-read-batch-size=100
# default number of splits to use per query
pulsar.target-num-splits=2
# max message queue size
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 2ce32cb..a35a4c7 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -387,6 +387,7 @@ The Apache Software License, Version 2.0
- org.apache.distributedlog-distributedlog-core-4.7.2-tests.jar
- org.apache.distributedlog-distributedlog-core-4.7.2.jar
- org.apache.distributedlog-distributedlog-protocol-4.7.2.jar
+ - org.apache.bookkeeper.stats-codahale-metrics-provider-4.7.2.jar
* LZ4 -- net.jpountz.lz4-lz4-1.3.0.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.1.0-alpha26.jar
@@ -459,6 +460,10 @@ The Apache Software License, Version 2.0
- io.kubernetes-client-java-proto-2.0.0.jar
* Joda Time
- joda-time-joda-time-2.9.3.jar
+ * Dropwizard
+ - io.dropwizard.metrics-metrics-core-3.1.0.jar
+ - io.dropwizard.metrics-metrics-graphite-3.1.0.jar
+ - io.dropwizard.metrics-metrics-jvm-3.1.0.jar
BSD 3-clause "New" or "Revised" License
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 4b1c04d..4910a62 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -39,6 +39,18 @@
</dependency>
<dependency>
+ <groupId>org.apache.bookkeeper.stats</groupId>
+ <artifactId>prometheus-metrics-provider</artifactId>
+ <version>${bookkeeper.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.bookkeeper.stats</groupId>
+ <artifactId>codahale-metrics-provider</artifactId>
+ <version>${bookkeeper.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf3.version}</version>
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index fb093d6..500ec9f 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -382,6 +382,15 @@ The Apache Software License, Version 2.0
- validation-api-1.1.0.Final.jar
* Objectsize
- objectsize-0.0.12.jar
+ * Dropwizard Metrics
+ - metrics-core-3.1.0.jar
+ - metrics-graphite-3.1.0.jar
+ - metrics-jvm-3.1.0.jar
+ * Prometheus
+ - simpleclient-0.0.23.jar
+ - simpleclient_common-0.0.23.jar
+ - simpleclient_hotspot-0.0.23.jar
+ - simpleclient_servlet-0.0.23.jar
Protocol Buffers License
* Protocol Buffers
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index d13ddcd..d1b24a2 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -22,6 +22,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;
public class PulsarConnectorCache {
@@ -29,8 +30,19 @@ public class PulsarConnectorCache {
private final ManagedLedgerFactory managedLedgerFactory;
+ private final StatsProvider statsProvider;
+
+ /**
+ * Builds the connector cache: creates the managed ledger factory, then instantiates the
+ * configured {@link StatsProvider} reflectively and starts it.
+ *
+ * @param pulsarConnectorConfig connector configuration, also supplying the stats provider
+ * class name and its key/value settings
+ * @throws Exception propagated from managed ledger factory initialization or from creating /
+ * starting the stats provider
+ */
private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
+ // NOTE(review): if creating or starting the stats provider below throws, the managed ledger
+ // factory created above is never shut down -- consider cleaning it up on failure.
+ this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
+ StatsProvider.class, getClass().getClassLoader());
+
+ // start stats provider
+ // The user-supplied stats-provider settings are copied into a BookKeeper ClientConfiguration,
+ // which is the configuration type StatsProvider.start() accepts.
+ ClientConfiguration clientConfiguration = new ClientConfiguration();
+
+ // NOTE(review): getStatsProviderConfigs() may be null when pulsar.stats-provider-configs is
+ // empty (Gson.fromJson returns null for empty input) -- this forEach would then throw an NPE.
+ pulsarConnectorConfig.getStatsProviderConfigs().forEach((key, value) -> clientConfiguration.setProperty(key, value));
+
+ this.statsProvider.start(clientConfiguration);
}
public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
@@ -55,9 +67,14 @@ public class PulsarConnectorCache {
return managedLedgerFactory;
}
+ /**
+ * @return the {@link StatsProvider} that this cache's constructor instantiated and started
+ */
+ public StatsProvider getStatsProvider() {
+ return this.statsProvider;
+ }
+
+ /**
+ * Shuts down the shared cache instance, if one exists: the managed ledger factory first, then
+ * the stats provider, and finally clears the static instance reference.
+ *
+ * @throws ManagedLedgerException if the managed ledger factory fails to shut down
+ * @throws InterruptedException if interrupted while the factory shuts down
+ */
public static void shutdown() throws ManagedLedgerException, InterruptedException {
if (instance != null) {
instance.managedLedgerFactory.shutdown();
+ // NOTE(review): if managedLedgerFactory.shutdown() throws, the stop() below and the reset of
+ // the instance are skipped; a try/finally would guarantee both run.
+ instance.statsProvider.stop();
instance = null;
}
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 482fab3..a1df7c7 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -18,11 +18,17 @@
*/
package org.apache.pulsar.sql.presto;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import io.airlift.configuration.Config;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
import javax.validation.constraints.NotNull;
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
public class PulsarConnectorConfig implements AutoCloseable {
@@ -32,6 +38,8 @@ public class PulsarConnectorConfig implements AutoCloseable {
private int targetNumSplits = 2;
private int maxSplitMessageQueueSize = 10000;
private int maxSplitEntryQueueSize = 1000;
+ private String statsProvider = NullStatsProvider.class.getName();
+ private Map<String, String> statsProviderConfigs = new HashMap<>();
private PulsarAdmin pulsarAdmin;
@NotNull
@@ -57,12 +65,12 @@ public class PulsarConnectorConfig implements AutoCloseable {
}
+ /**
+ * @return upper bound on the number of entries requested from the managed ledger in one read
+ */
@NotNull
- public int getEntryReadBatchSize() {
+ public int getMaxEntryReadBatchSize() {
return this.entryReadBatchSize;
}
- @Config("pulsar.entry-read-batch-size")
- public PulsarConnectorConfig setEntryReadBatchSize(int batchSize) {
+ /**
+ * @param batchSize upper bound on the number of entries requested in a single read
+ * @return this config, for chaining
+ */
+ // NOTE(review): catalogs still using the old key "pulsar.entry-read-batch-size" are no longer
+ // recognized; airlift's @LegacyConfig could keep that key working. Values <= 0 are not rejected
+ // here, and the record cursor only issues reads when its batch size is > 0 -- confirm.
+ @Config("pulsar.max-entry-read-batch-size")
+ public PulsarConnectorConfig setMaxEntryReadBatchSize(int batchSize) {
this.entryReadBatchSize = batchSize;
return this;
}
@@ -101,6 +109,29 @@ public class PulsarConnectorConfig implements AutoCloseable {
}
+ /**
+ * @return fully-qualified class name of the stats provider (NullStatsProvider unless configured)
+ */
@NotNull
+ public String getStatsProvider() {
+ return this.statsProvider;
+ }
+
+ /**
+ * @param statsProvider fully-qualified class name of the {@code StatsProvider} implementation,
+ * which is instantiated reflectively through its no-arg constructor
+ * @return this config, for chaining
+ */
+ @Config("pulsar.stats-provider")
+ public PulsarConnectorConfig setStatsProvider(String statsProvider) {
+ this.statsProvider = statsProvider;
+ return this;
+ }
+
+ /**
+ * @return stats provider settings parsed from {@code pulsar.stats-provider-configs}
+ */
+ @NotNull
+ public Map<String, String> getStatsProviderConfigs() {
+ return this.statsProviderConfigs;
+ }
+
+ /**
+ * Parses the stats provider settings from a JSON object of string keys to string values,
+ * e.g. {@code {"key1":"value1","key2":"value2"}}.
+ *
+ * @param statsProviderConfigs JSON object text; null or empty input yields an empty map
+ * @return this config, for chaining
+ */
+ @Config("pulsar.stats-provider-configs")
+ public PulsarConnectorConfig setStatsProviderConfigs(String statsProviderConfigs) {
+ Type type = new TypeToken<Map<String, String>>(){}.getType();
+ Map<String, String> parsed = new Gson().fromJson(statsProviderConfigs, type);
+ // Gson returns null for null/empty input; keep the map non-null so callers can iterate it
+ this.statsProviderConfigs = parsed == null ? new HashMap<>() : parsed;
+ return this;
+ }
+
+ @NotNull
public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
if (this.pulsarAdmin == null) {
this.pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getBrokerServiceUrl()).build();
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
new file mode 100644
index 0000000..9ff337e
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Records Pulsar connector metrics for a single split (the record cursor creates one per cursor).
+ *
+ * <p>Each event is forwarded to a {@link StatsLogger} scoped under {@code "split"}, and running
+ * totals are kept so that {@link #close()} can publish the aggregate "*-per-query" metrics.
+ * When the configured provider is a {@link NullStatsProvider}, every method is a no-op.
+ *
+ * <p>NOTE(review): the running totals are plain {@code long} fields, yet the record cursor updates
+ * them from its deserializer thread, from managed-ledger read callbacks and from the thread calling
+ * {@code advanceNextPosition}; confirm whether stronger visibility/atomicity is needed.
+ */
+public class PulsarConnectorMetricsTracker implements AutoCloseable {
+
+ // null when metrics are disabled (NullStatsProvider), which turns every method into a no-op
+ private final StatsLogger statsLogger;
+
+ private static final String SCOPE = "split";
+
+ /** metric names **/
+
+ // time spent waiting to get an entry from the entry queue because it is empty
+ private static final String ENTRY_QUEUE_DEQUEUE_WAIT_TIME = "entry-queue-dequeue-wait-time";
+
+ // total time spent waiting to get entries from the entry queue per query
+ private static final String ENTRY_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY = "entry-queue-dequeue-wait-time-per-query";
+
+ // number of bytes read from bookkeeper
+ private static final String BYTES_READ = "bytes-read";
+
+ // total number of bytes read per query
+ private static final String BYTES_READ_PER_QUERY = "bytes-read-per-query";
+
+ // time spent deserializing entries
+ private static final String ENTRY_DESERIALIZE_TIME = "entry-deserialize-time";
+
+ // time spent deserializing entries per query
+ private static final String ENTRY_DESERIALIZE_TIME_PER_QUERY = "entry-deserialize-time-per-query";
+
+ // time spent waiting for message queue enqueue because message queue is full
+ private static final String MESSAGE_QUEUE_ENQUEUE_WAIT_TIME = "message-queue-enqueue-wait-time";
+
+ // time spent waiting for message queue enqueue because message queue is full per query
+ private static final String MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_PER_QUERY = "message-queue-enqueue-wait-time-per-query";
+
+ // number of messages deserialized
+ private static final String NUM_MESSAGES_DERSERIALIZED = "num-messages-deserialized";
+
+ // number of messages deserialized per entry
+ public static final String NUM_MESSAGES_DERSERIALIZED_PER_ENTRY = "num-messages-deserialized-per-entry";
+
+ // number of messages deserialized per query
+ public static final String NUM_MESSAGES_DERSERIALIZED_PER_QUERY = "num-messages-deserialized-per-query";
+
+ // number of read attempts; an attempt fails when the entry queue has no free capacity
+ public static final String READ_ATTEMTPS = "read-attempts";
+
+ // number of read attempts per query
+ public static final String READ_ATTEMTPS_PER_QUERY = "read-attempts-per-query";
+
+ // latency of reads per batch
+ public static final String READ_LATENCY_PER_BATCH = "read-latency-per-batch";
+
+ // total read latency per query
+ public static final String READ_LATENCY_PER_QUERY = "read-latency-per-query";
+
+ // number of entries per batch
+ public static final String NUM_ENTRIES_PER_BATCH = "num-entries-per-batch";
+
+ // number of entries per query
+ public static final String NUM_ENTRIES_PER_QUERY = "num-entries-per-query";
+
+ // time spent waiting to dequeue from message queue because it is empty
+ public static final String MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY = "message-queue-dequeue-wait-time-per-query";
+
+ // time spent deserializing message to record e.g. avro, json, etc
+ public static final String RECORD_DESERIALIZE_TIME = "record-deserialize-time";
+
+ // time spent deserializing message to record per query
+ private static final String RECORD_DESERIALIZE_TIME_PER_QUERY = "record-deserialize-time-per-query";
+
+ // number of records deserialized
+ private static final String NUM_RECORD_DESERIALIZED = "num-record-deserialized";
+
+ // total execution time of the split
+ private static final String TOTAL_EXECUTION_TIME = "total-execution-time";
+
+ /** internal tracking variables **/
+ // Times are System.nanoTime() deltas (nanoseconds), except MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum,
+ // which accumulates whatever unit the caller passes and is published as milliseconds in close().
+ private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
+ private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
+ private long BYTES_READ_sum = 0L;
+ private long ENTRY_DESERIALIZE_TIME_startTime;
+ private long ENTRY_DESERIALIZE_TIME_sum = 0L;
+ private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
+ private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum = 0L;
+ private long NUM_MESSAGES_DERSERIALIZED_sum = 0L;
+ // messages deserialized from the entry currently being processed; reset per entry
+ private long NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
+ private long READ_ATTEMTPS_SUCCESS_sum = 0L;
+ private long READ_ATTEMTPS_FAIL_sum = 0L;
+ private long READ_LATENCY_SUCCESS_sum = 0L;
+ private long READ_LATENCY_FAIL_sum = 0L;
+ private long NUM_ENTRIES_PER_BATCH_sum = 0L;
+ private long MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
+ private long RECORD_DESERIALIZE_TIME_startTime;
+ private long RECORD_DESERIALIZE_TIME_sum = 0L;
+
+ /**
+ * @param statsProvider provider to obtain the "split"-scoped logger from; a
+ * {@link NullStatsProvider} disables all tracking
+ */
+ public PulsarConnectorMetricsTracker(StatsProvider statsProvider) {
+ this.statsLogger = statsProvider instanceof NullStatsProvider
+ ? null : statsProvider.getStatsLogger(SCOPE);
+ }
+
+ // marks the start of a (possibly blocking) take from the entry queue
+ public void start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
+ if (statsLogger != null) {
+ ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime = System.nanoTime();
+ }
+ }
+
+ // records the time since start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME()
+ public void end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
+ if (statsLogger != null) {
+ long time = System.nanoTime() - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
+ ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum += time;
+ statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME)
+ .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ public void register_BYTES_READ(long bytes) {
+ if (statsLogger != null) {
+ BYTES_READ_sum += bytes;
+ statsLogger.getCounter(BYTES_READ).add(bytes);
+ }
+ }
+
+ public void start_ENTRY_DESERIALIZE_TIME() {
+ if (statsLogger != null) {
+ ENTRY_DESERIALIZE_TIME_startTime = System.nanoTime();
+ }
+ }
+
+ public void end_ENTRY_DESERIALIZE_TIME() {
+ if (statsLogger != null) {
+ long time = System.nanoTime() - ENTRY_DESERIALIZE_TIME_startTime;
+ ENTRY_DESERIALIZE_TIME_sum += time;
+ statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME)
+ .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ public void start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
+ if (statsLogger != null) {
+ MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime = System.nanoTime();
+ }
+ }
+
+ public void end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
+ if (statsLogger != null) {
+ long time = System.nanoTime() - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
+ MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum += time;
+ statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME)
+ .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ // counts one deserialized message toward the current entry and the global counter
+ public void incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
+ if (statsLogger != null) {
+ NUM_MESSAGED_DERSERIALIZED_PER_BATCH++;
+ statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED).add(1);
+ }
+ }
+
+ // publishes the current entry's message count, folds it into the total, and resets it
+ public void end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
+ if (statsLogger != null) {
+ NUM_MESSAGES_DERSERIALIZED_sum += NUM_MESSAGED_DERSERIALIZED_PER_BATCH;
+
+ statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY)
+ .registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH);
+
+ NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
+ }
+ }
+
+ public void incr_READ_ATTEMPTS_SUCCESS() {
+ if (statsLogger != null) {
+ READ_ATTEMTPS_SUCCESS_sum++;
+ statsLogger.getOpStatsLogger(READ_ATTEMTPS)
+ .registerSuccessfulValue(1L);
+ }
+ }
+
+ public void incr_READ_ATTEMPTS_FAIL() {
+ if (statsLogger != null) {
+ READ_ATTEMTPS_FAIL_sum++;
+ statsLogger.getOpStatsLogger(READ_ATTEMTPS)
+ .registerFailedValue(1L);
+ }
+ }
+
+ // latency is expected in nanoseconds
+ public void register_READ_LATENCY_PER_BATCH_SUCCESS(long latency) {
+ if (statsLogger != null) {
+ READ_LATENCY_SUCCESS_sum += latency;
+ statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH)
+ .registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ // latency is expected in nanoseconds
+ public void register_READ_LATENCY_PER_BATCH_FAIL(long latency) {
+ if (statsLogger != null) {
+ READ_LATENCY_FAIL_sum += latency;
+ statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH)
+ .registerFailedEvent(latency, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ public void incr_NUM_ENTRIES_PER_BATCH_SUCCESS(long delta) {
+ if (statsLogger != null) {
+ NUM_ENTRIES_PER_BATCH_sum += delta;
+ statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH)
+ .registerSuccessfulValue(delta);
+ }
+ }
+
+ public void incr_NUM_ENTRIES_PER_BATCH_FAIL(long delta) {
+ if (statsLogger != null) {
+ statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH)
+ .registerFailedValue(delta);
+ }
+ }
+
+ // only accumulated here; published as milliseconds by close()
+ public void register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(long latency) {
+ if (statsLogger != null) {
+ MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum += latency;
+ }
+ }
+
+ public void start_RECORD_DESERIALIZE_TIME() {
+ if (statsLogger != null) {
+ RECORD_DESERIALIZE_TIME_startTime = System.nanoTime();
+ }
+ }
+
+ public void end_RECORD_DESERIALIZE_TIME() {
+ if (statsLogger != null) {
+ long time = System.nanoTime() - RECORD_DESERIALIZE_TIME_startTime;
+ RECORD_DESERIALIZE_TIME_sum += time;
+ statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME)
+ .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ public void incr_NUM_RECORD_DESERIALIZED() {
+ if (statsLogger != null) {
+ statsLogger.getCounter(NUM_RECORD_DESERIALIZED).add(1);
+ }
+ }
+
+ // latency is expected in nanoseconds
+ public void register_TOTAL_EXECUTION_TIME(long latency) {
+ if (statsLogger != null) {
+ statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME)
+ .registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ /**
+ * Publishes the accumulated totals as the "*-per-query" metrics.
+ *
+ * <p>NOTE(review): one tracker exists per split, so these aggregates are per split rather than
+ * per query; calling close() twice would publish them twice.
+ */
+ @Override
+ public void close() {
+ if (statsLogger != null) {
+ // register total entry dequeue wait time for query
+ statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
+ .registerSuccessfulEvent(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
+
+ // register bytes read per query
+ statsLogger.getOpStatsLogger(BYTES_READ_PER_QUERY)
+ .registerSuccessfulValue(BYTES_READ_sum);
+
+ // register total time spent deserializing entries for query
+ statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME_PER_QUERY)
+ .registerSuccessfulEvent(ENTRY_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
+
+ // register time spent waiting for message queue enqueue because message queue is full per query
+ statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_PER_QUERY)
+ .registerSuccessfulEvent(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
+
+ // register number of messages deserialized per query
+ statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_QUERY)
+ .registerSuccessfulValue(NUM_MESSAGES_DERSERIALIZED_sum);
+
+ // register number of read attempts per query
+ statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
+ .registerSuccessfulValue(READ_ATTEMTPS_SUCCESS_sum);
+ statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
+ .registerFailedValue(READ_ATTEMTPS_FAIL_sum);
+
+ // register total read latency for query
+ statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
+ .registerSuccessfulEvent(READ_LATENCY_SUCCESS_sum, TimeUnit.NANOSECONDS);
+ statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
+ .registerFailedEvent(READ_LATENCY_FAIL_sum, TimeUnit.NANOSECONDS);
+
+ // register number of entries per query
+ statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_QUERY)
+ .registerSuccessfulValue(NUM_ENTRIES_PER_BATCH_sum);
+
+ // register time spent waiting to read from message queue per query (sum is in milliseconds)
+ statsLogger.getOpStatsLogger(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
+ .registerSuccessfulEvent(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.MILLISECONDS);
+
+ // register time spent deserializing records per query
+ statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME_PER_QUERY)
+ .registerSuccessfulEvent(RECORD_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
+ }
+ }
+}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
index e537574..388df0e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
@@ -24,6 +24,9 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
public class PulsarConnectorUtils {
public static Schema parseSchema(String schemaJson) {
@@ -39,4 +42,40 @@ public class PulsarConnectorUtils {
+ ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
}
+
+ /**
+ * Create an instance of <code>userClassName</code> using provided <code>classLoader</code>.
+ * This instance should implement the provided interface <code>xface</code>.
+ *
+ * @param userClassName fully-qualified name of the class to instantiate
+ * @param xface the interface that the reflected instance should implement
+ * @param classLoader class loader used to load and initialize the class
+ * @param <T> the expected interface type
+ * @return a new instance created through the class's no-arg constructor
+ * @throws RuntimeException if the class cannot be found, does not implement {@code xface},
+ * is not concrete, lacks an accessible no-arg constructor, or its constructor throws
+ */
+ public static <T> T createInstance(String userClassName,
+ Class<T> xface,
+ ClassLoader classLoader) {
+ Class<?> theCls;
+ try {
+ theCls = Class.forName(userClassName, true, classLoader);
+ } catch (ClassNotFoundException cnfe) {
+ throw new RuntimeException("User class must be in class path: " + userClassName, cnfe);
+ }
+ if (!xface.isAssignableFrom(theCls)) {
+ throw new RuntimeException(userClassName + " not " + xface.getName());
+ }
+ // asSubclass yields a typed Class<? extends T>, so no unchecked cast is needed
+ Class<? extends T> tCls = theCls.asSubclass(xface);
+ try {
+ Constructor<? extends T> meth = tCls.getDeclaredConstructor();
+ return meth.newInstance();
+ } catch (InstantiationException ie) {
+ throw new RuntimeException("User class must be concrete: " + userClassName, ie);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("User class must have a no-arg constructor: " + userClassName, e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("User class must have a public constructor: " + userClassName, e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException("User class constructor throws exception: " + userClassName, e);
+ }
+ }
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index c8106aa..4088868 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -35,14 +35,12 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.MessageParser;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import java.io.IOException;
import java.util.List;
@@ -77,24 +75,22 @@ public class PulsarRecordCursor implements RecordCursor {
private Message currentMessage;
private Map<String, PulsarInternalColumn> internalColumnMap = PulsarInternalColumn.getInternalFieldsMap();
private SchemaHandler schemaHandler;
- private int batchSize;
+ private int maxBatchSize;
private AtomicLong completedBytes = new AtomicLong(0L);
private ReadEntries readEntries;
private DeserializeEntries deserializeEntries;
private TopicName topicName;
+ private PulsarConnectorMetricsTracker metricsTracker;
- private static final Logger log = Logger.get(PulsarRecordCursor.class);
+ // Stats total execution time of split
+ private long startTime;
- private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
- ClientConfiguration bkClientConfiguration = new ClientConfiguration()
- .setZkServers(pulsarConnectorConfig.getZookeeperUri())
- .setAllowShadedLedgerManagerFactoryClass(true)
- .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.");
- return new ManagedLedgerFactoryImpl(bkClientConfiguration);
- }
+ private static final Logger log = Logger.get(PulsarRecordCursor.class);
public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
PulsarConnectorConfig pulsarConnectorConfig) {
+ // Set start time for split
+ this.startTime = System.nanoTime();
PulsarConnectorCache pulsarConnectorCache;
try {
pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
@@ -104,26 +100,29 @@ public class PulsarRecordCursor implements RecordCursor {
throw new RuntimeException(e);
}
initialize(columnHandles, pulsarSplit, pulsarConnectorConfig,
- pulsarConnectorCache.getManagedLedgerFactory());
+ pulsarConnectorCache.getManagedLedgerFactory(),
+ new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider()));
}
// Exposed for testing purposes
PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
- pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory) {
- initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory);
+ pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
+ // Set start time for split, as the public constructor does; otherwise close() would report
+ // System.nanoTime() - 0 as the split's total execution time.
+ this.startTime = System.nanoTime();
+ initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, pulsarConnectorMetricsTracker);
}
private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
- pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory) {
+ pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory,
+ PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
this.columnHandles = columnHandles;
this.pulsarSplit = pulsarSplit;
this.pulsarConnectorConfig = pulsarConnectorConfig;
- this.batchSize = pulsarConnectorConfig.getEntryReadBatchSize();
+ this.maxBatchSize = pulsarConnectorConfig.getMaxEntryReadBatchSize();
this.messageQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
this.entryQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
this.topicName = TopicName.get("persistent",
NamespaceName.get(pulsarSplit.getSchemaName()),
pulsarSplit.getTableName());
+ this.metricsTracker = pulsarConnectorMetricsTracker;
Schema schema = PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
@@ -191,7 +190,7 @@ public class PulsarRecordCursor implements RecordCursor {
private final Thread thread;
public DeserializeEntries() {
- this.thread = new Thread(this);
+ // Name the worker after its split so it can be identified in thread dumps.
+ this.thread = new Thread(this, "deserialize-thread-split-" + pulsarSplit.getSplitId());
}
public void interrupt() {
@@ -209,19 +208,42 @@ public class PulsarRecordCursor implements RecordCursor {
while (isRunning.get()) {
Entry entry;
try {
+ // start time for entry queue read
+ metricsTracker.start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME();
+ // read from entry queue and block if empty
entry = entryQueue.take();
+ // record entry queue wait time stats
+ metricsTracker.end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME();
} catch (InterruptedException e) {
break;
}
try {
- completedBytes.addAndGet(entry.getDataBuffer().readableBytes());
+ long bytes = entry.getDataBuffer().readableBytes();
+ completedBytes.addAndGet(bytes);
+ // register stats for bytes read
+ metricsTracker.register_BYTES_READ(bytes);
+
+ // set start time for time deserializing entries for stats
+ metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+
// filter entries that is not part of my split
if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) {
try {
MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
entry.getDataBuffer(), (messageId, message, byteBuf) -> {
try {
+
+ // start time for message queue read
+ metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+
+ // enqueue deserialize message from this entry
messageQueue.put(message);
+
+ // stats for how long a read from message queue took
+ metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+ // stats for number of messages read
+ metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
+
} catch (InterruptedException e) {
//no-op
}
@@ -230,6 +252,11 @@ public class PulsarRecordCursor implements RecordCursor {
log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
throw new RuntimeException(e);
}
+ // stats for time spend deserializing entries
+ metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+
+ // stats for num messages per entry
+ metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
}
} finally {
entry.release();
@@ -256,9 +283,19 @@ public class PulsarRecordCursor implements RecordCursor {
.compareTo(pulsarSplit.getEndPosition()) >= 0) {
isDone.set(true);
- } else if (entryQueue.remainingCapacity() > batchSize) {
- outstandingReadsRequests.decrementAndGet();
- cursor.asyncReadEntries(batchSize, this, System.currentTimeMillis());
+ } else {
+ int batchSize = Math.min(maxBatchSize, entryQueue.remainingCapacity());
+
+ if (batchSize > 0) {
+ outstandingReadsRequests.decrementAndGet();
+ cursor.asyncReadEntries(batchSize, this, System.nanoTime());
+
+ // stats for successful read request
+ metricsTracker.incr_READ_ATTEMPTS_SUCCESS();
+ } else {
+ // stats for failed read request because entry queue is full
+ metricsTracker.incr_READ_ATTEMPTS_FAIL();
+ }
}
}
}
@@ -267,6 +304,11 @@ public class PulsarRecordCursor implements RecordCursor {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
entryQueue.addAll(entries);
outstandingReadsRequests.incrementAndGet();
+
+ //set read latency stats for success
+ metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (long)ctx);
+ //stats for number of entries read
+ metricsTracker.incr_NUM_ENTRIES_PER_BATCH_SUCCESS(entries.size());
}
public boolean hashFinished() {
@@ -278,6 +320,11 @@ public class PulsarRecordCursor implements RecordCursor {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
log.debug(exception, "Failed to read entries from topic %s", topicName.toString());
outstandingReadsRequests.incrementAndGet();
+
+ // read latency for a failed batch: ctx holds the System.nanoTime() taken when asyncReadEntries was issued
+ metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (long)ctx);
+ // NOTE(review): counts maxBatchSize as failed, but the request may have asked for fewer (batchSize = min(maxBatchSize, remaining capacity))
+ metricsTracker.incr_NUM_ENTRIES_PER_BATCH_FAIL((long) maxBatchSize);
}
}
@@ -286,12 +333,12 @@ public class PulsarRecordCursor implements RecordCursor {
public boolean advanceNextPosition() {
if (readEntries == null) {
- readEntries = new ReadEntries();
- readEntries.run();
-
// start deserialize thread
deserializeEntries = new DeserializeEntries();
deserializeEntries.start();
+
+ readEntries = new ReadEntries();
+ readEntries.run();
}
while(true) {
@@ -309,13 +356,23 @@ public class PulsarRecordCursor implements RecordCursor {
} else {
try {
Thread.sleep(5);
+ // stats for time spent waiting to read from the message queue because it's empty
+ metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
+ //start time for deserializing record
+ metricsTracker.start_RECORD_DESERIALIZE_TIME();
+
currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData());
+ metricsTracker.incr_NUM_RECORD_DESERIALIZED();
+
+ // stats for time spent deserializing
+ metricsTracker.end_RECORD_DESERIALIZE_TIME();
+
return true;
}
@@ -423,6 +480,12 @@ public class PulsarRecordCursor implements RecordCursor {
log.error(e);
}
}
+
+ // set stat for total execution time of split
+ if (this.metricsTracker != null) {
+ this.metricsTracker.register_TOTAL_EXECUTION_TIME(System.nanoTime() - startTime);
+ this.metricsTracker.close();
+ }
}
private void checkFieldType(int field, Class<?> expected) {
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 5d8472d..fa121e3 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.javax.ws.rs.ClientErrorException;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -69,7 +70,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -664,7 +664,7 @@ public abstract class TestPulsarConnector {
@BeforeMethod
public void setup() throws Exception {
this.pulsarConnectorConfig = spy(new PulsarConnectorConfig());
- this.pulsarConnectorConfig.setEntryReadBatchSize(1);
+ this.pulsarConnectorConfig.setMaxEntryReadBatchSize(1);
this.pulsarConnectorConfig.setMaxSplitEntryQueueSize(10);
this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);
@@ -924,7 +924,8 @@ public abstract class TestPulsarConnector {
for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) {
- PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(fooColumnHandles, split.getValue(), pulsarConnectorConfig, managedLedgerFactory));
+ PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(fooColumnHandles, split.getValue(),
+ pulsarConnectorConfig, managedLedgerFactory, new PulsarConnectorMetricsTracker(new NullStatsProvider())));
this.pulsarRecordCursors.put(split.getKey(), pulsarRecordCursor);
}
}