You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/10/04 00:23:21 UTC

[pulsar] branch master updated: adding metrics to presto pulsar connector (#2631)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 474645f   adding metrics to presto pulsar connector (#2631)
474645f is described below

commit 474645f7b4771e3316ca183ed0a813dd98a25758
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Oct 3 17:23:17 2018 -0700

     adding metrics to presto pulsar connector (#2631)
    
    * adding metrics to presto pulsar connector
    
    * rename batch size
    
    * adding comments
    
    * refactoring metrics
    
    * modifying bytes read metric
    
    * fixing tests
    
    * deleting tmp file
    
    * adding jars to LICENSE
---
 conf/presto/catalog/pulsar.properties              |   2 +-
 distribution/server/src/assemble/LICENSE.bin.txt   |   5 +
 managed-ledger/pom.xml                             |  12 +
 pulsar-sql/presto-distribution/LICENSE             |   9 +
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  17 ++
 .../pulsar/sql/presto/PulsarConnectorConfig.java   |  37 ++-
 .../sql/presto/PulsarConnectorMetricsTracker.java  | 321 +++++++++++++++++++++
 .../pulsar/sql/presto/PulsarConnectorUtils.java    |  39 +++
 .../pulsar/sql/presto/PulsarRecordCursor.java      | 111 +++++--
 .../pulsar/sql/presto/TestPulsarConnector.java     |   7 +-
 10 files changed, 529 insertions(+), 31 deletions(-)

diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties
index 5f922e5..0693077 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -24,7 +24,7 @@ pulsar.broker-service-url=http://localhost:8080
 # URI of Zookeeper cluster
 pulsar.zookeeper-uri=localhost:2181
 # minimum number of entries to read at a single time
-pulsar.entry-read-batch-size=100
+pulsar.max-entry-read-batch-size=100
 # default number of splits to use per query
 pulsar.target-num-splits=2
 # max message queue size
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 2ce32cb..a35a4c7 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -387,6 +387,7 @@ The Apache Software License, Version 2.0
     - org.apache.distributedlog-distributedlog-core-4.7.2-tests.jar
     - org.apache.distributedlog-distributedlog-core-4.7.2.jar
     - org.apache.distributedlog-distributedlog-protocol-4.7.2.jar
+    - org.apache.bookkeeper.stats-codahale-metrics-provider-4.7.2.jar
  * LZ4 -- net.jpountz.lz4-lz4-1.3.0.jar
  * AsyncHttpClient
     - org.asynchttpclient-async-http-client-2.1.0-alpha26.jar
@@ -459,6 +460,10 @@ The Apache Software License, Version 2.0
     - io.kubernetes-client-java-proto-2.0.0.jar
   * Joda Time
     - joda-time-joda-time-2.9.3.jar
+  * Dropwizard
+    - io.dropwizard.metrics-metrics-core-3.1.0.jar
+    - io.dropwizard.metrics-metrics-graphite-3.1.0.jar
+    - io.dropwizard.metrics-metrics-jvm-3.1.0.jar
 
 
 BSD 3-clause "New" or "Revised" License
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 4b1c04d..4910a62 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -39,6 +39,18 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.bookkeeper.stats</groupId>
+      <artifactId>prometheus-metrics-provider</artifactId>
+      <version>${bookkeeper.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.bookkeeper.stats</groupId>
+      <artifactId>codahale-metrics-provider</artifactId>
+      <version>${bookkeeper.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>${protobuf3.version}</version>
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index fb093d6..500ec9f 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -382,6 +382,15 @@ The Apache Software License, Version 2.0
     - validation-api-1.1.0.Final.jar
   * Objectsize
     - objectsize-0.0.12.jar
+  * Dropwizard Metrics
+    - metrics-core-3.1.0.jar
+    - metrics-graphite-3.1.0.jar
+    - metrics-jvm-3.1.0.jar
+  * Prometheus
+    - simpleclient-0.0.23.jar
+    - simpleclient_common-0.0.23.jar
+    - simpleclient_hotspot-0.0.23.jar
+    - simpleclient_servlet-0.0.23.jar
 
 Protocol Buffers License
  * Protocol Buffers
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index d13ddcd..d1b24a2 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -22,6 +22,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;
 
 public class PulsarConnectorCache {
 
@@ -29,8 +30,19 @@ public class PulsarConnectorCache {
 
     private final ManagedLedgerFactory managedLedgerFactory;
 
+    private final StatsProvider statsProvider;
+
     private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
         this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
+        this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
+                StatsProvider.class, getClass().getClassLoader());
+
+        // start stats provider
+        ClientConfiguration clientConfiguration = new ClientConfiguration();
+
+        pulsarConnectorConfig.getStatsProviderConfigs().forEach((key, value) -> clientConfiguration.setProperty(key, value));
+
+        this.statsProvider.start(clientConfiguration);
     }
 
     public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
@@ -55,9 +67,14 @@ public class PulsarConnectorCache {
         return managedLedgerFactory;
     }
 
+    public StatsProvider getStatsProvider() {
+        return statsProvider;
+    }
+
     public static void shutdown() throws ManagedLedgerException, InterruptedException {
         if (instance != null) {
             instance.managedLedgerFactory.shutdown();
+            instance.statsProvider.stop();
             instance = null;
         }
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 482fab3..a1df7c7 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -18,11 +18,17 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import io.airlift.configuration.Config;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
 
 import javax.validation.constraints.NotNull;
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
 
 public class PulsarConnectorConfig implements AutoCloseable {
 
@@ -32,6 +38,8 @@ public class PulsarConnectorConfig implements AutoCloseable {
     private int targetNumSplits = 2;
     private int maxSplitMessageQueueSize = 10000;
     private int maxSplitEntryQueueSize = 1000;
+    private String statsProvider = NullStatsProvider.class.getName();
+    private Map<String, String> statsProviderConfigs = new HashMap<>();
     private PulsarAdmin pulsarAdmin;
 
     @NotNull
@@ -57,12 +65,12 @@ public class PulsarConnectorConfig implements AutoCloseable {
     }
 
     @NotNull
-    public int getEntryReadBatchSize() {
+    public int getMaxEntryReadBatchSize() {
         return this.entryReadBatchSize;
     }
 
-    @Config("pulsar.entry-read-batch-size")
-    public PulsarConnectorConfig setEntryReadBatchSize(int batchSize) {
+    @Config("pulsar.max-entry-read-batch-size")
+    public PulsarConnectorConfig setMaxEntryReadBatchSize(int batchSize) {
         this.entryReadBatchSize = batchSize;
         return this;
     }
@@ -101,6 +109,29 @@ public class PulsarConnectorConfig implements AutoCloseable {
     }
 
     @NotNull
+    public String getStatsProvider() {
+        return statsProvider;
+    }
+
+    @Config("pulsar.stats-provider")
+    public PulsarConnectorConfig setStatsProvider(String statsProvider) {
+        this.statsProvider = statsProvider;
+        return this;
+    }
+
+    @NotNull
+    public Map<String, String> getStatsProviderConfigs() {
+        return statsProviderConfigs;
+    }
+
+    @Config("pulsar.stats-provider-configs")
+    public PulsarConnectorConfig setStatsProviderConfigs(String statsProviderConfigs) {
+        Type type = new TypeToken<Map<String, String>>(){}.getType();
+        this.statsProviderConfigs = new Gson().fromJson(statsProviderConfigs, type);
+        return this;
+    }
+
+    @NotNull
     public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
         if (this.pulsarAdmin == null) {
             this.pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getBrokerServiceUrl()).build();
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
new file mode 100644
index 0000000..9ff337e
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger;
+
+import java.util.concurrent.TimeUnit;
+
+public class PulsarConnectorMetricsTracker implements AutoCloseable{
+
+    private StatsLogger statsLogger;
+
+    private static final String SCOPE = "split";
+
+    /** metric names **/
+
+    // time spend waiting to get entry from entry queue because it is empty
+    private static final String ENTRY_QUEUE_DEQUEUE_WAIT_TIME = "entry-queue-dequeue-wait-time";
+
+    // total time spend waiting to get entry from entry queue per query
+    private static final String ENTRY_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY = "entry-queue-dequeue-wait-time-per-query";
+
+    // number of bytes read from bookkeeper
+    private static final String BYTES_READ = "bytes-read";
+
+    // total number of bytes read per query
+    private static final String BYTES_READ_PER_QUERY = "bytes-read-per-query";
+
+    // time spent derserializing entries
+    private static final String ENTRY_DESERIALIZE_TIME = "entry-deserialize-time";
+
+    // time spent derserializing entries per query
+    private static final String ENTRY_DESERIALIZE_TIME_PER_QUERY = "entry-deserialize-time_per_query";
+
+    // time spent waiting for message queue enqueue because message queue is full
+    private static final String MESSAGE_QUEUE_ENQUEUE_WAIT_TIME = "message-queue-enqueue-wait-time";
+
+    // time spent waiting for message queue enqueue because message queue is full per query
+    private static final String MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_PER_QUERY = "message-queue-enqueue-wait-time-per-query";
+
+    private static final String NUM_MESSAGES_DERSERIALIZED = "num-messages-deserialized";
+
+    // number of messages deserialized
+    public static final String NUM_MESSAGES_DERSERIALIZED_PER_ENTRY = "num-messages-deserialized-per-entry";
+
+    // number of messages deserialized per query
+    public static final String NUM_MESSAGES_DERSERIALIZED_PER_QUERY = "num-messages-deserialized-per-query";
+
+    // number of read attempts.  Will fail if queues are full
+    public static final String READ_ATTEMTPS = "read-attempts";
+
+    // number of read attempts per query
+    public static final String READ_ATTEMTPS_PER_QUERY= "read-attempts-per-query";
+
+    // latency of reads per batch
+    public static final String READ_LATENCY_PER_BATCH = "read-latency-per-batch";
+
+    // total read latency per query
+    public static final String READ_LATENCY_PER_QUERY = "read-latency-per-query";
+
+    // number of entries per batch
+    public static final String NUM_ENTRIES_PER_BATCH = "num-entries-per-batch";
+
+    // number of entries per query
+    public static final String NUM_ENTRIES_PER_QUERY = "num-entries-per-query";
+
+    // time spent waiting to dequeue from message queue because its empty
+    public static final String MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY = "message-queue-dequeue-wait-time-per-query";
+
+    // time spent deserializing message to record e.g. avro, json, etc
+    public static final String RECORD_DESERIALIZE_TIME = "record-deserialize-time";
+
+    // time spent deserializing message to record per query
+    private static final String RECORD_DESERIALIZE_TIME_PER_QUERY = "record-deserialize-time-per-query";
+
+    private static final String NUM_RECORD_DESERIALIZED = "num-record-deserialized";
+
+    private static final String TOTAL_EXECUTION_TIME = "total-execution-time";
+
+    /** internal tracking variables **/
+    private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
+    private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
+    private long BYTES_READ_sum = 0L;
+    private long ENTRY_DESERIALIZE_TIME_startTime;
+    private long ENTRY_DESERIALIZE_TIME_sum = 0L;
+    private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
+    private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum = 0L;
+    private long NUM_MESSAGES_DERSERIALIZED_sum = 0L;
+    private long NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
+    private long READ_ATTEMTPS_SUCCESS_sum = 0L;
+    private long READ_ATTEMTPS_FAIL_sum = 0L;
+    private long READ_LATENCY_SUCCESS_sum = 0L;
+    private long READ_LATENCY_FAIL_sum = 0L;
+    private long NUM_ENTRIES_PER_BATCH_sum = 0L;
+    private long MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
+    private long RECORD_DESERIALIZE_TIME_startTime;
+    private long RECORD_DESERIALIZE_TIME_sum = 0L;
+
+    public PulsarConnectorMetricsTracker(StatsProvider statsProvider) {
+        this.statsLogger = statsProvider instanceof NullStatsProvider
+                ? null : statsProvider.getStatsLogger(SCOPE);
+    }
+
+    public void start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
+        if (statsLogger != null) {
+            ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime = System.nanoTime();
+        }
+    }
+
+    public void end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
+        if (statsLogger != null) {
+            long time = System.nanoTime() - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
+            ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum += time;
+            statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME)
+                    .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    public void register_BYTES_READ(long bytes) {
+        if (statsLogger != null) {
+            BYTES_READ_sum += bytes;
+            statsLogger.getCounter(BYTES_READ).add(bytes);
+        }
+    }
+
+    public void start_ENTRY_DESERIALIZE_TIME() {
+        if (statsLogger != null) {
+            ENTRY_DESERIALIZE_TIME_startTime = System.nanoTime();
+        }
+    }
+
+    public void end_ENTRY_DESERIALIZE_TIME() {
+        if (statsLogger != null) {
+            long time = System.nanoTime() - ENTRY_DESERIALIZE_TIME_startTime;
+            ENTRY_DESERIALIZE_TIME_sum += time;
+            statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME)
+                    .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    public void start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
+        if (statsLogger != null) {
+            MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime = System.nanoTime();
+        }
+    }
+
+    public void end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
+        if (statsLogger != null) {
+            long time = System.nanoTime() - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
+            MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum += time;
+            statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME)
+                    .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    public void incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
+        if (statsLogger != null) {
+            NUM_MESSAGED_DERSERIALIZED_PER_BATCH++;
+            statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED).add(1);
+        }
+    }
+
+    public void end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
+        if (statsLogger != null) {
+            NUM_MESSAGES_DERSERIALIZED_sum += NUM_MESSAGED_DERSERIALIZED_PER_BATCH;
+
+            statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY)
+                    .registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH);
+
+            NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
+        }
+    }
+
+    public void incr_READ_ATTEMPTS_SUCCESS() {
+        if (statsLogger != null) {
+            READ_ATTEMTPS_SUCCESS_sum++;
+            statsLogger.getOpStatsLogger(READ_ATTEMTPS)
+                    .registerSuccessfulValue(1L);
+        }
+    }
+
+    public void incr_READ_ATTEMPTS_FAIL() {
+        if (statsLogger != null) {
+            READ_ATTEMTPS_FAIL_sum++;
+            statsLogger.getOpStatsLogger(READ_ATTEMTPS)
+                    .registerFailedValue(1L);
+        }
+    }
+
+    public void register_READ_LATENCY_PER_BATCH_SUCCESS(long latency) {
+        if (statsLogger != null) {
+            READ_LATENCY_SUCCESS_sum += latency;
+            statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH)
+                    .registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    public void register_READ_LATENCY_PER_BATCH_FAIL(long latency) {
+        if (statsLogger != null) {
+            READ_LATENCY_FAIL_sum += latency;
+            statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH)
+                    .registerFailedEvent(latency, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    public void incr_NUM_ENTRIES_PER_BATCH_SUCCESS(long delta) {
+        if (statsLogger != null) {
+            NUM_ENTRIES_PER_BATCH_sum += delta;
+            statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH)
+                    .registerSuccessfulValue(delta);
+        }
+    }
+
+    public void incr_NUM_ENTRIES_PER_BATCH_FAIL(long delta) {
+        if (statsLogger != null) {
+            statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH)
+                    .registerFailedValue(delta);
+        }
+    }
+
+    public void register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(long latency) {
+        if (statsLogger != null) {
+            MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum += latency;
+        }
+    }
+
+    public void start_RECORD_DESERIALIZE_TIME() {
+        if (statsLogger != null) {
+            RECORD_DESERIALIZE_TIME_startTime = System.nanoTime();
+        }
+    }
+
+    public void end_RECORD_DESERIALIZE_TIME() {
+        if (statsLogger != null) {
+            long time = System.nanoTime() - RECORD_DESERIALIZE_TIME_startTime;
+            RECORD_DESERIALIZE_TIME_sum += time;
+            statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME)
+                    .registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    public void incr_NUM_RECORD_DESERIALIZED() {
+        if (statsLogger != null) {
+            statsLogger.getCounter(NUM_RECORD_DESERIALIZED).add(1);
+        }
+    }
+
+    public void register_TOTAL_EXECUTION_TIME(long latency) {
+        if (statsLogger != null) {
+            statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME)
+                    .registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (statsLogger != null) {
+            // register total entry dequeue wait time for query
+            statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
+                    .registerSuccessfulEvent(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
+
+            //register bytes read per query
+            statsLogger.getOpStatsLogger(BYTES_READ_PER_QUERY)
+                    .registerSuccessfulValue(BYTES_READ_sum);
+
+            // register total time spent deserializing entries for query
+            statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME_PER_QUERY)
+                    .registerSuccessfulEvent(ENTRY_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
+
+            // register time spent waiting for message queue enqueue because message queue is full per query
+            statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_PER_QUERY)
+                    .registerSuccessfulEvent(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
+
+            // register number of messages deserialized per query
+            statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_QUERY)
+                    .registerSuccessfulValue(NUM_MESSAGES_DERSERIALIZED_sum);
+
+            // register number of read attempts per query
+            statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
+                    .registerSuccessfulValue(READ_ATTEMTPS_SUCCESS_sum);
+            statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
+                    .registerFailedValue(READ_ATTEMTPS_FAIL_sum);
+
+            // register total read latency for query
+            statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
+                    .registerSuccessfulEvent(READ_LATENCY_SUCCESS_sum, TimeUnit.NANOSECONDS);
+            statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
+                    .registerFailedEvent(READ_LATENCY_FAIL_sum, TimeUnit.NANOSECONDS);
+
+            // register number of entries per query
+            statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_QUERY)
+                    .registerSuccessfulValue(NUM_ENTRIES_PER_BATCH_sum);
+
+            // register time spent waiting to read for message queue per query
+            statsLogger.getOpStatsLogger(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
+                    .registerSuccessfulEvent(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.MILLISECONDS);
+
+            // register time spent deserializing records per query
+            statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME_PER_QUERY)
+                    .registerSuccessfulEvent(RECORD_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
+        }
+    }
+}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
index e537574..388df0e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
@@ -24,6 +24,9 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.TopicName;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
 public class PulsarConnectorUtils {
 
     public static Schema parseSchema(String schemaJson) {
@@ -39,4 +42,40 @@ public class PulsarConnectorUtils {
                     + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
         }
     }
+
+    /**
+     * Create an instance of <code>userClassName</code> using provided <code>classLoader</code>.
+     * This instance should implement the provided interface <code>xface</code>.
+     *
+     * @param userClassName user class name
+     * @param xface the interface that the reflected instance should implement
+     * @param classLoader class loader to load the class.
+     * @return the instance
+     */
+    public static <T> T createInstance(String userClassName,
+                                       Class<T> xface,
+                                       ClassLoader classLoader) {
+        Class<?> theCls;
+        try {
+            theCls = Class.forName(userClassName, true, classLoader);
+        } catch (ClassNotFoundException cnfe) {
+            throw new RuntimeException("User class must be in class path", cnfe);
+        }
+        if (!xface.isAssignableFrom(theCls)) {
+            throw new RuntimeException(userClassName + " not " + xface.getName());
+        }
+        Class<T> tCls = (Class<T>) theCls.asSubclass(xface);
+        try {
+            Constructor<T> meth = tCls.getDeclaredConstructor();
+            return meth.newInstance();
+        } catch (InstantiationException ie) {
+            throw new RuntimeException("User class must be concrete", ie);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException("User class must have a no-arg constructor", e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException("User class must a public constructor", e);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException("User class constructor throws exception", e);
+        }
+    }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index c8106aa..4088868 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -35,14 +35,12 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.impl.MessageParser;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
 
 import java.io.IOException;
 import java.util.List;
@@ -77,24 +75,22 @@ public class PulsarRecordCursor implements RecordCursor {
     private Message currentMessage;
     private Map<String, PulsarInternalColumn> internalColumnMap = PulsarInternalColumn.getInternalFieldsMap();
     private SchemaHandler schemaHandler;
-    private int batchSize;
+    private int maxBatchSize;
     private AtomicLong completedBytes = new AtomicLong(0L);
     private ReadEntries readEntries;
     private DeserializeEntries deserializeEntries;
     private TopicName topicName;
+    private PulsarConnectorMetricsTracker metricsTracker;
 
-    private static final Logger log = Logger.get(PulsarRecordCursor.class);
+    // Stats total execution time of split
+    private long startTime;
 
-    private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
-        ClientConfiguration bkClientConfiguration = new ClientConfiguration()
-                .setZkServers(pulsarConnectorConfig.getZookeeperUri())
-                .setAllowShadedLedgerManagerFactoryClass(true)
-                .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.");
-        return new ManagedLedgerFactoryImpl(bkClientConfiguration);
-    }
+    private static final Logger log = Logger.get(PulsarRecordCursor.class);
 
     public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
                               PulsarConnectorConfig pulsarConnectorConfig) {
+        // Set start time for split
+        this.startTime = System.nanoTime();
         PulsarConnectorCache pulsarConnectorCache;
         try {
             pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
@@ -104,26 +100,29 @@ public class PulsarRecordCursor implements RecordCursor {
             throw new RuntimeException(e);
         }
         initialize(columnHandles, pulsarSplit, pulsarConnectorConfig,
-                pulsarConnectorCache.getManagedLedgerFactory());
+                pulsarConnectorCache.getManagedLedgerFactory(),
+                new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider()));
     }
 
     // Exposed for testing purposes
     PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
-            pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory) {
-        initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory);
+            pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
+        initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, pulsarConnectorMetricsTracker);
     }
 
     private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
-            pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory) {
+            pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory,
+                            PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
         this.columnHandles = columnHandles;
         this.pulsarSplit = pulsarSplit;
         this.pulsarConnectorConfig = pulsarConnectorConfig;
-        this.batchSize = pulsarConnectorConfig.getEntryReadBatchSize();
+        this.maxBatchSize = pulsarConnectorConfig.getMaxEntryReadBatchSize();
         this.messageQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
         this.entryQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
         this.topicName = TopicName.get("persistent",
                 NamespaceName.get(pulsarSplit.getSchemaName()),
                 pulsarSplit.getTableName());
+        this.metricsTracker = pulsarConnectorMetricsTracker;
 
         Schema schema = PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
 
@@ -191,7 +190,7 @@ public class PulsarRecordCursor implements RecordCursor {
         private final Thread thread;
 
         public DeserializeEntries() {
-            this.thread = new Thread(this);
+            this.thread = new Thread(this, "derserialize-thread-split-" + pulsarSplit.getSplitId());
         }
 
         public void interrupt() {
@@ -209,19 +208,42 @@ public class PulsarRecordCursor implements RecordCursor {
             while (isRunning.get()) {
                 Entry entry;
                 try {
+                    // start time for entry queue read
+                    metricsTracker.start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME();
+                    // read from entry queue and block if empty
                     entry = entryQueue.take();
+                    // record entry queue wait time stats
+                    metricsTracker.end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME();
                 } catch (InterruptedException e) {
                     break;
                 }
                 try {
-                    completedBytes.addAndGet(entry.getDataBuffer().readableBytes());
+                    long bytes = entry.getDataBuffer().readableBytes();
+                    completedBytes.addAndGet(bytes);
+                    // register stats for bytes read
+                    metricsTracker.register_BYTES_READ(bytes);
+
+                    // set start time for time deserializing entries for stats
+                    metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+
                     // filter entries that is not part of my split
                     if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) {
                         try {
                             MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
                                     entry.getDataBuffer(), (messageId, message, byteBuf) -> {
                                         try {
+
+                                            // start time for message queue read
+                                            metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+
+                                            // enqueue deserialize message from this entry
                                             messageQueue.put(message);
+
+                                            // stats for how long a read from message queue took
+                                            metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+                                            // stats for number of messages read
+                                            metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
+
                                         } catch (InterruptedException e) {
                                             //no-op
                                         }
@@ -230,6 +252,11 @@ public class PulsarRecordCursor implements RecordCursor {
                             log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
                             throw new RuntimeException(e);
                         }
+                        // stats for time spend deserializing entries
+                        metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+
+                        // stats for num messages per entry
+                        metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
                     }
                 } finally {
                     entry.release();
@@ -256,9 +283,19 @@ public class PulsarRecordCursor implements RecordCursor {
                         .compareTo(pulsarSplit.getEndPosition()) >= 0) {
                     isDone.set(true);
 
-                } else if (entryQueue.remainingCapacity() > batchSize) {
-                    outstandingReadsRequests.decrementAndGet();
-                    cursor.asyncReadEntries(batchSize, this, System.currentTimeMillis());
+                } else {
+                    int batchSize = Math.min(maxBatchSize, entryQueue.remainingCapacity());
+
+                    if (batchSize > 0) {
+                        outstandingReadsRequests.decrementAndGet();
+                        cursor.asyncReadEntries(batchSize, this, System.nanoTime());
+
+                        // stats for successful read request
+                        metricsTracker.incr_READ_ATTEMPTS_SUCCESS();
+                    } else {
+                        // stats for failed read request because entry queue is full
+                        metricsTracker.incr_READ_ATTEMPTS_FAIL();
+                    }
                 }
             }
         }
@@ -267,6 +304,11 @@ public class PulsarRecordCursor implements RecordCursor {
         public void readEntriesComplete(List<Entry> entries, Object ctx) {
             entryQueue.addAll(entries);
             outstandingReadsRequests.incrementAndGet();
+
+            //set read latency stats for success
+            metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (long)ctx);
+            //stats for number of entries read
+            metricsTracker.incr_NUM_ENTRIES_PER_BATCH_SUCCESS(entries.size());
         }
 
         public boolean hashFinished() {
@@ -278,6 +320,11 @@ public class PulsarRecordCursor implements RecordCursor {
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
             log.debug(exception, "Failed to read entries from topic %s", topicName.toString());
             outstandingReadsRequests.incrementAndGet();
+
+            //set read latency stats for failed
+            metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (long)ctx);
+            //stats for number of entries read failed
+            metricsTracker.incr_NUM_ENTRIES_PER_BATCH_FAIL((long) maxBatchSize);
         }
     }
 
@@ -286,12 +333,12 @@ public class PulsarRecordCursor implements RecordCursor {
     public boolean advanceNextPosition() {
 
         if (readEntries == null) {
-            readEntries = new ReadEntries();
-            readEntries.run();
-
             // start deserialize thread
             deserializeEntries = new DeserializeEntries();
             deserializeEntries.start();
+
+            readEntries = new ReadEntries();
+            readEntries.run();
         }
 
         while(true) {
@@ -309,13 +356,23 @@ public class PulsarRecordCursor implements RecordCursor {
             } else {
                 try {
                     Thread.sleep(5);
+                    // stats for time spent wait to read from message queue because its empty
+                    metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(5);
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
             }
         }
 
+        //start time for deseralizing record
+        metricsTracker.start_RECORD_DESERIALIZE_TIME();
+
         currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData());
+        metricsTracker.incr_NUM_RECORD_DESERIALIZED();
+
+        // stats for time spend deserializing
+        metricsTracker.end_RECORD_DESERIALIZE_TIME();
+
         return true;
     }
 
@@ -423,6 +480,12 @@ public class PulsarRecordCursor implements RecordCursor {
                 log.error(e);
             }
         }
+
+        // set stat for total execution time of split
+        if (this.metricsTracker != null) {
+            this.metricsTracker.register_TOTAL_EXECUTION_TIME(System.nanoTime() - startTime);
+            this.metricsTracker.close();
+        }
     }
 
     private void checkFieldType(int field, Class<?> expected) {
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 5d8472d..fa121e3 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
 import org.apache.pulsar.shade.javax.ws.rs.ClientErrorException;
 import org.apache.pulsar.shade.javax.ws.rs.core.Response;
+import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -69,7 +70,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -664,7 +664,7 @@ public abstract class TestPulsarConnector {
     @BeforeMethod
     public void setup() throws Exception {
         this.pulsarConnectorConfig = spy(new PulsarConnectorConfig());
-        this.pulsarConnectorConfig.setEntryReadBatchSize(1);
+        this.pulsarConnectorConfig.setMaxEntryReadBatchSize(1);
         this.pulsarConnectorConfig.setMaxSplitEntryQueueSize(10);
         this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);
 
@@ -924,7 +924,8 @@ public abstract class TestPulsarConnector {
 
         for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) {
 
-            PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(fooColumnHandles, split.getValue(), pulsarConnectorConfig, managedLedgerFactory));
+            PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(fooColumnHandles, split.getValue(),
+                    pulsarConnectorConfig, managedLedgerFactory, new PulsarConnectorMetricsTracker(new NullStatsProvider())));
             this.pulsarRecordCursors.put(split.getKey(), pulsarRecordCursor);
         }
     }