You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/07/26 01:45:00 UTC
[pulsar] branch master updated: Reuse ManagedLedgerFactory instances across SQL queries (#4813)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f88ea9d Reuse ManagedLedgerFactory instances across SQL queries (#4813)
f88ea9d is described below
commit f88ea9df9cdac2efb3511ff82a69593f08d69e3a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jul 25 18:44:53 2019 -0700
Reuse ManagedLedgerFactory instances across SQL queries (#4813)
---
.../apache/pulsar/sql/presto/PulsarConnector.java | 5 --
.../pulsar/sql/presto/PulsarConnectorCache.java | 5 +-
.../pulsar/sql/presto/PulsarSplitManager.java | 89 ++++++++--------------
.../pulsar/sql/presto/TestPulsarConnector.java | 3 +-
4 files changed, 36 insertions(+), 66 deletions(-)
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
index 498583d..1d89b51 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
@@ -87,11 +87,6 @@ public class PulsarConnector implements Connector {
log.error(e, "Failed to close pulsar connector");
}
try {
- PulsarConnectorCache.shutdown();
- } catch (Exception e) {
- log.error("Failed to shutdown pulsar connector cache");
- }
- try {
lifeCycleManager.stop();
} catch (Exception e) {
log.error(e, "Error shutting down connector");
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 36ba1d2..5478984 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -39,11 +39,14 @@ import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.annotations.VisibleForTesting;
+
public class PulsarConnectorCache {
private static final Logger log = Logger.get(PulsarConnectorCache.class);
- private static PulsarConnectorCache instance;
+ @VisibleForTesting
+ static PulsarConnectorCache instance;
private final ManagedLedgerFactory managedLedgerFactory;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index bb1dea2..77b8f11 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -134,16 +134,6 @@ public class PulsarSplitManager implements ConnectorSplitManager {
}
@VisibleForTesting
- ManagedLedgerFactory getManagedLedgerFactory() throws Exception {
- ClientConfiguration bkClientConfiguration = new ClientConfiguration()
- .setZkServers(this.pulsarConnectorConfig.getZookeeperUri())
- .setClientTcpNoDelay(false)
- .setStickyReadsEnabled(false)
- .setUseV2WireProtocol(true);
- return new ManagedLedgerFactoryImpl(bkClientConfiguration);
- }
-
- @VisibleForTesting
Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle
tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain) throws Exception {
int numPartitions;
@@ -165,59 +155,40 @@ public class PulsarSplitManager implements ConnectorSplitManager {
int splitRemainder = actualNumSplits % numPartitions;
- ManagedLedgerFactory managedLedgerFactory = getManagedLedgerFactory();
-
- try {
- List<PulsarSplit> splits = new LinkedList<>();
- for (int i = 0; i < numPartitions; i++) {
-
- int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition;
- splits.addAll(
- getSplitsForTopic(
- topicName.getPartition(i).getPersistenceNamingEncoding(),
- managedLedgerFactory,
- splitsForThisPartition,
- tableHandle,
- schemaInfo,
- topicName.getPartition(i).getLocalName(),
- tupleDomain)
- );
- }
- return splits;
- } finally {
- if (managedLedgerFactory != null) {
- try {
- managedLedgerFactory.shutdown();
- } catch (Exception e) {
- log.error(e);
- }
- }
+ ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
+ .getManagedLedgerFactory();
+
+ List<PulsarSplit> splits = new LinkedList<>();
+ for (int i = 0; i < numPartitions; i++) {
+
+ int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition;
+ splits.addAll(
+ getSplitsForTopic(
+ topicName.getPartition(i).getPersistenceNamingEncoding(),
+ managedLedgerFactory,
+ splitsForThisPartition,
+ tableHandle,
+ schemaInfo,
+ topicName.getPartition(i).getLocalName(),
+ tupleDomain));
}
+ return splits;
}
@VisibleForTesting
- Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle
- tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain) throws Exception {
- ManagedLedgerFactory managedLedgerFactory = null;
- try {
- managedLedgerFactory = getManagedLedgerFactory();
-
- return getSplitsForTopic(
- topicName.getPersistenceNamingEncoding(),
- managedLedgerFactory,
- numSplits,
- tableHandle,
- schemaInfo,
- tableHandle.getTableName(), tupleDomain);
- } finally {
- if (managedLedgerFactory != null) {
- try {
- managedLedgerFactory.shutdown();
- } catch (Exception e) {
- log.error(e);
- }
- }
- }
+ Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, TopicName topicName,
+ PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain)
+ throws Exception {
+ ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
+ .getManagedLedgerFactory();
+
+ return getSplitsForTopic(
+ topicName.getPersistenceNamingEncoding(),
+ managedLedgerFactory,
+ numSplits,
+ tableHandle,
+ schemaInfo,
+ tableHandle.getTableName(), tupleDomain);
}
@VisibleForTesting
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 7721324..84228d5 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -920,7 +920,8 @@ public abstract class TestPulsarConnector {
}
});
- doReturn(managedLedgerFactory).when(this.pulsarSplitManager).getManagedLedgerFactory();
+ PulsarConnectorCache.instance = mock(PulsarConnectorCache.class);
+ when(PulsarConnectorCache.instance.getManagedLedgerFactory()).thenReturn(managedLedgerFactory);
for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) {