You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/05 06:46:42 UTC

[pulsar] branch master updated: Enforce checkstyle in the pulsar sql module (#4882)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f6fee1c  Enforce checkstyle in the pulsar sql module (#4882)
f6fee1c is described below

commit f6fee1c6b7976c439215876b6d892591d198a26b
Author: Sergii Zhevzhyk <vz...@users.noreply.github.com>
AuthorDate: Mon Aug 5 08:46:37 2019 +0200

    Enforce checkstyle in the pulsar sql module (#4882)
    
    ### Modifications
    
    The Checkstyle plugin was added to the pulsar sql module to enforce the defined style. All violations were fixed:
    
    - Ordering of imports.
    - Formatting of the code.
    - Absent Javadoc comments.
    - Other small issues.
---
 pulsar-sql/pom.xml                                 |  22 ++
 .../pulsar/sql/presto/AvroSchemaHandler.java       |  15 +-
 .../pulsar/sql/presto/JSONSchemaHandler.java       |  14 +-
 .../pulsar/sql/presto/PulsarColumnHandle.java      |  62 ++++--
 .../pulsar/sql/presto/PulsarColumnMetadata.java    |  41 ++--
 .../apache/pulsar/sql/presto/PulsarConnector.java  |  12 +-
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  35 ++--
 .../pulsar/sql/presto/PulsarConnectorConfig.java   |  32 +--
 .../pulsar/sql/presto/PulsarConnectorFactory.java  |  10 +-
 .../pulsar/sql/presto/PulsarConnectorId.java       |  11 +-
 .../sql/presto/PulsarConnectorMetricsTracker.java  | 229 +++++++++++----------
 .../pulsar/sql/presto/PulsarConnectorModule.java   |  18 +-
 .../pulsar/sql/presto/PulsarConnectorUtils.java    |  13 +-
 .../pulsar/sql/presto/PulsarHandleResolver.java    |  17 +-
 .../pulsar/sql/presto/PulsarInternalColumn.java    |  41 +++-
 .../apache/pulsar/sql/presto/PulsarMetadata.java   |  64 +++---
 .../org/apache/pulsar/sql/presto/PulsarPlugin.java |   6 +-
 .../sql/presto/PulsarPrimitiveSchemaHandler.java   |   4 +-
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  37 ++--
 .../apache/pulsar/sql/presto/PulsarRecordSet.java  |   8 +-
 .../pulsar/sql/presto/PulsarRecordSetProvider.java |  11 +-
 .../org/apache/pulsar/sql/presto/PulsarSplit.java  |  38 ++--
 .../pulsar/sql/presto/PulsarSplitManager.java      |  42 ++--
 .../pulsar/sql/presto/PulsarTableHandle.java       |  12 +-
 .../pulsar/sql/presto/PulsarTableLayoutHandle.java |  24 +--
 .../pulsar/sql/presto/PulsarTopicDescription.java  |   9 +-
 .../pulsar/sql/presto/PulsarTransactionHandle.java |   3 +
 .../apache/pulsar/sql/presto/SchemaHandler.java    |   3 +
 ...sarTransactionHandle.java => package-info.java} |  11 +-
 29 files changed, 481 insertions(+), 363 deletions(-)

diff --git a/pulsar-sql/pom.xml b/pulsar-sql/pom.xml
index 1f7a4bb..642fe59 100644
--- a/pulsar-sql/pom.xml
+++ b/pulsar-sql/pom.xml
@@ -114,4 +114,26 @@
         </dependencies>
     </dependencyManagement>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>check-style</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+                            <suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+                            <encoding>UTF-8</encoding>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index 5d55682..f6807a2 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -19,11 +19,12 @@
 package org.apache.pulsar.sql.presto;
 
 import io.airlift.log.Logger;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.FastThreadLocal;
+import java.io.IOException;
+import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -31,9 +32,9 @@ import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
 
-import java.io.IOException;
-import java.util.List;
-
+/**
+ * Schema handler for payload in the Avro format.
+ */
 public class AvroSchemaHandler implements SchemaHandler {
 
     private final DatumReader<GenericRecord> datumReader;
@@ -64,7 +65,7 @@ public class AvroSchemaHandler implements SchemaHandler {
 
             BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(),
                     heapBuffer.readableBytes(), decoderFromCache);
-            if (decoderFromCache==null) {
+            if (decoderFromCache == null) {
                 decoders.set(decoder);
             }
             return this.datumReader.read(null, decoder);
@@ -87,7 +88,7 @@ public class AvroSchemaHandler implements SchemaHandler {
                 return null;
             }
             if (positionIndices.length > 0) {
-                for (int i = 1 ; i < positionIndices.length; i++) {
+                for (int i = 1; i < positionIndices.length; i++) {
                     curr = ((GenericRecord) curr).get(positionIndices[i]);
                     if (curr == null) {
                         return null;
@@ -96,7 +97,7 @@ public class AvroSchemaHandler implements SchemaHandler {
             }
             return curr;
         } catch (Exception ex) {
-            log.debug(ex,"%s", ex);
+            log.debug(ex, "%s", ex);
         }
         return null;
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
index 5a12d30..8649e41 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
@@ -20,17 +20,17 @@ package org.apache.pulsar.sql.presto;
 
 import com.dslplatform.json.DslJson;
 import com.facebook.presto.spi.type.Type;
-
 import io.airlift.log.Logger;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.util.concurrent.FastThreadLocal;
-
+/**
+ * Schema handler for payload in the JSON format.
+ */
 public class JSONSchemaHandler implements SchemaHandler {
 
     private static final Logger log = Logger.get(JSONSchemaHandler.class);
@@ -84,7 +84,7 @@ public class JSONSchemaHandler implements SchemaHandler {
             if (field == null) {
                 return null;
             }
-            for (int i = 1; i < fieldNames.length ; i++) {
+            for (int i = 1; i < fieldNames.length; i++) {
                 field = ((Map) field).get(fieldNames[i]);
                 if (field == null) {
                     return null;
@@ -101,7 +101,7 @@ public class JSONSchemaHandler implements SchemaHandler {
 
             return field;
         } catch (Exception ex) {
-            log.debug(ex,"%s", ex);
+            log.debug(ex, "%s", ex);
         }
         return null;
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
index f98e864..2a1bd43 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
@@ -18,27 +18,29 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static java.util.Objects.requireNonNull;
+
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ColumnMetadata;
 import com.facebook.presto.spi.type.Type;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import java.util.Arrays;
 
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This class represents the basic information about a presto column.
+ */
 public class PulsarColumnHandle implements ColumnHandle {
 
     private final String connectorId;
 
     /**
-     * Column Name
+     * Column Name.
      */
     private final String name;
 
     /**
-     * Column type
+     * Column type.
      */
     private final Type type;
 
@@ -116,17 +118,33 @@ public class PulsarColumnHandle implements ColumnHandle {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
         PulsarColumnHandle that = (PulsarColumnHandle) o;
 
-        if (hidden != that.hidden) return false;
-        if (internal != that.internal) return false;
-        if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) return false;
-        if (name != null ? !name.equals(that.name) : that.name != null) return false;
-        if (type != null ? !type.equals(that.type) : that.type != null) return false;
-        if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
+        if (hidden != that.hidden) {
+            return false;
+        }
+        if (internal != that.internal) {
+            return false;
+        }
+        if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) {
+            return false;
+        }
+        if (name != null ? !name.equals(that.name) : that.name != null) {
+            return false;
+        }
+        if (type != null ? !type.equals(that.type) : that.type != null) {
+            return false;
+        }
+        if (!Arrays.deepEquals(fieldNames, that.fieldNames)) {
+            return false;
+        }
         return Arrays.deepEquals(positionIndices, that.positionIndices);
     }
 
@@ -144,14 +162,14 @@ public class PulsarColumnHandle implements ColumnHandle {
 
     @Override
     public String toString() {
-        return "PulsarColumnHandle{" +
-                "connectorId='" + connectorId + '\'' +
-                ", name='" + name + '\'' +
-                ", type=" + type +
-                ", hidden=" + hidden +
-                ", internal=" + internal +
-                ", fieldNames=" + Arrays.toString(fieldNames) +
-                ", positionIndices=" + Arrays.toString(positionIndices) +
-                '}';
+        return "PulsarColumnHandle{"
+            + "connectorId='" + connectorId + '\''
+            + ", name='" + name + '\''
+            + ", type=" + type
+            + ", hidden=" + hidden
+            + ", internal=" + internal
+            + ", fieldNames=" + Arrays.toString(fieldNames)
+            + ", positionIndices=" + Arrays.toString(positionIndices)
+            + '}';
     }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
index 9a484ba..5b033fa 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
@@ -20,10 +20,11 @@ package org.apache.pulsar.sql.presto;
 
 import com.facebook.presto.spi.ColumnMetadata;
 import com.facebook.presto.spi.type.Type;
-
 import java.util.Arrays;
-import java.util.List;
 
+/**
+ * Description of the column metadata.
+ */
 public class PulsarColumnMetadata extends ColumnMetadata {
 
     private boolean isInternal;
@@ -60,25 +61,37 @@ public class PulsarColumnMetadata extends ColumnMetadata {
 
     @Override
     public String toString() {
-        return "PulsarColumnMetadata{" +
-                "isInternal=" + isInternal +
-                ", nameWithCase='" + nameWithCase + '\'' +
-                ", fieldNames=" + Arrays.toString(fieldNames) +
-                ", positionIndices=" + Arrays.toString(positionIndices) +
-                '}';
+        return "PulsarColumnMetadata{"
+            + "isInternal=" + isInternal
+            + ", nameWithCase='" + nameWithCase + '\''
+            + ", fieldNames=" + Arrays.toString(fieldNames)
+            + ", positionIndices=" + Arrays.toString(positionIndices)
+            + '}';
     }
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
 
         PulsarColumnMetadata that = (PulsarColumnMetadata) o;
 
-        if (isInternal != that.isInternal) return false;
-        if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) return false;
-        if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
+        if (isInternal != that.isInternal) {
+            return false;
+        }
+        if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) {
+            return false;
+        }
+        if (!Arrays.deepEquals(fieldNames, that.fieldNames)) {
+            return false;
+        }
         return Arrays.deepEquals(positionIndices, that.positionIndices);
     }
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
index 1d89b51..f73c5b0 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
 import com.facebook.presto.spi.connector.Connector;
 import com.facebook.presto.spi.connector.ConnectorMetadata;
 import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
@@ -26,13 +30,11 @@ import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 import com.facebook.presto.spi.transaction.IsolationLevel;
 import io.airlift.bootstrap.LifeCycleManager;
 import io.airlift.log.Logger;
-
 import javax.inject.Inject;
 
-import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
-import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This file contains implementation of the connector to the Presto engine.
+ */
 public class PulsarConnector implements Connector {
 
     private static final Logger log = Logger.get(PulsarConnector.class);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 5478984..a42c9fe 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -18,8 +18,13 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import io.airlift.log.Logger;
+import java.io.IOException;
+import java.util.Map;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -34,13 +39,9 @@ import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PulsarVersion;
 
-import java.io.IOException;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-
+/**
+ * Implementation of a cache for the Pulsar connector.
+ */
 public class PulsarConnectorCache {
 
     private static final Logger log = Logger.get(PulsarConnectorCache.class);
@@ -68,7 +69,7 @@ public class PulsarConnectorCache {
         // start stats provider
         ClientConfiguration clientConfiguration = new ClientConfiguration();
 
-        pulsarConnectorConfig.getStatsProviderConfigs().forEach((key, value) -> clientConfiguration.setProperty(key, value));
+        pulsarConnectorConfig.getStatsProviderConfigs().forEach(clientConfiguration::setProperty);
 
         this.statsProvider.start(clientConfiguration);
 
@@ -84,7 +85,8 @@ public class PulsarConnectorCache {
         return instance;
     }
 
-    private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
+    private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
+        throws Exception {
         ClientConfiguration bkClientConfiguration = new ClientConfiguration()
                 .setZkServers(pulsarConnectorConfig.getZookeeperUri())
                 .setClientTcpNoDelay(false)
@@ -123,16 +125,17 @@ public class PulsarConnectorCache {
                 Map<String, String> offloaderProperties = conf.getOffloaderProperties();
                 offloaderProperties.put(OFFLOADERS_DIRECTOR, conf.getOffloadersDirectory());
                 offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_DRIVER, conf.getManagedLedgerOffloadDriver());
-                offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads()));
+                offloaderProperties
+                    .put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads()));
 
                 try {
                     return offloaderFactory.create(
-                            PulsarConnectorUtils.getProperties(offloaderProperties),
-                            ImmutableMap.of(
-                                    LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
-                                    LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
-                            ),
-                            getOffloaderScheduler(conf));
+                        PulsarConnectorUtils.getProperties(offloaderProperties),
+                        ImmutableMap.of(
+                            LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
+                            LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
+                        ),
+                        getOffloaderScheduler(conf));
                 } catch (IOException ioe) {
                     log.error("Failed to create offloader: ", ioe);
                     throw new RuntimeException(ioe.getMessage(), ioe.getCause());
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index edfa97d..b53c589 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -20,20 +20,21 @@ package org.apache.pulsar.sql.presto;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.airlift.configuration.Config;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
-import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.protocol.Commands;
 
-import javax.validation.constraints.NotNull;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-
+/**
+ * This object handles configuration of the Pulsar connector for the Presto engine.
+ */
 public class PulsarConnectorConfig implements AutoCloseable {
 
     private String brokerServiceUrl = "http://localhost:8080";
@@ -55,7 +56,7 @@ public class PulsarConnectorConfig implements AutoCloseable {
     private boolean namespaceDelimiterRewriteEnable = false;
     private String rewriteNamespaceDelimiter = "/";
 
-    /**** --- Ledger Offloading --- ****/
+    // --- Ledger Offloading ---
     private String managedLedgerOffloadDriver = null;
     private int managedLedgerOffloadMaxThreads = 2;
     private String offloadersDirectory = "./offloaders";
@@ -188,14 +189,15 @@ public class PulsarConnectorConfig implements AutoCloseable {
         return this;
     }
 
-    /**** --- Ledger Offloading --- ****/
+    // --- Ledger Offloading ---
 
     public int getManagedLedgerOffloadMaxThreads() {
         return this.managedLedgerOffloadMaxThreads;
     }
 
     @Config("pulsar.managed-ledger-offload-max-threads")
-    public PulsarConnectorConfig setManagedLedgerOffloadMaxThreads(int managedLedgerOffloadMaxThreads) throws IOException {
+    public PulsarConnectorConfig setManagedLedgerOffloadMaxThreads(int managedLedgerOffloadMaxThreads)
+        throws IOException {
         this.managedLedgerOffloadMaxThreads = managedLedgerOffloadMaxThreads;
         return this;
     }
@@ -231,7 +233,7 @@ public class PulsarConnectorConfig implements AutoCloseable {
         return this;
     }
 
-    /**** --- Authentication --- ****/
+    // --- Authentication ---
 
     public String getAuthPlugin() {
         return this.authPluginClassName;
@@ -316,8 +318,8 @@ public class PulsarConnectorConfig implements AutoCloseable {
 
     @Override
     public String toString() {
-        return "PulsarConnectorConfig{" +
-                "brokerServiceUrl='" + brokerServiceUrl + '\'' +
-                '}';
+        return "PulsarConnectorConfig{"
+            + "brokerServiceUrl='" + brokerServiceUrl + '\''
+            + '}';
     }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
index 0719e8e..48d1081 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static java.util.Objects.requireNonNull;
+
 import com.facebook.presto.spi.ConnectorHandleResolver;
 import com.facebook.presto.spi.connector.Connector;
 import com.facebook.presto.spi.connector.ConnectorContext;
@@ -26,12 +29,11 @@ import com.google.inject.Injector;
 import io.airlift.bootstrap.Bootstrap;
 import io.airlift.json.JsonModule;
 import io.airlift.log.Logger;
-
 import java.util.Map;
 
-import static com.google.common.base.Throwables.throwIfUnchecked;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * The factory class which helps to build the presto connector.
+ */
 public class PulsarConnectorFactory implements ConnectorFactory {
 
     private static final Logger log = Logger.get(PulsarConnectorFactory.class);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java
index 0ba4e1b..633bccb 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto;
 
 import static java.util.Objects.requireNonNull;
 
+/**
+ * Unique identifier of a connector.
+ */
 public class PulsarConnectorId {
     private final String id;
 
@@ -34,8 +37,12 @@ public class PulsarConnectorId {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
         PulsarConnectorId that = (PulsarConnectorId) o;
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
index 34969d8..e3919d3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
@@ -18,21 +18,23 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
 
-import java.util.concurrent.TimeUnit;
-
+/**
+ * This class helps to track metrics related to the connector.
+ */
 public class PulsarConnectorMetricsTracker implements AutoCloseable{
 
     private final StatsLogger statsLogger;
 
     private static final String SCOPE = "split";
 
-    /** metric names **/
+    // metric names
 
     // time spend waiting to get entry from entry queue because it is empty
     private static final String ENTRY_QUEUE_DEQUEUE_WAIT_TIME = "entry-queue-dequeue-wait-time";
@@ -70,7 +72,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
     public static final String READ_ATTEMPTS = "read-attempts";
 
     // number of read attempts per query
-    public static final String READ_ATTEMTPS_PER_QUERY= "read-attempts-per-query";
+    public static final String READ_ATTEMTPS_PER_QUERY = "read-attempts-per-query";
 
     // latency of reads per batch
     public static final String READ_LATENCY_PER_BATCH = "read-latency-per-batch";
@@ -97,209 +99,208 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
 
     private static final String TOTAL_EXECUTION_TIME = "total-execution-time";
 
-    /** stats loggers **/
-
-    private final OpStatsLogger statsLogger_entryQueueDequeueWaitTime;
-    private final Counter statsLogger_bytesRead;
-    private final OpStatsLogger statsLogger_entryDeserializetime;
-    private final OpStatsLogger statsLogger_messageQueueEnqueueWaitTime;
-    private final Counter statsLogger_numMessagesDeserialized;
-    private final OpStatsLogger statsLogger_numMessagesDeserializedPerEntry;
-    private final OpStatsLogger statsLogger_readAttempts;
-    private final OpStatsLogger statsLogger_readLatencyPerBatch;
-    private final OpStatsLogger statsLogger_numEntriesPerBatch;
-    private final OpStatsLogger statsLogger_recordDeserializeTime;
-    private final Counter statsLogger_numRecordDeserialized;
-    private final OpStatsLogger statsLogger_totalExecutionTime;
-
-    /** internal tracking variables **/
-    private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
-    private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
-    private long BYTES_READ_sum = 0L;
-    private long ENTRY_DESERIALIZE_TIME_startTime;
-    private long ENTRY_DESERIALIZE_TIME_sum = 0L;
-    private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
-    private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum = 0L;
-    private long NUM_MESSAGES_DERSERIALIZED_sum = 0L;
-    private long NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
-    private long READ_ATTEMTPS_SUCCESS_sum = 0L;
-    private long READ_ATTEMTPS_FAIL_sum = 0L;
-    private long READ_LATENCY_SUCCESS_sum = 0L;
-    private long READ_LATENCY_FAIL_sum = 0L;
-    private long NUM_ENTRIES_PER_BATCH_sum = 0L;
-    private long MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
-    private long RECORD_DESERIALIZE_TIME_startTime;
-    private long RECORD_DESERIALIZE_TIME_sum = 0L;
+    // stats loggers
+
+    private final OpStatsLogger statsLoggerEntryQueueDequeueWaitTime;
+    private final Counter statsLoggerBytesRead;
+    private final OpStatsLogger statsLoggerEntryDeserializeTime;
+    private final OpStatsLogger statsLoggerMessageQueueEnqueueWaitTime;
+    private final Counter statsLoggerNumMessagesDeserialized;
+    private final OpStatsLogger statsLoggerNumMessagesDeserializedPerEntry;
+    private final OpStatsLogger statsLoggerReadAttempts;
+    private final OpStatsLogger statsLoggerReadLatencyPerBatch;
+    private final OpStatsLogger statsLoggerNumEntriesPerBatch;
+    private final OpStatsLogger statsLoggerRecordDeserializeTime;
+    private final Counter statsLoggerNumRecordDeserialized;
+    private final OpStatsLogger statsLoggerTotalExecutionTime;
+
+    // internal tracking variables
+    private long entryQueueDequeueWaitTimeStartTime;
+    private long entryQueueDequeueWaitTimeSum = 0L;
+    private long bytesReadSum = 0L;
+    private long entryDeserializeTimeStartTime;
+    private long entryDeserializeTimeSum = 0L;
+    private long messageQueueEnqueueWaitTimeStartTime;
+    private long messageQueueEnqueueWaitTimeSum = 0L;
+    private long numMessagesDerserializedSum = 0L;
+    private long numMessagedDerserializedPerBatch = 0L;
+    private long readAttemptsSuccessSum = 0L;
+    private long readAttemptsFailSum = 0L;
+    private long readLatencySuccessSum = 0L;
+    private long readLatencyFailSum = 0L;
+    private long numEntriesPerBatchSum = 0L;
+    private long messageQueueDequeueWaitTimeSum = 0L;
+    private long recordDeserializeTimeStartTime;
+    private long recordDeserializeTimeSum = 0L;
 
     public PulsarConnectorMetricsTracker(StatsProvider statsProvider) {
         this.statsLogger = statsProvider instanceof NullStatsProvider
                 ? null : statsProvider.getStatsLogger(SCOPE);
 
         if (this.statsLogger != null) {
-            statsLogger_entryQueueDequeueWaitTime = statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME);
-            statsLogger_bytesRead = statsLogger.getCounter(BYTES_READ);
-            statsLogger_entryDeserializetime = statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME);
-            statsLogger_messageQueueEnqueueWaitTime = statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME);
-            statsLogger_numMessagesDeserialized = statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED);
-            statsLogger_numMessagesDeserializedPerEntry = statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY);
-            statsLogger_readAttempts = statsLogger.getOpStatsLogger(READ_ATTEMPTS);
-            statsLogger_readLatencyPerBatch = statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH);
-            statsLogger_numEntriesPerBatch = statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH);
-            statsLogger_recordDeserializeTime = statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME);
-            statsLogger_numRecordDeserialized = statsLogger.getCounter(NUM_RECORD_DESERIALIZED);
-            statsLogger_totalExecutionTime = statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME);
+            statsLoggerEntryQueueDequeueWaitTime = statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME);
+            statsLoggerBytesRead = statsLogger.getCounter(BYTES_READ);
+            statsLoggerEntryDeserializeTime = statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME);
+            statsLoggerMessageQueueEnqueueWaitTime = statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME);
+            statsLoggerNumMessagesDeserialized = statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED);
+            statsLoggerNumMessagesDeserializedPerEntry = statsLogger
+                .getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY);
+            statsLoggerReadAttempts = statsLogger.getOpStatsLogger(READ_ATTEMPTS);
+            statsLoggerReadLatencyPerBatch = statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH);
+            statsLoggerNumEntriesPerBatch = statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH);
+            statsLoggerRecordDeserializeTime = statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME);
+            statsLoggerNumRecordDeserialized = statsLogger.getCounter(NUM_RECORD_DESERIALIZED);
+            statsLoggerTotalExecutionTime = statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME);
         } else {
-            statsLogger_entryQueueDequeueWaitTime = null;
-            statsLogger_bytesRead = null;
-            statsLogger_entryDeserializetime = null;
-            statsLogger_messageQueueEnqueueWaitTime = null;
-            statsLogger_numMessagesDeserialized = null;
-            statsLogger_numMessagesDeserializedPerEntry = null;
-            statsLogger_readAttempts = null;
-            statsLogger_readLatencyPerBatch = null;
-            statsLogger_numEntriesPerBatch = null;
-            statsLogger_recordDeserializeTime = null;
-            statsLogger_numRecordDeserialized = null;
-            statsLogger_totalExecutionTime = null;
+            statsLoggerEntryQueueDequeueWaitTime = null;
+            statsLoggerBytesRead = null;
+            statsLoggerEntryDeserializeTime = null;
+            statsLoggerMessageQueueEnqueueWaitTime = null;
+            statsLoggerNumMessagesDeserialized = null;
+            statsLoggerNumMessagesDeserializedPerEntry = null;
+            statsLoggerReadAttempts = null;
+            statsLoggerReadLatencyPerBatch = null;
+            statsLoggerNumEntriesPerBatch = null;
+            statsLoggerRecordDeserializeTime = null;
+            statsLoggerNumRecordDeserialized = null;
+            statsLoggerTotalExecutionTime = null;
         }
     }
 
     public void start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
         if (statsLogger != null) {
-            ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime = System.nanoTime();
+            entryQueueDequeueWaitTimeStartTime = System.nanoTime();
         }
     }
 
     public void end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
         if (statsLogger != null) {
-            long time = System.nanoTime() - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
-            ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum += time;
-            statsLogger_entryQueueDequeueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+            long time = System.nanoTime() - entryQueueDequeueWaitTimeStartTime;
+            entryQueueDequeueWaitTimeSum += time;
+            statsLoggerEntryQueueDequeueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
         }
     }
 
     public void register_BYTES_READ(long bytes) {
         if (statsLogger != null) {
-            BYTES_READ_sum += bytes;
-            statsLogger_bytesRead.add(bytes);
+            bytesReadSum += bytes;
+            statsLoggerBytesRead.add(bytes);
         }
     }
 
     public void start_ENTRY_DESERIALIZE_TIME() {
         if (statsLogger != null) {
-            ENTRY_DESERIALIZE_TIME_startTime = System.nanoTime();
+            entryDeserializeTimeStartTime = System.nanoTime();
         }
     }
 
     public void end_ENTRY_DESERIALIZE_TIME() {
         if (statsLogger != null) {
-            long time = System.nanoTime() - ENTRY_DESERIALIZE_TIME_startTime;
-            ENTRY_DESERIALIZE_TIME_sum += time;
-            statsLogger_entryDeserializetime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+            long time = System.nanoTime() - entryDeserializeTimeStartTime;
+            entryDeserializeTimeSum += time;
+            statsLoggerEntryDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
         }
     }
 
     public void start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
         if (statsLogger != null) {
-            MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime = System.nanoTime();
+            messageQueueEnqueueWaitTimeStartTime = System.nanoTime();
         }
     }
 
     public void end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
         if (statsLogger != null) {
-            long time = System.nanoTime() - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
-            MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum += time;
-            statsLogger_messageQueueEnqueueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+            long time = System.nanoTime() - messageQueueEnqueueWaitTimeStartTime;
+            messageQueueEnqueueWaitTimeSum += time;
+            statsLoggerMessageQueueEnqueueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
         }
     }
 
     public void incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
         if (statsLogger != null) {
-            NUM_MESSAGED_DERSERIALIZED_PER_BATCH++;
-            statsLogger_numMessagesDeserialized.add(1);
+            numMessagedDerserializedPerBatch++;
+            statsLoggerNumMessagesDeserialized.add(1);
         }
     }
 
     public void end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
         if (statsLogger != null) {
-            NUM_MESSAGES_DERSERIALIZED_sum += NUM_MESSAGED_DERSERIALIZED_PER_BATCH;
-
-            statsLogger_numMessagesDeserializedPerEntry.registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH);
-
-            NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
+            numMessagesDerserializedSum += numMessagedDerserializedPerBatch;
+            statsLoggerNumMessagesDeserializedPerEntry.registerSuccessfulValue(numMessagedDerserializedPerBatch);
+            numMessagedDerserializedPerBatch = 0L;
         }
     }
 
     public void incr_READ_ATTEMPTS_SUCCESS() {
         if (statsLogger != null) {
-            READ_ATTEMTPS_SUCCESS_sum++;
-            statsLogger_readAttempts.registerSuccessfulValue(1L);
+            readAttemptsSuccessSum++;
+            statsLoggerReadAttempts.registerSuccessfulValue(1L);
         }
     }
 
     public void incr_READ_ATTEMPTS_FAIL() {
         if (statsLogger != null) {
-            READ_ATTEMTPS_FAIL_sum++;
-            statsLogger_readAttempts.registerFailedValue(1L);
+            readAttemptsFailSum++;
+            statsLoggerReadAttempts.registerFailedValue(1L);
         }
     }
 
     public void register_READ_LATENCY_PER_BATCH_SUCCESS(long latency) {
         if (statsLogger != null) {
-            READ_LATENCY_SUCCESS_sum += latency;
-            statsLogger_readLatencyPerBatch.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+            readLatencySuccessSum += latency;
+            statsLoggerReadLatencyPerBatch.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
         }
     }
 
     public void register_READ_LATENCY_PER_BATCH_FAIL(long latency) {
         if (statsLogger != null) {
-            READ_LATENCY_FAIL_sum += latency;
-            statsLogger_readLatencyPerBatch.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
+            readLatencyFailSum += latency;
+            statsLoggerReadLatencyPerBatch.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
         }
     }
 
     public void incr_NUM_ENTRIES_PER_BATCH_SUCCESS(long delta) {
         if (statsLogger != null) {
-            NUM_ENTRIES_PER_BATCH_sum += delta;
-            statsLogger_numEntriesPerBatch.registerSuccessfulValue(delta);
+            numEntriesPerBatchSum += delta;
+            statsLoggerNumEntriesPerBatch.registerSuccessfulValue(delta);
         }
     }
 
     public void incr_NUM_ENTRIES_PER_BATCH_FAIL(long delta) {
         if (statsLogger != null) {
-            statsLogger_numEntriesPerBatch.registerFailedValue(delta);
+            statsLoggerNumEntriesPerBatch.registerFailedValue(delta);
         }
     }
 
     public void register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(long latency) {
         if (statsLogger != null) {
-            MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum += latency;
+            messageQueueDequeueWaitTimeSum += latency;
         }
     }
 
     public void start_RECORD_DESERIALIZE_TIME() {
         if (statsLogger != null) {
-            RECORD_DESERIALIZE_TIME_startTime = System.nanoTime();
+            recordDeserializeTimeStartTime = System.nanoTime();
         }
     }
 
     public void end_RECORD_DESERIALIZE_TIME() {
         if (statsLogger != null) {
-            long time = System.nanoTime() - RECORD_DESERIALIZE_TIME_startTime;
-            RECORD_DESERIALIZE_TIME_sum += time;
-            statsLogger_recordDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+            long time = System.nanoTime() - recordDeserializeTimeStartTime;
+            recordDeserializeTimeSum += time;
+            statsLoggerRecordDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
         }
     }
 
     public void incr_NUM_RECORD_DESERIALIZED() {
         if (statsLogger != null) {
-            statsLogger_numRecordDeserialized.add(1);
+            statsLoggerNumRecordDeserialized.add(1);
         }
     }
 
     public void register_TOTAL_EXECUTION_TIME(long latency) {
         if (statsLogger != null) {
-            statsLogger_totalExecutionTime.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+            statsLoggerTotalExecutionTime.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
         }
     }
 
@@ -308,47 +309,47 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
         if (statsLogger != null) {
             // register total entry dequeue wait time for query
             statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
-                    .registerSuccessfulEvent(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
+                    .registerSuccessfulEvent(entryQueueDequeueWaitTimeSum, TimeUnit.NANOSECONDS);
 
             //register bytes read per query
             statsLogger.getOpStatsLogger(BYTES_READ_PER_QUERY)
-                    .registerSuccessfulValue(BYTES_READ_sum);
+                    .registerSuccessfulValue(bytesReadSum);
 
             // register total time spent deserializing entries for query
             statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME_PER_QUERY)
-                    .registerSuccessfulEvent(ENTRY_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
+                    .registerSuccessfulEvent(entryDeserializeTimeSum, TimeUnit.NANOSECONDS);
 
             // register time spent waiting for message queue enqueue because message queue is full per query
             statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_PER_QUERY)
-                    .registerSuccessfulEvent(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
+                    .registerSuccessfulEvent(messageQueueEnqueueWaitTimeSum, TimeUnit.NANOSECONDS);
 
             // register number of messages deserialized per query
             statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_QUERY)
-                    .registerSuccessfulValue(NUM_MESSAGES_DERSERIALIZED_sum);
+                    .registerSuccessfulValue(numMessagesDerserializedSum);
 
             // register number of read attempts per query
             statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
-                    .registerSuccessfulValue(READ_ATTEMTPS_SUCCESS_sum);
+                    .registerSuccessfulValue(readAttemptsSuccessSum);
             statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
-                    .registerFailedValue(READ_ATTEMTPS_FAIL_sum);
+                    .registerFailedValue(readAttemptsFailSum);
 
             // register total read latency for query
             statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
-                    .registerSuccessfulEvent(READ_LATENCY_SUCCESS_sum, TimeUnit.NANOSECONDS);
+                    .registerSuccessfulEvent(readLatencySuccessSum, TimeUnit.NANOSECONDS);
             statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
-                    .registerFailedEvent(READ_LATENCY_FAIL_sum, TimeUnit.NANOSECONDS);
+                    .registerFailedEvent(readLatencyFailSum, TimeUnit.NANOSECONDS);
 
             // register number of entries per query
             statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_QUERY)
-                    .registerSuccessfulValue(NUM_ENTRIES_PER_BATCH_sum);
+                    .registerSuccessfulValue(numEntriesPerBatchSum);
 
             // register time spent waiting to read for message queue per query
             statsLogger.getOpStatsLogger(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
-                    .registerSuccessfulEvent(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.MILLISECONDS);
+                    .registerSuccessfulEvent(messageQueueDequeueWaitTimeSum, TimeUnit.MILLISECONDS);
 
             // register time spent deserializing records per query
             statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME_PER_QUERY)
-                    .registerSuccessfulEvent(RECORD_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
+                    .registerSuccessfulEvent(recordDeserializeTimeSum, TimeUnit.NANOSECONDS);
         }
     }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
index 10dcd57..3d42ee0 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
@@ -18,6 +18,11 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.airlift.json.JsonBinder.jsonBinder;
+import static java.util.Objects.requireNonNull;
+
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.TypeManager;
 import com.fasterxml.jackson.databind.DeserializationContext;
@@ -25,14 +30,11 @@ import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.Scopes;
-
 import javax.inject.Inject;
 
-import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
-import static io.airlift.configuration.ConfigBinder.configBinder;
-import static io.airlift.json.JsonBinder.jsonBinder;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This class defines binding of classes in the Presto connector.
+ */
 public class PulsarConnectorModule implements Module {
 
     private final String connectorId;
@@ -56,9 +58,11 @@ public class PulsarConnectorModule implements Module {
         configBinder(binder).bindConfig(PulsarConnectorConfig.class);
 
         jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
-
     }
 
+    /**
+     * A wrapper to deserialize the Presto types.
+     */
     public static final class TypeDeserializer
             extends FromStringDeserializer<Type> {
         private static final long serialVersionUID = 1L;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
index b14cb87..395e470 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
@@ -18,17 +18,18 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import org.apache.avro.Schema;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.naming.TopicName;
-
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
 import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
 
+/**
+ * A helper class containing common logic shared by the other classes.
+ */
 public class PulsarConnectorUtils {
 
     public static Schema parseSchema(String schemaJson) {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java
index 29c22ad..45a0bed 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorHandleResolver;
 import com.facebook.presto.spi.ConnectorSplit;
@@ -25,9 +28,9 @@ import com.facebook.presto.spi.ConnectorTableHandle;
 import com.facebook.presto.spi.ConnectorTableLayoutHandle;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This class helps to resolve classes for the Presto connector.
+ */
 public class PulsarHandleResolver implements ConnectorHandleResolver {
     @Override
     public Class<? extends ConnectorTableHandle> getTableHandleClass() {
@@ -57,8 +60,8 @@ public class PulsarHandleResolver implements ConnectorHandleResolver {
 
     static PulsarColumnHandle convertColumnHandle(ColumnHandle columnHandle) {
         requireNonNull(columnHandle, "columnHandle is null");
-        checkArgument(columnHandle instanceof PulsarColumnHandle, "columnHandle is not an instance of " +
-                "PulsarColumnHandle");
+        checkArgument(columnHandle instanceof PulsarColumnHandle, "columnHandle is not an instance of "
+            + "PulsarColumnHandle");
         return (PulsarColumnHandle) columnHandle;
     }
 
@@ -70,8 +73,8 @@ public class PulsarHandleResolver implements ConnectorHandleResolver {
 
     static PulsarTableLayoutHandle convertLayout(ConnectorTableLayoutHandle layout) {
         requireNonNull(layout, "layout is null");
-        checkArgument(layout instanceof PulsarTableLayoutHandle, "layout is not an instance of " +
-                "PulsarTableLayoutHandle");
+        checkArgument(layout instanceof PulsarTableLayoutHandle, "layout is not an instance of "
+            + "PulsarTableLayoutHandle");
         return (PulsarTableLayoutHandle) layout;
     }
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index c18a0e2..f511f8a 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -18,27 +18,31 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.requireNonNull;
+
 import com.facebook.presto.spi.type.BigintType;
 import com.facebook.presto.spi.type.TimestampType;
-import com.facebook.presto.spi.type.TimestampWithTimeZoneType;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarcharType;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import org.apache.pulsar.common.api.raw.RawMessage;
-
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
+import org.apache.pulsar.common.api.raw.RawMessage;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This abstract class represents internal columns.
+ */
 public abstract class PulsarInternalColumn {
 
+    /**
+     * Internal column representing the event time.
+     */
     public static class EventTimeColumn extends PulsarInternalColumn {
 
         EventTimeColumn(String name, Type type, String comment) {
@@ -51,6 +55,9 @@ public abstract class PulsarInternalColumn {
         }
     }
 
+    /**
+     * Internal column representing the publish time.
+     */
     public static class PublishTimeColumn extends PulsarInternalColumn {
 
         PublishTimeColumn(String name, Type type, String comment) {
@@ -63,6 +70,9 @@ public abstract class PulsarInternalColumn {
         }
     }
 
+    /**
+     * Internal column representing the message id.
+     */
     public static class MessageIdColumn extends PulsarInternalColumn {
 
         MessageIdColumn(String name, Type type, String comment) {
@@ -75,6 +85,9 @@ public abstract class PulsarInternalColumn {
         }
     }
 
+    /**
+     * Internal column representing the sequence id.
+     */
     public static class SequenceIdColumn extends PulsarInternalColumn {
 
         SequenceIdColumn(String name, Type type, String comment) {
@@ -87,6 +100,9 @@ public abstract class PulsarInternalColumn {
         }
     }
 
+    /**
+     * Internal column representing the producer name.
+     */
     public static class ProducerNameColumn extends PulsarInternalColumn {
 
         ProducerNameColumn(String name, Type type, String comment) {
@@ -99,6 +115,9 @@ public abstract class PulsarInternalColumn {
         }
     }
 
+    /**
+     * Internal column representing the key.
+     */
     public static class KeyColumn extends PulsarInternalColumn {
 
         KeyColumn(String name, Type type, String comment) {
@@ -111,9 +130,11 @@ public abstract class PulsarInternalColumn {
         }
     }
 
+    /**
+     * Internal column representing the message properties.
+     */
     public static class PropertiesColumn extends PulsarInternalColumn {
 
-
         private static final ObjectMapper mapper = new ObjectMapper();
 
         PropertiesColumn(String name, Type type, String comment) {
@@ -145,8 +166,8 @@ public abstract class PulsarInternalColumn {
     public static final PulsarInternalColumn PRODUCER_NAME = new ProducerNameColumn("__producer_name__", VarcharType
             .VARCHAR, "The name of the producer that publish the message used to generate this row");
 
-    public static final PulsarInternalColumn KEY = new KeyColumn("__key__", VarcharType.VARCHAR, "The partition key " +
-            "for the topic");
+    public static final PulsarInternalColumn KEY = new KeyColumn("__key__", VarcharType.VARCHAR, "The partition key "
+        + "for the topic");
 
     public static final PulsarInternalColumn PROPERTIES = new PropertiesColumn("__properties__", VarcharType.VARCHAR,
             "User defined properties");
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index a431982..360baaf 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -18,6 +18,18 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
+import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
+import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
+import static com.facebook.presto.spi.type.DateType.DATE;
+import static com.facebook.presto.spi.type.TimeType.TIME;
+import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
+import static java.util.Objects.requireNonNull;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded;
+import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
+import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
+
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ColumnMetadata;
 import com.facebook.presto.spi.ConnectorSession;
@@ -49,6 +61,16 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.airlift.log.Logger;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Stack;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -62,30 +84,9 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
-import javax.inject.Inject;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.Stack;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
-import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
-import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
-import static com.facebook.presto.spi.type.DateType.DATE;
-import static com.facebook.presto.spi.type.TimeType.TIME;
-import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
-import static java.util.Objects.requireNonNull;
-import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
-import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
-import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded;
-import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
-
+/**
+ * This class implements the Presto connector metadata for Pulsar.
+ */
 public class PulsarMetadata implements ConnectorMetadata {
 
     private final String connectorId;
@@ -114,7 +115,7 @@ public class PulsarMetadata implements ConnectorMetadata {
             List<String> tenants = pulsarAdmin.tenants().getTenants();
             for (String tenant : tenants) {
                 prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant).stream().map(namespace ->
-                        rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
+                    rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
             }
         } catch (PulsarAdminException e) {
             if (e.getStatusCode() == 401) {
@@ -141,7 +142,8 @@ public class PulsarMetadata implements ConnectorMetadata {
                                                             Optional<Set<ColumnHandle>> desiredColumns) {
 
         PulsarTableHandle handle = convertTableHandle(table);
-        ConnectorTableLayout layout = new ConnectorTableLayout(new PulsarTableLayoutHandle(handle, constraint.getSummary()));
+        ConnectorTableLayout layout = new ConnectorTableLayout(
+            new PulsarTableLayoutHandle(handle, constraint.getSummary()));
         return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
     }
 
@@ -173,7 +175,8 @@ public class PulsarMetadata implements ConnectorMetadata {
             } else {
                 List<String> pulsarTopicList = null;
                 try {
-                    pulsarTopicList = this.pulsarAdmin.topics().getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig));
+                    pulsarTopicList = this.pulsarAdmin.topics()
+                        .getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig));
                 } catch (PulsarAdminException e) {
                     if (e.getStatusCode() == 404) {
                         log.warn("Schema " + schemaNameOrNull + " does not exsit");
@@ -465,9 +468,9 @@ public class PulsarMetadata implements ConnectorMetadata {
                         canBeNull = true;
                     }
                 } else {
-                    List<PulsarColumnMetadata> columns = getColumns(fieldName, type, fieldTypes, fieldNames, positionIndices);
+                    List<PulsarColumnMetadata> columns = getColumns(fieldName, type, fieldTypes, fieldNames,
+                        positionIndices);
                     columnMetadataList.addAll(columns);
-
                 }
             }
         } else if (fieldSchema.getType() == Schema.Type.RECORD) {
@@ -484,7 +487,8 @@ public class PulsarMetadata implements ConnectorMetadata {
                     if (fieldName == null) {
                         columns = getColumns(field.name(), field.schema(), fieldTypes, fieldNames, positionIndices);
                     } else {
-                        columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(), fieldTypes, fieldNames, positionIndices);
+                        columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(),
+                            fieldTypes, fieldNames, positionIndices);
 
                     }
                     positionIndices.pop();
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java
index c9a94ae..850b3f5 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java
@@ -22,10 +22,12 @@ import com.facebook.presto.spi.Plugin;
 import com.facebook.presto.spi.connector.ConnectorFactory;
 import com.google.common.collect.ImmutableList;
 
+/**
+ * Implementation of the Pulsar plugin for Presto.
+ */
 public class PulsarPlugin implements Plugin {
     @Override
-    public Iterable<ConnectorFactory> getConnectorFactories()
-    {
+    public Iterable<ConnectorFactory> getConnectorFactories() {
         return ImmutableList.of(new PulsarConnectorFactory());
     }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
index 6e3324b..28980a9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
@@ -23,8 +23,6 @@ import io.netty.buffer.ByteBufUtil;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Date;
-
-import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -62,4 +60,4 @@ public class PulsarPrimitiveSchemaHandler implements SchemaHandler {
     public Object extractField(int index, Object currentRecord) {
         return currentRecord;
     }
-} 
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 9afc2cd..5cc6e8f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -37,16 +37,13 @@ import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarbinaryType;
 import com.facebook.presto.spi.type.VarcharType;
 import com.google.common.annotations.VisibleForTesting;
-
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -63,7 +60,9 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.SpscArrayQueue;
 
-
+/**
+ * Implementation of a cursor to read records.
+ */
 public class PulsarRecordCursor implements RecordCursor {
 
     private List<PulsarColumnHandle> columnHandles;
@@ -116,15 +115,16 @@ public class PulsarRecordCursor implements RecordCursor {
 
     // Exposed for testing purposes
     PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
-            pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
-                       PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
+        pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
+        PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
         this.splitSize = pulsarSplit.getSplitSize();
-        initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig, pulsarConnectorMetricsTracker);
+        initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig,
+            pulsarConnectorMetricsTracker);
     }
 
     private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
-            pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
-                            PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
+        pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
+        PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
         this.columnHandles = columnHandles;
         this.pulsarSplit = pulsarSplit;
         this.pulsarConnectorConfig = pulsarConnectorConfig;
@@ -147,7 +147,7 @@ public class PulsarRecordCursor implements RecordCursor {
 
         try {
             this.cursor = getCursor(TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()),
-                    pulsarSplit.getTableName()), pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig);
+                pulsarSplit.getTableName()), pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig);
         } catch (ManagedLedgerException | InterruptedException e) {
             log.error(e, "Failed to get read only cursor");
             close();
@@ -298,11 +298,13 @@ public class PulsarRecordCursor implements RecordCursor {
                         ReadOnlyCursorImpl readOnlyCursorImpl = ((ReadOnlyCursorImpl) cursor);
                         // check if ledger is offloaded
                         if (!readOffloaded  && readOnlyCursorImpl.getCurrentLedgerInfo().hasOffloadContext()) {
-                            log.warn("Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured",
-                                    readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName());
+                            log.warn(
+                                "Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured",
+                                readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName());
 
                             long numEntries = readOnlyCursorImpl.getCurrentLedgerInfo().getEntries();
-                            long entriesToSkip = (numEntries - ((PositionImpl) cursor.getReadPosition()).getEntryId()) + 1;
+                            long entriesToSkip =
+                                (numEntries - ((PositionImpl) cursor.getReadPosition()).getEntryId()) + 1;
                             cursor.skipEntries(Math.toIntExact((entriesToSkip)));
 
                             entriesProcessed += entriesToSkip;
@@ -337,13 +339,14 @@ public class PulsarRecordCursor implements RecordCursor {
             outstandingReadsRequests.incrementAndGet();
 
             //set read latency stats for success
-            metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (long)ctx);
+            metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (long) ctx);
             //stats for number of entries read
             metricsTracker.incr_NUM_ENTRIES_PER_BATCH_SUCCESS(entries.size());
         }
 
         public boolean hasFinished() {
-            return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >=1 && splitSize <= entriesProcessed;
+            return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >= 1
+                && splitSize <= entriesProcessed;
         }
 
         @Override
@@ -352,7 +355,7 @@ public class PulsarRecordCursor implements RecordCursor {
             outstandingReadsRequests.incrementAndGet();
 
             //set read latency stats for failed
-            metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (long)ctx);
+            metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (long) ctx);
             //stats for number of entries read failed
             metricsTracker.incr_NUM_ENTRIES_PER_BATCH_FAIL((long) maxBatchSize);
         }
@@ -375,7 +378,7 @@ public class PulsarRecordCursor implements RecordCursor {
             currentMessage = null;
         }
 
-        while(true) {
+        while (true) {
             if (readEntries.hasFinished()) {
                 return false;
             }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java
index 17e7036..be94a1b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java
@@ -18,15 +18,17 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static java.util.Objects.requireNonNull;
+
 import com.facebook.presto.spi.RecordCursor;
 import com.facebook.presto.spi.RecordSet;
 import com.facebook.presto.spi.type.Type;
 import com.google.common.collect.ImmutableList;
-
 import java.util.List;
 
-import static java.util.Objects.requireNonNull;
-
+/**
+ * Implementation of a record set.
+ */
 public class PulsarRecordSet implements RecordSet {
 
     private final List<PulsarColumnHandle> columnHandles;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java
index 378013b..c49bae9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static java.util.Objects.requireNonNull;
+
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplit;
@@ -25,12 +27,12 @@ import com.facebook.presto.spi.RecordSet;
 import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 import com.google.common.collect.ImmutableList;
-
-import javax.inject.Inject;
 import java.util.List;
+import javax.inject.Inject;
 
-import static java.util.Objects.requireNonNull;
-
+/**
+ * Implementation of the provider for record sets.
+ */
 public class PulsarRecordSetProvider implements ConnectorRecordSetProvider {
 
     private final PulsarConnectorConfig pulsarConnectorConfig;
@@ -52,7 +54,6 @@ public class PulsarRecordSetProvider implements ConnectorRecordSetProvider {
             handles.add((PulsarColumnHandle) handle);
         }
 
-
         return new PulsarRecordSet(pulsarSplit, handles.build(), this.pulsarConnectorConfig);
     }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index 2fdccc4..eeebbd1 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static java.util.Objects.requireNonNull;
+
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.HostAddress;
@@ -25,15 +27,15 @@ import com.facebook.presto.spi.predicate.TupleDomain;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Map;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
-import java.util.List;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This class represents information for a split.
+ */
 public class PulsarSplit implements ConnectorSplit {
 
     private final long splitId;
@@ -176,19 +178,19 @@ public class PulsarSplit implements ConnectorSplit {
 
     @Override
     public String toString() {
-        return "PulsarSplit{" +
-                "splitId=" + splitId +
-                ", connectorId='" + connectorId + '\'' +
-                ", schemaName='" + schemaName + '\'' +
-                ", tableName='" + tableName + '\'' +
-                ", splitSize=" + splitSize +
-                ", schema='" + schema + '\'' +
-                ", schemaType=" + schemaType +
-                ", startPositionEntryId=" + startPositionEntryId +
-                ", endPositionEntryId=" + endPositionEntryId +
-                ", startPositionLedgerId=" + startPositionLedgerId +
-                ", endPositionLedgerId=" + endPositionLedgerId +
-                '}';
+        return "PulsarSplit{"
+            + "splitId=" + splitId
+            + ", connectorId='" + connectorId + '\''
+            + ", schemaName='" + schemaName + '\''
+            + ", tableName='" + tableName + '\''
+            + ", splitSize=" + splitSize
+            + ", schema='" + schema + '\''
+            + ", schemaType=" + schemaType
+            + ", startPositionEntryId=" + startPositionEntryId
+            + ", endPositionEntryId=" + endPositionEntryId
+            + ", startPositionLedgerId=" + startPositionLedgerId
+            + ", endPositionLedgerId=" + endPositionLedgerId
+            + '}';
     }
 
     public SchemaInfo getSchemaInfo() {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 77b8f11..6da5df4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -18,6 +18,12 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
+
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplitSource;
@@ -30,14 +36,20 @@ import com.facebook.presto.spi.predicate.Domain;
 import com.facebook.presto.spi.predicate.Range;
 import com.facebook.presto.spi.predicate.TupleDomain;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
 import io.airlift.log.Logger;
+import java.sql.Timestamp;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import javax.inject.Inject;
 import lombok.Data;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -47,22 +59,10 @@ import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
-import com.google.common.base.Predicate;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-
-import javax.inject.Inject;
-import java.sql.Timestamp;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
-import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
 
+/**
+ * This class helps to manage splits.
+ */
 public class PulsarSplitManager implements ConnectorSplitManager {
 
     private final String connectorId;
@@ -146,7 +146,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
             }
 
             throw new RuntimeException("Failed to get metadata for partitioned topic "
-                    + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(),e);
+                + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
         }
 
         int actualNumSplits = Math.max(numPartitions, numSplits);
@@ -342,15 +342,15 @@ public class PulsarSplitManager implements ConnectorSplitManager {
                             // Just use a close bound since presto can always filter out the extra entries even if
                             // the bound
                             // should be open or a mixture of open and closed
-                            com.google.common.collect.Range<PositionImpl> posRange
-                                    = com.google.common.collect.Range.range(overallStartPos,
+                            com.google.common.collect.Range<PositionImpl> posRange =
+                                com.google.common.collect.Range.range(overallStartPos,
                                     com.google.common.collect.BoundType.CLOSED,
                                     overallEndPos, com.google.common.collect.BoundType.CLOSED);
 
                             long numOfEntries = readOnlyCursor.getNumberOfEntries(posRange) - 1;
 
-                            PredicatePushdownInfo predicatePushdownInfo
-                                    = new PredicatePushdownInfo(overallStartPos, overallEndPos, numOfEntries);
+                            PredicatePushdownInfo predicatePushdownInfo =
+                                new PredicatePushdownInfo(overallStartPos, overallEndPos, numOfEntries);
                             log.debug("Predicate pushdown optimization calculated: %s", predicatePushdownInfo);
                             return predicatePushdownInfo;
                         }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java
index 6e8ba90..2ffcfde 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java
@@ -18,20 +18,22 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
 import com.facebook.presto.spi.ConnectorTableHandle;
 import com.facebook.presto.spi.SchemaTableName;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import java.util.Objects;
 
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * Description of basic metadata of a table.
+ */
 public class PulsarTableHandle implements ConnectorTableHandle {
 
     /**
-     * connector id
+     * Connector id.
      */
     private final String connectorId;
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
index a7c240b..257c2a2 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
@@ -18,16 +18,18 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static java.util.Objects.requireNonNull;
+
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorTableLayoutHandle;
 import com.facebook.presto.spi.predicate.TupleDomain;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import java.util.Objects;
 
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This class handles the table layout.
+ */
 public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle {
     private final PulsarTableHandle table;
     private final TupleDomain<ColumnHandle> tupleDomain;
@@ -46,14 +48,12 @@ public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle {
     }
 
     @JsonProperty
-    public TupleDomain<ColumnHandle> getTupleDomain()
-    {
+    public TupleDomain<ColumnHandle> getTupleDomain() {
         return tupleDomain;
     }
 
     @Override
-    public boolean equals(Object o)
-    {
+    public boolean equals(Object o) {
         if (this == o) {
             return true;
         }
@@ -61,19 +61,17 @@ public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle {
             return false;
         }
         PulsarTableLayoutHandle that = (PulsarTableLayoutHandle) o;
-        return Objects.equals(table, that.table) &&
-                Objects.equals(tupleDomain, that.tupleDomain);
+        return Objects.equals(table, that.table)
+            && Objects.equals(tupleDomain, that.tupleDomain);
     }
 
     @Override
-    public int hashCode()
-    {
+    public int hashCode() {
         return Objects.hash(table, tupleDomain);
     }
 
     @Override
-    public String toString()
-    {
+    public String toString() {
         return table.toString();
     }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java
index ae163ef..1658fee 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java
@@ -18,14 +18,17 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.Objects.requireNonNull;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents the basic information about a pulsar topic.
+ */
 public class PulsarTopicDescription {
     private final String tableName;
     private final String topicName;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
index 52282b2..2d1e104 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto;
 
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 
+/**
+ * A handle for transactions.
+ */
 public enum PulsarTransactionHandle implements ConnectorTransactionHandle {
     INSTANCE
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
index 1e02d2ba..37fce04 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto;
 
 import io.netty.buffer.ByteBuf;
 
+/**
+ * This interface defines the methods to work with schemas.
+ */
 public interface SchemaHandler {
 
     Object deserialize(ByteBuf payload);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/package-info.java
similarity index 80%
copy from pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
copy to pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/package-info.java
index 52282b2..7c6649e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/package-info.java
@@ -16,10 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.sql.presto;
-
-import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-
-public enum PulsarTransactionHandle implements ConnectorTransactionHandle {
-    INSTANCE
-}
+/**
+ * Implementation of the connector to the Presto engine.
+ */
+package org.apache.pulsar.sql.presto;
\ No newline at end of file