You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/05 06:46:42 UTC
[pulsar] branch master updated: Enforce checkstyle in the pulsar
sql module (#4882)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f6fee1c Enforce checkstyle in the pulsar sql module (#4882)
f6fee1c is described below
commit f6fee1c6b7976c439215876b6d892591d198a26b
Author: Sergii Zhevzhyk <vz...@users.noreply.github.com>
AuthorDate: Mon Aug 5 08:46:37 2019 +0200
Enforce checkstyle in the pulsar sql module (#4882)
### Modifications
The checkstyle plugin was added to the pulsar sql module to enforce the defined style. All violations were fixed:
- Ordering of imports.
- Formatting of the code.
- Absent Javadoc comments.
- Other small issues.
---
pulsar-sql/pom.xml | 22 ++
.../pulsar/sql/presto/AvroSchemaHandler.java | 15 +-
.../pulsar/sql/presto/JSONSchemaHandler.java | 14 +-
.../pulsar/sql/presto/PulsarColumnHandle.java | 62 ++++--
.../pulsar/sql/presto/PulsarColumnMetadata.java | 41 ++--
.../apache/pulsar/sql/presto/PulsarConnector.java | 12 +-
.../pulsar/sql/presto/PulsarConnectorCache.java | 35 ++--
.../pulsar/sql/presto/PulsarConnectorConfig.java | 32 +--
.../pulsar/sql/presto/PulsarConnectorFactory.java | 10 +-
.../pulsar/sql/presto/PulsarConnectorId.java | 11 +-
.../sql/presto/PulsarConnectorMetricsTracker.java | 229 +++++++++++----------
.../pulsar/sql/presto/PulsarConnectorModule.java | 18 +-
.../pulsar/sql/presto/PulsarConnectorUtils.java | 13 +-
.../pulsar/sql/presto/PulsarHandleResolver.java | 17 +-
.../pulsar/sql/presto/PulsarInternalColumn.java | 41 +++-
.../apache/pulsar/sql/presto/PulsarMetadata.java | 64 +++---
.../org/apache/pulsar/sql/presto/PulsarPlugin.java | 6 +-
.../sql/presto/PulsarPrimitiveSchemaHandler.java | 4 +-
.../pulsar/sql/presto/PulsarRecordCursor.java | 37 ++--
.../apache/pulsar/sql/presto/PulsarRecordSet.java | 8 +-
.../pulsar/sql/presto/PulsarRecordSetProvider.java | 11 +-
.../org/apache/pulsar/sql/presto/PulsarSplit.java | 38 ++--
.../pulsar/sql/presto/PulsarSplitManager.java | 42 ++--
.../pulsar/sql/presto/PulsarTableHandle.java | 12 +-
.../pulsar/sql/presto/PulsarTableLayoutHandle.java | 24 +--
.../pulsar/sql/presto/PulsarTopicDescription.java | 9 +-
.../pulsar/sql/presto/PulsarTransactionHandle.java | 3 +
.../apache/pulsar/sql/presto/SchemaHandler.java | 3 +
...sarTransactionHandle.java => package-info.java} | 11 +-
29 files changed, 481 insertions(+), 363 deletions(-)
diff --git a/pulsar-sql/pom.xml b/pulsar-sql/pom.xml
index 1f7a4bb..642fe59 100644
--- a/pulsar-sql/pom.xml
+++ b/pulsar-sql/pom.xml
@@ -114,4 +114,26 @@
</dependencies>
</dependencyManagement>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>check-style</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+ <suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index 5d55682..f6807a2 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -19,11 +19,12 @@
package org.apache.pulsar.sql.presto;
import io.airlift.log.Logger;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
+import java.io.IOException;
+import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -31,9 +32,9 @@ import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
-import java.io.IOException;
-import java.util.List;
-
+/**
+ * Schema handler for payload in the Avro format.
+ */
public class AvroSchemaHandler implements SchemaHandler {
private final DatumReader<GenericRecord> datumReader;
@@ -64,7 +65,7 @@ public class AvroSchemaHandler implements SchemaHandler {
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(),
heapBuffer.readableBytes(), decoderFromCache);
- if (decoderFromCache==null) {
+ if (decoderFromCache == null) {
decoders.set(decoder);
}
return this.datumReader.read(null, decoder);
@@ -87,7 +88,7 @@ public class AvroSchemaHandler implements SchemaHandler {
return null;
}
if (positionIndices.length > 0) {
- for (int i = 1 ; i < positionIndices.length; i++) {
+ for (int i = 1; i < positionIndices.length; i++) {
curr = ((GenericRecord) curr).get(positionIndices[i]);
if (curr == null) {
return null;
@@ -96,7 +97,7 @@ public class AvroSchemaHandler implements SchemaHandler {
}
return curr;
} catch (Exception ex) {
- log.debug(ex,"%s", ex);
+ log.debug(ex, "%s", ex);
}
return null;
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
index 5a12d30..8649e41 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
@@ -20,17 +20,17 @@ package org.apache.pulsar.sql.presto;
import com.dslplatform.json.DslJson;
import com.facebook.presto.spi.type.Type;
-
import io.airlift.log.Logger;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
-import io.netty.buffer.ByteBuf;
-import io.netty.util.concurrent.FastThreadLocal;
-
+/**
+ * Schema handler for payload in the JSON format.
+ */
public class JSONSchemaHandler implements SchemaHandler {
private static final Logger log = Logger.get(JSONSchemaHandler.class);
@@ -84,7 +84,7 @@ public class JSONSchemaHandler implements SchemaHandler {
if (field == null) {
return null;
}
- for (int i = 1; i < fieldNames.length ; i++) {
+ for (int i = 1; i < fieldNames.length; i++) {
field = ((Map) field).get(fieldNames[i]);
if (field == null) {
return null;
@@ -101,7 +101,7 @@ public class JSONSchemaHandler implements SchemaHandler {
return field;
} catch (Exception ex) {
- log.debug(ex,"%s", ex);
+ log.debug(ex, "%s", ex);
}
return null;
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
index f98e864..2a1bd43 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
@@ -18,27 +18,29 @@
*/
package org.apache.pulsar.sql.presto;
+import static java.util.Objects.requireNonNull;
+
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.type.Type;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import java.util.Arrays;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This class represents the basic information about a presto column.
+ */
public class PulsarColumnHandle implements ColumnHandle {
private final String connectorId;
/**
- * Column Name
+ * Column Name.
*/
private final String name;
/**
- * Column type
+ * Column type.
*/
private final Type type;
@@ -116,17 +118,33 @@ public class PulsarColumnHandle implements ColumnHandle {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
PulsarColumnHandle that = (PulsarColumnHandle) o;
- if (hidden != that.hidden) return false;
- if (internal != that.internal) return false;
- if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) return false;
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
- if (type != null ? !type.equals(that.type) : that.type != null) return false;
- if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
+ if (hidden != that.hidden) {
+ return false;
+ }
+ if (internal != that.internal) {
+ return false;
+ }
+ if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) {
+ return false;
+ }
+ if (name != null ? !name.equals(that.name) : that.name != null) {
+ return false;
+ }
+ if (type != null ? !type.equals(that.type) : that.type != null) {
+ return false;
+ }
+ if (!Arrays.deepEquals(fieldNames, that.fieldNames)) {
+ return false;
+ }
return Arrays.deepEquals(positionIndices, that.positionIndices);
}
@@ -144,14 +162,14 @@ public class PulsarColumnHandle implements ColumnHandle {
@Override
public String toString() {
- return "PulsarColumnHandle{" +
- "connectorId='" + connectorId + '\'' +
- ", name='" + name + '\'' +
- ", type=" + type +
- ", hidden=" + hidden +
- ", internal=" + internal +
- ", fieldNames=" + Arrays.toString(fieldNames) +
- ", positionIndices=" + Arrays.toString(positionIndices) +
- '}';
+ return "PulsarColumnHandle{"
+ + "connectorId='" + connectorId + '\''
+ + ", name='" + name + '\''
+ + ", type=" + type
+ + ", hidden=" + hidden
+ + ", internal=" + internal
+ + ", fieldNames=" + Arrays.toString(fieldNames)
+ + ", positionIndices=" + Arrays.toString(positionIndices)
+ + '}';
}
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
index 9a484ba..5b033fa 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
@@ -20,10 +20,11 @@ package org.apache.pulsar.sql.presto;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.type.Type;
-
import java.util.Arrays;
-import java.util.List;
+/**
+ * Description of the column metadata.
+ */
public class PulsarColumnMetadata extends ColumnMetadata {
private boolean isInternal;
@@ -60,25 +61,37 @@ public class PulsarColumnMetadata extends ColumnMetadata {
@Override
public String toString() {
- return "PulsarColumnMetadata{" +
- "isInternal=" + isInternal +
- ", nameWithCase='" + nameWithCase + '\'' +
- ", fieldNames=" + Arrays.toString(fieldNames) +
- ", positionIndices=" + Arrays.toString(positionIndices) +
- '}';
+ return "PulsarColumnMetadata{"
+ + "isInternal=" + isInternal
+ + ", nameWithCase='" + nameWithCase + '\''
+ + ", fieldNames=" + Arrays.toString(fieldNames)
+ + ", positionIndices=" + Arrays.toString(positionIndices)
+ + '}';
}
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
PulsarColumnMetadata that = (PulsarColumnMetadata) o;
- if (isInternal != that.isInternal) return false;
- if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) return false;
- if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
+ if (isInternal != that.isInternal) {
+ return false;
+ }
+ if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) {
+ return false;
+ }
+ if (!Arrays.deepEquals(fieldNames, that.fieldNames)) {
+ return false;
+ }
return Arrays.deepEquals(positionIndices, that.positionIndices);
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
index 1d89b51..f73c5b0 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.sql.presto;
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
@@ -26,13 +30,11 @@ import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.transaction.IsolationLevel;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.log.Logger;
-
import javax.inject.Inject;
-import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
-import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This file contains implementation of the connector to the Presto engine.
+ */
public class PulsarConnector implements Connector {
private static final Logger log = Logger.get(PulsarConnector.class);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 5478984..a42c9fe 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -18,8 +18,13 @@
*/
package org.apache.pulsar.sql.presto;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
+import java.io.IOException;
+import java.util.Map;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -34,13 +39,9 @@ import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
-import java.io.IOException;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-
+/**
+ * Implementation of a cache for the Pulsar connector.
+ */
public class PulsarConnectorCache {
private static final Logger log = Logger.get(PulsarConnectorCache.class);
@@ -68,7 +69,7 @@ public class PulsarConnectorCache {
// start stats provider
ClientConfiguration clientConfiguration = new ClientConfiguration();
- pulsarConnectorConfig.getStatsProviderConfigs().forEach((key, value) -> clientConfiguration.setProperty(key, value));
+ pulsarConnectorConfig.getStatsProviderConfigs().forEach(clientConfiguration::setProperty);
this.statsProvider.start(clientConfiguration);
@@ -84,7 +85,8 @@ public class PulsarConnectorCache {
return instance;
}
- private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
+ private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
+ throws Exception {
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
.setZkServers(pulsarConnectorConfig.getZookeeperUri())
.setClientTcpNoDelay(false)
@@ -123,16 +125,17 @@ public class PulsarConnectorCache {
Map<String, String> offloaderProperties = conf.getOffloaderProperties();
offloaderProperties.put(OFFLOADERS_DIRECTOR, conf.getOffloadersDirectory());
offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_DRIVER, conf.getManagedLedgerOffloadDriver());
- offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads()));
+ offloaderProperties
+ .put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads()));
try {
return offloaderFactory.create(
- PulsarConnectorUtils.getProperties(offloaderProperties),
- ImmutableMap.of(
- LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
- LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
- ),
- getOffloaderScheduler(conf));
+ PulsarConnectorUtils.getProperties(offloaderProperties),
+ ImmutableMap.of(
+ LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
+ LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
+ ),
+ getOffloaderScheduler(conf));
} catch (IOException ioe) {
log.error("Failed to create offloader: ", ioe);
throw new RuntimeException(ioe.getMessage(), ioe.getCause());
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index edfa97d..b53c589 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -20,20 +20,21 @@ package org.apache.pulsar.sql.presto;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airlift.configuration.Config;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
-import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.protocol.Commands;
-import javax.validation.constraints.NotNull;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-
+/**
+ * This object handles configuration of the Pulsar connector for the Presto engine.
+ */
public class PulsarConnectorConfig implements AutoCloseable {
private String brokerServiceUrl = "http://localhost:8080";
@@ -55,7 +56,7 @@ public class PulsarConnectorConfig implements AutoCloseable {
private boolean namespaceDelimiterRewriteEnable = false;
private String rewriteNamespaceDelimiter = "/";
- /**** --- Ledger Offloading --- ****/
+ // --- Ledger Offloading ---
private String managedLedgerOffloadDriver = null;
private int managedLedgerOffloadMaxThreads = 2;
private String offloadersDirectory = "./offloaders";
@@ -188,14 +189,15 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
- /**** --- Ledger Offloading --- ****/
+ // --- Ledger Offloading ---
public int getManagedLedgerOffloadMaxThreads() {
return this.managedLedgerOffloadMaxThreads;
}
@Config("pulsar.managed-ledger-offload-max-threads")
- public PulsarConnectorConfig setManagedLedgerOffloadMaxThreads(int managedLedgerOffloadMaxThreads) throws IOException {
+ public PulsarConnectorConfig setManagedLedgerOffloadMaxThreads(int managedLedgerOffloadMaxThreads)
+ throws IOException {
this.managedLedgerOffloadMaxThreads = managedLedgerOffloadMaxThreads;
return this;
}
@@ -231,7 +233,7 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
- /**** --- Authentication --- ****/
+ // --- Authentication ---
public String getAuthPlugin() {
return this.authPluginClassName;
@@ -316,8 +318,8 @@ public class PulsarConnectorConfig implements AutoCloseable {
@Override
public String toString() {
- return "PulsarConnectorConfig{" +
- "brokerServiceUrl='" + brokerServiceUrl + '\'' +
- '}';
+ return "PulsarConnectorConfig{"
+ + "brokerServiceUrl='" + brokerServiceUrl + '\''
+ + '}';
}
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
index 0719e8e..48d1081 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.sql.presto;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static java.util.Objects.requireNonNull;
+
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorContext;
@@ -26,12 +29,11 @@ import com.google.inject.Injector;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.json.JsonModule;
import io.airlift.log.Logger;
-
import java.util.Map;
-import static com.google.common.base.Throwables.throwIfUnchecked;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * The factory class which helps to build the presto connector.
+ */
public class PulsarConnectorFactory implements ConnectorFactory {
private static final Logger log = Logger.get(PulsarConnectorFactory.class);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java
index 0ba4e1b..633bccb 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto;
import static java.util.Objects.requireNonNull;
+/**
+ * Unique identifier of a connector.
+ */
public class PulsarConnectorId {
private final String id;
@@ -34,8 +37,12 @@ public class PulsarConnectorId {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
PulsarConnectorId that = (PulsarConnectorId) o;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
index 34969d8..e3919d3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java
@@ -18,21 +18,23 @@
*/
package org.apache.pulsar.sql.presto;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
-import java.util.concurrent.TimeUnit;
-
+/**
+ * This class helps to track metrics related to the connector.
+ */
public class PulsarConnectorMetricsTracker implements AutoCloseable{
private final StatsLogger statsLogger;
private static final String SCOPE = "split";
- /** metric names **/
+ // metric names
// time spend waiting to get entry from entry queue because it is empty
private static final String ENTRY_QUEUE_DEQUEUE_WAIT_TIME = "entry-queue-dequeue-wait-time";
@@ -70,7 +72,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
public static final String READ_ATTEMPTS = "read-attempts";
// number of read attempts per query
- public static final String READ_ATTEMTPS_PER_QUERY= "read-attempts-per-query";
+ public static final String READ_ATTEMTPS_PER_QUERY = "read-attempts-per-query";
// latency of reads per batch
public static final String READ_LATENCY_PER_BATCH = "read-latency-per-batch";
@@ -97,209 +99,208 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
private static final String TOTAL_EXECUTION_TIME = "total-execution-time";
- /** stats loggers **/
-
- private final OpStatsLogger statsLogger_entryQueueDequeueWaitTime;
- private final Counter statsLogger_bytesRead;
- private final OpStatsLogger statsLogger_entryDeserializetime;
- private final OpStatsLogger statsLogger_messageQueueEnqueueWaitTime;
- private final Counter statsLogger_numMessagesDeserialized;
- private final OpStatsLogger statsLogger_numMessagesDeserializedPerEntry;
- private final OpStatsLogger statsLogger_readAttempts;
- private final OpStatsLogger statsLogger_readLatencyPerBatch;
- private final OpStatsLogger statsLogger_numEntriesPerBatch;
- private final OpStatsLogger statsLogger_recordDeserializeTime;
- private final Counter statsLogger_numRecordDeserialized;
- private final OpStatsLogger statsLogger_totalExecutionTime;
-
- /** internal tracking variables **/
- private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
- private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
- private long BYTES_READ_sum = 0L;
- private long ENTRY_DESERIALIZE_TIME_startTime;
- private long ENTRY_DESERIALIZE_TIME_sum = 0L;
- private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
- private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum = 0L;
- private long NUM_MESSAGES_DERSERIALIZED_sum = 0L;
- private long NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
- private long READ_ATTEMTPS_SUCCESS_sum = 0L;
- private long READ_ATTEMTPS_FAIL_sum = 0L;
- private long READ_LATENCY_SUCCESS_sum = 0L;
- private long READ_LATENCY_FAIL_sum = 0L;
- private long NUM_ENTRIES_PER_BATCH_sum = 0L;
- private long MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
- private long RECORD_DESERIALIZE_TIME_startTime;
- private long RECORD_DESERIALIZE_TIME_sum = 0L;
+ // stats loggers
+
+ private final OpStatsLogger statsLoggerEntryQueueDequeueWaitTime;
+ private final Counter statsLoggerBytesRead;
+ private final OpStatsLogger statsLoggerEntryDeserializeTime;
+ private final OpStatsLogger statsLoggerMessageQueueEnqueueWaitTime;
+ private final Counter statsLoggerNumMessagesDeserialized;
+ private final OpStatsLogger statsLoggerNumMessagesDeserializedPerEntry;
+ private final OpStatsLogger statsLoggerReadAttempts;
+ private final OpStatsLogger statsLoggerReadLatencyPerBatch;
+ private final OpStatsLogger statsLoggerNumEntriesPerBatch;
+ private final OpStatsLogger statsLoggerRecordDeserializeTime;
+ private final Counter statsLoggerNumRecordDeserialized;
+ private final OpStatsLogger statsLoggerTotalExecutionTime;
+
+ // internal tracking variables
+ private long entryQueueDequeueWaitTimeStartTime;
+ private long entryQueueDequeueWaitTimeSum = 0L;
+ private long bytesReadSum = 0L;
+ private long entryDeserializeTimeStartTime;
+ private long entryDeserializeTimeSum = 0L;
+ private long messageQueueEnqueueWaitTimeStartTime;
+ private long messageQueueEnqueueWaitTimeSum = 0L;
+ private long numMessagesDerserializedSum = 0L;
+ private long numMessagedDerserializedPerBatch = 0L;
+ private long readAttemptsSuccessSum = 0L;
+ private long readAttemptsFailSum = 0L;
+ private long readLatencySuccessSum = 0L;
+ private long readLatencyFailSum = 0L;
+ private long numEntriesPerBatchSum = 0L;
+ private long messageQueueDequeueWaitTimeSum = 0L;
+ private long recordDeserializeTimeStartTime;
+ private long recordDeserializeTimeSum = 0L;
public PulsarConnectorMetricsTracker(StatsProvider statsProvider) {
this.statsLogger = statsProvider instanceof NullStatsProvider
? null : statsProvider.getStatsLogger(SCOPE);
if (this.statsLogger != null) {
- statsLogger_entryQueueDequeueWaitTime = statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME);
- statsLogger_bytesRead = statsLogger.getCounter(BYTES_READ);
- statsLogger_entryDeserializetime = statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME);
- statsLogger_messageQueueEnqueueWaitTime = statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME);
- statsLogger_numMessagesDeserialized = statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED);
- statsLogger_numMessagesDeserializedPerEntry = statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY);
- statsLogger_readAttempts = statsLogger.getOpStatsLogger(READ_ATTEMPTS);
- statsLogger_readLatencyPerBatch = statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH);
- statsLogger_numEntriesPerBatch = statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH);
- statsLogger_recordDeserializeTime = statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME);
- statsLogger_numRecordDeserialized = statsLogger.getCounter(NUM_RECORD_DESERIALIZED);
- statsLogger_totalExecutionTime = statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME);
+ statsLoggerEntryQueueDequeueWaitTime = statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME);
+ statsLoggerBytesRead = statsLogger.getCounter(BYTES_READ);
+ statsLoggerEntryDeserializeTime = statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME);
+ statsLoggerMessageQueueEnqueueWaitTime = statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME);
+ statsLoggerNumMessagesDeserialized = statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED);
+ statsLoggerNumMessagesDeserializedPerEntry = statsLogger
+ .getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY);
+ statsLoggerReadAttempts = statsLogger.getOpStatsLogger(READ_ATTEMPTS);
+ statsLoggerReadLatencyPerBatch = statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH);
+ statsLoggerNumEntriesPerBatch = statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH);
+ statsLoggerRecordDeserializeTime = statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME);
+ statsLoggerNumRecordDeserialized = statsLogger.getCounter(NUM_RECORD_DESERIALIZED);
+ statsLoggerTotalExecutionTime = statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME);
} else {
- statsLogger_entryQueueDequeueWaitTime = null;
- statsLogger_bytesRead = null;
- statsLogger_entryDeserializetime = null;
- statsLogger_messageQueueEnqueueWaitTime = null;
- statsLogger_numMessagesDeserialized = null;
- statsLogger_numMessagesDeserializedPerEntry = null;
- statsLogger_readAttempts = null;
- statsLogger_readLatencyPerBatch = null;
- statsLogger_numEntriesPerBatch = null;
- statsLogger_recordDeserializeTime = null;
- statsLogger_numRecordDeserialized = null;
- statsLogger_totalExecutionTime = null;
+ statsLoggerEntryQueueDequeueWaitTime = null;
+ statsLoggerBytesRead = null;
+ statsLoggerEntryDeserializeTime = null;
+ statsLoggerMessageQueueEnqueueWaitTime = null;
+ statsLoggerNumMessagesDeserialized = null;
+ statsLoggerNumMessagesDeserializedPerEntry = null;
+ statsLoggerReadAttempts = null;
+ statsLoggerReadLatencyPerBatch = null;
+ statsLoggerNumEntriesPerBatch = null;
+ statsLoggerRecordDeserializeTime = null;
+ statsLoggerNumRecordDeserialized = null;
+ statsLoggerTotalExecutionTime = null;
}
}
public void start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
if (statsLogger != null) {
- ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime = System.nanoTime();
+ entryQueueDequeueWaitTimeStartTime = System.nanoTime();
}
}
public void end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
if (statsLogger != null) {
- long time = System.nanoTime() - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
- ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum += time;
- statsLogger_entryQueueDequeueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+ long time = System.nanoTime() - entryQueueDequeueWaitTimeStartTime;
+ entryQueueDequeueWaitTimeSum += time;
+ statsLoggerEntryQueueDequeueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void register_BYTES_READ(long bytes) {
if (statsLogger != null) {
- BYTES_READ_sum += bytes;
- statsLogger_bytesRead.add(bytes);
+ bytesReadSum += bytes;
+ statsLoggerBytesRead.add(bytes);
}
}
public void start_ENTRY_DESERIALIZE_TIME() {
if (statsLogger != null) {
- ENTRY_DESERIALIZE_TIME_startTime = System.nanoTime();
+ entryDeserializeTimeStartTime = System.nanoTime();
}
}
public void end_ENTRY_DESERIALIZE_TIME() {
if (statsLogger != null) {
- long time = System.nanoTime() - ENTRY_DESERIALIZE_TIME_startTime;
- ENTRY_DESERIALIZE_TIME_sum += time;
- statsLogger_entryDeserializetime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+ long time = System.nanoTime() - entryDeserializeTimeStartTime;
+ entryDeserializeTimeSum += time;
+ statsLoggerEntryDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
if (statsLogger != null) {
- MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime = System.nanoTime();
+ messageQueueEnqueueWaitTimeStartTime = System.nanoTime();
}
}
public void end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
if (statsLogger != null) {
- long time = System.nanoTime() - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
- MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum += time;
- statsLogger_messageQueueEnqueueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+ long time = System.nanoTime() - messageQueueEnqueueWaitTimeStartTime;
+ messageQueueEnqueueWaitTimeSum += time;
+ statsLoggerMessageQueueEnqueueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
if (statsLogger != null) {
- NUM_MESSAGED_DERSERIALIZED_PER_BATCH++;
- statsLogger_numMessagesDeserialized.add(1);
+ numMessagedDerserializedPerBatch++;
+ statsLoggerNumMessagesDeserialized.add(1);
}
}
public void end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
if (statsLogger != null) {
- NUM_MESSAGES_DERSERIALIZED_sum += NUM_MESSAGED_DERSERIALIZED_PER_BATCH;
-
- statsLogger_numMessagesDeserializedPerEntry.registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH);
-
- NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
+ numMessagesDerserializedSum += numMessagedDerserializedPerBatch;
+ statsLoggerNumMessagesDeserializedPerEntry.registerSuccessfulValue(numMessagedDerserializedPerBatch);
+ numMessagedDerserializedPerBatch = 0L;
}
}
public void incr_READ_ATTEMPTS_SUCCESS() {
if (statsLogger != null) {
- READ_ATTEMTPS_SUCCESS_sum++;
- statsLogger_readAttempts.registerSuccessfulValue(1L);
+ readAttemptsSuccessSum++;
+ statsLoggerReadAttempts.registerSuccessfulValue(1L);
}
}
public void incr_READ_ATTEMPTS_FAIL() {
if (statsLogger != null) {
- READ_ATTEMTPS_FAIL_sum++;
- statsLogger_readAttempts.registerFailedValue(1L);
+ readAttemptsFailSum++;
+ statsLoggerReadAttempts.registerFailedValue(1L);
}
}
public void register_READ_LATENCY_PER_BATCH_SUCCESS(long latency) {
if (statsLogger != null) {
- READ_LATENCY_SUCCESS_sum += latency;
- statsLogger_readLatencyPerBatch.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+ readLatencySuccessSum += latency;
+ statsLoggerReadLatencyPerBatch.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
}
}
public void register_READ_LATENCY_PER_BATCH_FAIL(long latency) {
if (statsLogger != null) {
- READ_LATENCY_FAIL_sum += latency;
- statsLogger_readLatencyPerBatch.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
+ readLatencyFailSum += latency;
+ statsLoggerReadLatencyPerBatch.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
}
}
public void incr_NUM_ENTRIES_PER_BATCH_SUCCESS(long delta) {
if (statsLogger != null) {
- NUM_ENTRIES_PER_BATCH_sum += delta;
- statsLogger_numEntriesPerBatch.registerSuccessfulValue(delta);
+ numEntriesPerBatchSum += delta;
+ statsLoggerNumEntriesPerBatch.registerSuccessfulValue(delta);
}
}
public void incr_NUM_ENTRIES_PER_BATCH_FAIL(long delta) {
if (statsLogger != null) {
- statsLogger_numEntriesPerBatch.registerFailedValue(delta);
+ statsLoggerNumEntriesPerBatch.registerFailedValue(delta);
}
}
public void register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(long latency) {
if (statsLogger != null) {
- MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum += latency;
+ messageQueueDequeueWaitTimeSum += latency;
}
}
public void start_RECORD_DESERIALIZE_TIME() {
if (statsLogger != null) {
- RECORD_DESERIALIZE_TIME_startTime = System.nanoTime();
+ recordDeserializeTimeStartTime = System.nanoTime();
}
}
public void end_RECORD_DESERIALIZE_TIME() {
if (statsLogger != null) {
- long time = System.nanoTime() - RECORD_DESERIALIZE_TIME_startTime;
- RECORD_DESERIALIZE_TIME_sum += time;
- statsLogger_recordDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
+ long time = System.nanoTime() - recordDeserializeTimeStartTime;
+ recordDeserializeTimeSum += time;
+ statsLoggerRecordDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void incr_NUM_RECORD_DESERIALIZED() {
if (statsLogger != null) {
- statsLogger_numRecordDeserialized.add(1);
+ statsLoggerNumRecordDeserialized.add(1);
}
}
public void register_TOTAL_EXECUTION_TIME(long latency) {
if (statsLogger != null) {
- statsLogger_totalExecutionTime.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+ statsLoggerTotalExecutionTime.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
}
}
@@ -308,47 +309,47 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
if (statsLogger != null) {
// register total entry dequeue wait time for query
statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
- .registerSuccessfulEvent(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
+ .registerSuccessfulEvent(entryQueueDequeueWaitTimeSum, TimeUnit.NANOSECONDS);
//register bytes read per query
statsLogger.getOpStatsLogger(BYTES_READ_PER_QUERY)
- .registerSuccessfulValue(BYTES_READ_sum);
+ .registerSuccessfulValue(bytesReadSum);
// register total time spent deserializing entries for query
statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME_PER_QUERY)
- .registerSuccessfulEvent(ENTRY_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
+ .registerSuccessfulEvent(entryDeserializeTimeSum, TimeUnit.NANOSECONDS);
// register time spent waiting for message queue enqueue because message queue is full per query
statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_PER_QUERY)
- .registerSuccessfulEvent(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
+ .registerSuccessfulEvent(messageQueueEnqueueWaitTimeSum, TimeUnit.NANOSECONDS);
// register number of messages deserialized per query
statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_QUERY)
- .registerSuccessfulValue(NUM_MESSAGES_DERSERIALIZED_sum);
+ .registerSuccessfulValue(numMessagesDerserializedSum);
// register number of read attempts per query
statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
- .registerSuccessfulValue(READ_ATTEMTPS_SUCCESS_sum);
+ .registerSuccessfulValue(readAttemptsSuccessSum);
statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
- .registerFailedValue(READ_ATTEMTPS_FAIL_sum);
+ .registerFailedValue(readAttemptsFailSum);
// register total read latency for query
statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
- .registerSuccessfulEvent(READ_LATENCY_SUCCESS_sum, TimeUnit.NANOSECONDS);
+ .registerSuccessfulEvent(readLatencySuccessSum, TimeUnit.NANOSECONDS);
statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
- .registerFailedEvent(READ_LATENCY_FAIL_sum, TimeUnit.NANOSECONDS);
+ .registerFailedEvent(readLatencyFailSum, TimeUnit.NANOSECONDS);
// register number of entries per query
statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_QUERY)
- .registerSuccessfulValue(NUM_ENTRIES_PER_BATCH_sum);
+ .registerSuccessfulValue(numEntriesPerBatchSum);
// register time spent waiting to read for message queue per query
statsLogger.getOpStatsLogger(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
- .registerSuccessfulEvent(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.MILLISECONDS);
+ .registerSuccessfulEvent(messageQueueDequeueWaitTimeSum, TimeUnit.MILLISECONDS);
// register time spent deserializing records per query
statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME_PER_QUERY)
- .registerSuccessfulEvent(RECORD_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
+ .registerSuccessfulEvent(recordDeserializeTimeSum, TimeUnit.NANOSECONDS);
}
}
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
index 10dcd57..3d42ee0 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
@@ -18,6 +18,11 @@
*/
package org.apache.pulsar.sql.presto;
+import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.airlift.json.JsonBinder.jsonBinder;
+import static java.util.Objects.requireNonNull;
+
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.fasterxml.jackson.databind.DeserializationContext;
@@ -25,14 +30,11 @@ import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
-
import javax.inject.Inject;
-import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
-import static io.airlift.configuration.ConfigBinder.configBinder;
-import static io.airlift.json.JsonBinder.jsonBinder;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This class defines binding of classes in the Presto connector.
+ */
public class PulsarConnectorModule implements Module {
private final String connectorId;
@@ -56,9 +58,11 @@ public class PulsarConnectorModule implements Module {
configBinder(binder).bindConfig(PulsarConnectorConfig.class);
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
-
}
+ /**
+ * A wrapper to deserialize the Presto types.
+ */
public static final class TypeDeserializer
extends FromStringDeserializer<Type> {
private static final long serialVersionUID = 1L;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
index b14cb87..395e470 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
@@ -18,17 +18,18 @@
*/
package org.apache.pulsar.sql.presto;
-import org.apache.avro.Schema;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.naming.TopicName;
-
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
+/**
+ * A helper class containing repeatable logic used in the other classes.
+ */
public class PulsarConnectorUtils {
public static Schema parseSchema(String schemaJson) {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java
index 29c22ad..45a0bed 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.sql.presto;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorSplit;
@@ -25,9 +28,9 @@ import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This class helps to resolve classes for the Presto connector.
+ */
public class PulsarHandleResolver implements ConnectorHandleResolver {
@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass() {
@@ -57,8 +60,8 @@ public class PulsarHandleResolver implements ConnectorHandleResolver {
static PulsarColumnHandle convertColumnHandle(ColumnHandle columnHandle) {
requireNonNull(columnHandle, "columnHandle is null");
- checkArgument(columnHandle instanceof PulsarColumnHandle, "columnHandle is not an instance of " +
- "PulsarColumnHandle");
+ checkArgument(columnHandle instanceof PulsarColumnHandle, "columnHandle is not an instance of "
+ + "PulsarColumnHandle");
return (PulsarColumnHandle) columnHandle;
}
@@ -70,8 +73,8 @@ public class PulsarHandleResolver implements ConnectorHandleResolver {
static PulsarTableLayoutHandle convertLayout(ConnectorTableLayoutHandle layout) {
requireNonNull(layout, "layout is null");
- checkArgument(layout instanceof PulsarTableLayoutHandle, "layout is not an instance of " +
- "PulsarTableLayoutHandle");
+ checkArgument(layout instanceof PulsarTableLayoutHandle, "layout is not an instance of "
+ + "PulsarTableLayoutHandle");
return (PulsarTableLayoutHandle) layout;
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index c18a0e2..f511f8a 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -18,27 +18,31 @@
*/
package org.apache.pulsar.sql.presto;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.requireNonNull;
+
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.TimestampType;
-import com.facebook.presto.spi.type.TimestampWithTimeZoneType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import org.apache.pulsar.common.api.raw.RawMessage;
-
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
+import org.apache.pulsar.common.api.raw.RawMessage;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This abstract class represents internal columns.
+ */
public abstract class PulsarInternalColumn {
+ /**
+ * Internal column representing the event time.
+ */
public static class EventTimeColumn extends PulsarInternalColumn {
EventTimeColumn(String name, Type type, String comment) {
@@ -51,6 +55,9 @@ public abstract class PulsarInternalColumn {
}
}
+ /**
+ * Internal column representing the publish time.
+ */
public static class PublishTimeColumn extends PulsarInternalColumn {
PublishTimeColumn(String name, Type type, String comment) {
@@ -63,6 +70,9 @@ public abstract class PulsarInternalColumn {
}
}
+ /**
+ * Internal column representing the message id.
+ */
public static class MessageIdColumn extends PulsarInternalColumn {
MessageIdColumn(String name, Type type, String comment) {
@@ -75,6 +85,9 @@ public abstract class PulsarInternalColumn {
}
}
+ /**
+ * Internal column representing the sequence id.
+ */
public static class SequenceIdColumn extends PulsarInternalColumn {
SequenceIdColumn(String name, Type type, String comment) {
@@ -87,6 +100,9 @@ public abstract class PulsarInternalColumn {
}
}
+ /**
+ * Internal column representing the producer name.
+ */
public static class ProducerNameColumn extends PulsarInternalColumn {
ProducerNameColumn(String name, Type type, String comment) {
@@ -99,6 +115,9 @@ public abstract class PulsarInternalColumn {
}
}
+ /**
+ * Internal column representing the key.
+ */
public static class KeyColumn extends PulsarInternalColumn {
KeyColumn(String name, Type type, String comment) {
@@ -111,9 +130,11 @@ public abstract class PulsarInternalColumn {
}
}
+ /**
+ * Internal column representing the message properties.
+ */
public static class PropertiesColumn extends PulsarInternalColumn {
-
private static final ObjectMapper mapper = new ObjectMapper();
PropertiesColumn(String name, Type type, String comment) {
@@ -145,8 +166,8 @@ public abstract class PulsarInternalColumn {
public static final PulsarInternalColumn PRODUCER_NAME = new ProducerNameColumn("__producer_name__", VarcharType
.VARCHAR, "The name of the producer that publish the message used to generate this row");
- public static final PulsarInternalColumn KEY = new KeyColumn("__key__", VarcharType.VARCHAR, "The partition key " +
- "for the topic");
+ public static final PulsarInternalColumn KEY = new KeyColumn("__key__", VarcharType.VARCHAR, "The partition key "
+ + "for the topic");
public static final PulsarInternalColumn PROPERTIES = new PropertiesColumn("__properties__", VarcharType.VARCHAR,
"User defined properties");
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index a431982..360baaf 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -18,6 +18,18 @@
*/
package org.apache.pulsar.sql.presto;
+import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
+import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
+import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
+import static com.facebook.presto.spi.type.DateType.DATE;
+import static com.facebook.presto.spi.type.TimeType.TIME;
+import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
+import static java.util.Objects.requireNonNull;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded;
+import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
+import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
+
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
@@ -49,6 +61,16 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Stack;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
@@ -62,30 +84,9 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
-import javax.inject.Inject;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.Stack;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
-import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
-import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
-import static com.facebook.presto.spi.type.DateType.DATE;
-import static com.facebook.presto.spi.type.TimeType.TIME;
-import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
-import static java.util.Objects.requireNonNull;
-import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
-import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
-import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded;
-import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
-
+/**
+ * This connector helps to work with metadata.
+ */
public class PulsarMetadata implements ConnectorMetadata {
private final String connectorId;
@@ -114,7 +115,7 @@ public class PulsarMetadata implements ConnectorMetadata {
List<String> tenants = pulsarAdmin.tenants().getTenants();
for (String tenant : tenants) {
prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant).stream().map(namespace ->
- rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
+ rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
}
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 401) {
@@ -141,7 +142,8 @@ public class PulsarMetadata implements ConnectorMetadata {
Optional<Set<ColumnHandle>> desiredColumns) {
PulsarTableHandle handle = convertTableHandle(table);
- ConnectorTableLayout layout = new ConnectorTableLayout(new PulsarTableLayoutHandle(handle, constraint.getSummary()));
+ ConnectorTableLayout layout = new ConnectorTableLayout(
+ new PulsarTableLayoutHandle(handle, constraint.getSummary()));
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}
@@ -173,7 +175,8 @@ public class PulsarMetadata implements ConnectorMetadata {
} else {
List<String> pulsarTopicList = null;
try {
- pulsarTopicList = this.pulsarAdmin.topics().getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig));
+ pulsarTopicList = this.pulsarAdmin.topics()
+ .getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig));
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
log.warn("Schema " + schemaNameOrNull + " does not exsit");
@@ -465,9 +468,9 @@ public class PulsarMetadata implements ConnectorMetadata {
canBeNull = true;
}
} else {
- List<PulsarColumnMetadata> columns = getColumns(fieldName, type, fieldTypes, fieldNames, positionIndices);
+ List<PulsarColumnMetadata> columns = getColumns(fieldName, type, fieldTypes, fieldNames,
+ positionIndices);
columnMetadataList.addAll(columns);
-
}
}
} else if (fieldSchema.getType() == Schema.Type.RECORD) {
@@ -484,7 +487,8 @@ public class PulsarMetadata implements ConnectorMetadata {
if (fieldName == null) {
columns = getColumns(field.name(), field.schema(), fieldTypes, fieldNames, positionIndices);
} else {
- columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(), fieldTypes, fieldNames, positionIndices);
+ columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(),
+ fieldTypes, fieldNames, positionIndices);
}
positionIndices.pop();
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java
index c9a94ae..850b3f5 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java
@@ -22,10 +22,12 @@ import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.google.common.collect.ImmutableList;
+/**
+ * Implementation of the Pulsar plugin for Presto.
+ */
public class PulsarPlugin implements Plugin {
@Override
- public Iterable<ConnectorFactory> getConnectorFactories()
- {
+ public Iterable<ConnectorFactory> getConnectorFactories() {
return ImmutableList.of(new PulsarConnectorFactory());
}
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
index 6e3324b..28980a9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
@@ -23,8 +23,6 @@ import io.netty.buffer.ByteBufUtil;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
-
-import io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -62,4 +60,4 @@ public class PulsarPrimitiveSchemaHandler implements SchemaHandler {
public Object extractField(int index, Object currentRecord) {
return currentRecord;
}
-}
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 9afc2cd..5cc6e8f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -37,16 +37,13 @@ import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarbinaryType;
import com.facebook.presto.spi.type.VarcharType;
import com.google.common.annotations.VisibleForTesting;
-
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
-
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -63,7 +60,9 @@ import org.apache.pulsar.common.naming.TopicName;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
-
+/**
+ * Implementation of a cursor to read records.
+ */
public class PulsarRecordCursor implements RecordCursor {
private List<PulsarColumnHandle> columnHandles;
@@ -116,15 +115,16 @@ public class PulsarRecordCursor implements RecordCursor {
// Exposed for testing purposes
PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
- pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
- PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
+ pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
+ PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
this.splitSize = pulsarSplit.getSplitSize();
- initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig, pulsarConnectorMetricsTracker);
+ initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig,
+ pulsarConnectorMetricsTracker);
}
private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
- pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
- PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
+ pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
+ PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
this.columnHandles = columnHandles;
this.pulsarSplit = pulsarSplit;
this.pulsarConnectorConfig = pulsarConnectorConfig;
@@ -147,7 +147,7 @@ public class PulsarRecordCursor implements RecordCursor {
try {
this.cursor = getCursor(TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()),
- pulsarSplit.getTableName()), pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig);
+ pulsarSplit.getTableName()), pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig);
} catch (ManagedLedgerException | InterruptedException e) {
log.error(e, "Failed to get read only cursor");
close();
@@ -298,11 +298,13 @@ public class PulsarRecordCursor implements RecordCursor {
ReadOnlyCursorImpl readOnlyCursorImpl = ((ReadOnlyCursorImpl) cursor);
// check if ledger is offloaded
if (!readOffloaded && readOnlyCursorImpl.getCurrentLedgerInfo().hasOffloadContext()) {
- log.warn("Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured",
- readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName());
+ log.warn(
+ "Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured",
+ readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName());
long numEntries = readOnlyCursorImpl.getCurrentLedgerInfo().getEntries();
- long entriesToSkip = (numEntries - ((PositionImpl) cursor.getReadPosition()).getEntryId()) + 1;
+ long entriesToSkip =
+ (numEntries - ((PositionImpl) cursor.getReadPosition()).getEntryId()) + 1;
cursor.skipEntries(Math.toIntExact((entriesToSkip)));
entriesProcessed += entriesToSkip;
@@ -337,13 +339,14 @@ public class PulsarRecordCursor implements RecordCursor {
outstandingReadsRequests.incrementAndGet();
//set read latency stats for success
- metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (long)ctx);
+ metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (long) ctx);
//stats for number of entries read
metricsTracker.incr_NUM_ENTRIES_PER_BATCH_SUCCESS(entries.size());
}
public boolean hasFinished() {
- return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >=1 && splitSize <= entriesProcessed;
+ return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >= 1
+ && splitSize <= entriesProcessed;
}
@Override
@@ -352,7 +355,7 @@ public class PulsarRecordCursor implements RecordCursor {
outstandingReadsRequests.incrementAndGet();
//set read latency stats for failed
- metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (long)ctx);
+ metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (long) ctx);
//stats for number of entries read failed
metricsTracker.incr_NUM_ENTRIES_PER_BATCH_FAIL((long) maxBatchSize);
}
@@ -375,7 +378,7 @@ public class PulsarRecordCursor implements RecordCursor {
currentMessage = null;
}
- while(true) {
+ while (true) {
if (readEntries.hasFinished()) {
return false;
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java
index 17e7036..be94a1b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java
@@ -18,15 +18,17 @@
*/
package org.apache.pulsar.sql.presto;
+import static java.util.Objects.requireNonNull;
+
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList;
-
import java.util.List;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * Implementation of a record set.
+ */
public class PulsarRecordSet implements RecordSet {
private final List<PulsarColumnHandle> columnHandles;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java
index 378013b..c49bae9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.sql.presto;
+import static java.util.Objects.requireNonNull;
+
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
@@ -25,12 +27,12 @@ import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
-
-import javax.inject.Inject;
import java.util.List;
+import javax.inject.Inject;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * Implementation of the provider for record sets.
+ */
public class PulsarRecordSetProvider implements ConnectorRecordSetProvider {
private final PulsarConnectorConfig pulsarConnectorConfig;
@@ -52,7 +54,6 @@ public class PulsarRecordSetProvider implements ConnectorRecordSetProvider {
handles.add((PulsarColumnHandle) handle);
}
-
return new PulsarRecordSet(pulsarSplit, handles.build(), this.pulsarConnectorConfig);
}
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index 2fdccc4..eeebbd1 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.sql.presto;
+import static java.util.Objects.requireNonNull;
+
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
@@ -25,15 +27,15 @@ import com.facebook.presto.spi.predicate.TupleDomain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Map;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This class represents information for a split.
+ */
public class PulsarSplit implements ConnectorSplit {
private final long splitId;
@@ -176,19 +178,19 @@ public class PulsarSplit implements ConnectorSplit {
@Override
public String toString() {
- return "PulsarSplit{" +
- "splitId=" + splitId +
- ", connectorId='" + connectorId + '\'' +
- ", schemaName='" + schemaName + '\'' +
- ", tableName='" + tableName + '\'' +
- ", splitSize=" + splitSize +
- ", schema='" + schema + '\'' +
- ", schemaType=" + schemaType +
- ", startPositionEntryId=" + startPositionEntryId +
- ", endPositionEntryId=" + endPositionEntryId +
- ", startPositionLedgerId=" + startPositionLedgerId +
- ", endPositionLedgerId=" + endPositionLedgerId +
- '}';
+ return "PulsarSplit{"
+ + "splitId=" + splitId
+ + ", connectorId='" + connectorId + '\''
+ + ", schemaName='" + schemaName + '\''
+ + ", tableName='" + tableName + '\''
+ + ", splitSize=" + splitSize
+ + ", schema='" + schema + '\''
+ + ", schemaType=" + schemaType
+ + ", startPositionEntryId=" + startPositionEntryId
+ + ", endPositionEntryId=" + endPositionEntryId
+ + ", startPositionLedgerId=" + startPositionLedgerId
+ + ", endPositionLedgerId=" + endPositionLedgerId
+ + '}';
}
public SchemaInfo getSchemaInfo() {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 77b8f11..6da5df4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -18,6 +18,12 @@
*/
package org.apache.pulsar.sql.presto;
+import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
+
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
@@ -30,14 +36,20 @@ import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.Range;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
import io.airlift.log.Logger;
+import java.sql.Timestamp;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import javax.inject.Inject;
import lombok.Data;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -47,22 +59,10 @@ import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
-import com.google.common.base.Predicate;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-
-import javax.inject.Inject;
-import java.sql.Timestamp;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
-import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
+/**
+ * The class helping to manage splits.
+ */
public class PulsarSplitManager implements ConnectorSplitManager {
private final String connectorId;
@@ -146,7 +146,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
}
throw new RuntimeException("Failed to get metadata for partitioned topic "
- + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(),e);
+ + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
int actualNumSplits = Math.max(numPartitions, numSplits);
@@ -342,15 +342,15 @@ public class PulsarSplitManager implements ConnectorSplitManager {
// Just use a close bound since presto can always filter out the extra entries even if
// the bound
// should be open or a mixture of open and closed
- com.google.common.collect.Range<PositionImpl> posRange
- = com.google.common.collect.Range.range(overallStartPos,
+ com.google.common.collect.Range<PositionImpl> posRange =
+ com.google.common.collect.Range.range(overallStartPos,
com.google.common.collect.BoundType.CLOSED,
overallEndPos, com.google.common.collect.BoundType.CLOSED);
long numOfEntries = readOnlyCursor.getNumberOfEntries(posRange) - 1;
- PredicatePushdownInfo predicatePushdownInfo
- = new PredicatePushdownInfo(overallStartPos, overallEndPos, numOfEntries);
+ PredicatePushdownInfo predicatePushdownInfo =
+ new PredicatePushdownInfo(overallStartPos, overallEndPos, numOfEntries);
log.debug("Predicate pushdown optimization calculated: %s", predicatePushdownInfo);
return predicatePushdownInfo;
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java
index 6e8ba90..2ffcfde 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java
@@ -18,20 +18,22 @@
*/
package org.apache.pulsar.sql.presto;
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import java.util.Objects;
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * Description of basic metadata of a table.
+ */
public class PulsarTableHandle implements ConnectorTableHandle {
/**
- * connector id
+ * Connector id.
*/
private final String connectorId;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
index a7c240b..257c2a2 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
@@ -18,16 +18,18 @@
*/
package org.apache.pulsar.sql.presto;
+import static java.util.Objects.requireNonNull;
+
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import java.util.Objects;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * This class handles the table layout.
+ */
public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle {
private final PulsarTableHandle table;
private final TupleDomain<ColumnHandle> tupleDomain;
@@ -46,14 +48,12 @@ public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle {
}
@JsonProperty
- public TupleDomain<ColumnHandle> getTupleDomain()
- {
+ public TupleDomain<ColumnHandle> getTupleDomain() {
return tupleDomain;
}
@Override
- public boolean equals(Object o)
- {
+ public boolean equals(Object o) {
if (this == o) {
return true;
}
@@ -61,19 +61,17 @@ public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle {
return false;
}
PulsarTableLayoutHandle that = (PulsarTableLayoutHandle) o;
- return Objects.equals(table, that.table) &&
- Objects.equals(tupleDomain, that.tupleDomain);
+ return Objects.equals(table, that.table)
+ && Objects.equals(tupleDomain, that.tupleDomain);
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
return Objects.hash(table, tupleDomain);
}
@Override
- public String toString()
- {
+ public String toString() {
return table.toString();
}
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java
index ae163ef..1658fee 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java
@@ -18,14 +18,17 @@
*/
package org.apache.pulsar.sql.presto;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents the basic information about a pulsar topic.
+ */
public class PulsarTopicDescription {
private final String tableName;
private final String topicName;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
index 52282b2..2d1e104 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+/**
+ * A handle for transactions.
+ */
public enum PulsarTransactionHandle implements ConnectorTransactionHandle {
INSTANCE
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
index 1e02d2ba..37fce04 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto;
import io.netty.buffer.ByteBuf;
+/**
+ * This interface defines the methods to work with schemas.
+ */
public interface SchemaHandler {
Object deserialize(ByteBuf payload);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/package-info.java
similarity index 80%
copy from pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
copy to pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/package-info.java
index 52282b2..7c6649e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/package-info.java
@@ -16,10 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.sql.presto;
-
-import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-
-public enum PulsarTransactionHandle implements ConnectorTransactionHandle {
- INSTANCE
-}
+/**
+ * Implementation of the connector to the Presto engine.
+ */
+package org.apache.pulsar.sql.presto;
\ No newline at end of file