You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/07/26 01:45:00 UTC

[pulsar] branch master updated: Reuse ManagedLedgerFactory instances across SQL queries (#4813)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f88ea9d  Reuse ManagedLedgerFactory instances across SQL queries (#4813)
f88ea9d is described below

commit f88ea9df9cdac2efb3511ff82a69593f08d69e3a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jul 25 18:44:53 2019 -0700

    Reuse ManagedLedgerFactory instances across SQL queries (#4813)
---
 .../apache/pulsar/sql/presto/PulsarConnector.java  |  5 --
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  5 +-
 .../pulsar/sql/presto/PulsarSplitManager.java      | 89 ++++++++--------------
 .../pulsar/sql/presto/TestPulsarConnector.java     |  3 +-
 4 files changed, 36 insertions(+), 66 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
index 498583d..1d89b51 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
@@ -87,11 +87,6 @@ public class PulsarConnector implements Connector {
             log.error(e, "Failed to close pulsar connector");
         }
         try {
-            PulsarConnectorCache.shutdown();
-        } catch (Exception e) {
-            log.error("Failed to shutdown pulsar connector cache");
-        }
-        try {
             lifeCycleManager.stop();
         } catch (Exception e) {
             log.error(e, "Error shutting down connector");
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 36ba1d2..5478984 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -39,11 +39,14 @@ import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class PulsarConnectorCache {
 
     private static final Logger log = Logger.get(PulsarConnectorCache.class);
 
-    private static PulsarConnectorCache instance;
+    @VisibleForTesting
+    static PulsarConnectorCache instance;
 
     private final ManagedLedgerFactory managedLedgerFactory;
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index bb1dea2..77b8f11 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -134,16 +134,6 @@ public class PulsarSplitManager implements ConnectorSplitManager {
     }
 
     @VisibleForTesting
-    ManagedLedgerFactory getManagedLedgerFactory() throws Exception {
-        ClientConfiguration bkClientConfiguration = new ClientConfiguration()
-                .setZkServers(this.pulsarConnectorConfig.getZookeeperUri())
-                .setClientTcpNoDelay(false)
-                .setStickyReadsEnabled(false)
-                .setUseV2WireProtocol(true);
-        return new ManagedLedgerFactoryImpl(bkClientConfiguration);
-    }
-
-    @VisibleForTesting
     Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle
             tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain) throws Exception {
         int numPartitions;
@@ -165,59 +155,40 @@ public class PulsarSplitManager implements ConnectorSplitManager {
 
         int splitRemainder = actualNumSplits % numPartitions;
 
-        ManagedLedgerFactory managedLedgerFactory = getManagedLedgerFactory();
-
-        try {
-            List<PulsarSplit> splits = new LinkedList<>();
-            for (int i = 0; i < numPartitions; i++) {
-
-                int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition;
-                splits.addAll(
-                        getSplitsForTopic(
-                                topicName.getPartition(i).getPersistenceNamingEncoding(),
-                                managedLedgerFactory,
-                                splitsForThisPartition,
-                                tableHandle,
-                                schemaInfo,
-                                topicName.getPartition(i).getLocalName(),
-                                tupleDomain)
-                );
-            }
-            return splits;
-        } finally {
-            if (managedLedgerFactory != null) {
-                try {
-                    managedLedgerFactory.shutdown();
-                } catch (Exception e) {
-                    log.error(e);
-                }
-            }
+        ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
+                .getManagedLedgerFactory();
+
+        List<PulsarSplit> splits = new LinkedList<>();
+        for (int i = 0; i < numPartitions; i++) {
+
+            int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition;
+            splits.addAll(
+                    getSplitsForTopic(
+                            topicName.getPartition(i).getPersistenceNamingEncoding(),
+                            managedLedgerFactory,
+                            splitsForThisPartition,
+                            tableHandle,
+                            schemaInfo,
+                            topicName.getPartition(i).getLocalName(),
+                            tupleDomain));
         }
+        return splits;
     }
 
     @VisibleForTesting
-    Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle
-            tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain) throws Exception {
-        ManagedLedgerFactory managedLedgerFactory = null;
-        try {
-            managedLedgerFactory = getManagedLedgerFactory();
-
-            return getSplitsForTopic(
-                    topicName.getPersistenceNamingEncoding(),
-                    managedLedgerFactory,
-                    numSplits,
-                    tableHandle,
-                    schemaInfo,
-                    tableHandle.getTableName(), tupleDomain);
-        } finally {
-            if (managedLedgerFactory != null) {
-                try {
-                    managedLedgerFactory.shutdown();
-                } catch (Exception e) {
-                    log.error(e);
-                }
-            }
-        }
+    Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, TopicName topicName,
+            PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain)
+            throws Exception {
+        ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
+                .getManagedLedgerFactory();
+
+        return getSplitsForTopic(
+                topicName.getPersistenceNamingEncoding(),
+                managedLedgerFactory,
+                numSplits,
+                tableHandle,
+                schemaInfo,
+                tableHandle.getTableName(), tupleDomain);
     }
 
     @VisibleForTesting
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 7721324..84228d5 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -920,7 +920,8 @@ public abstract class TestPulsarConnector {
             }
         });
 
-        doReturn(managedLedgerFactory).when(this.pulsarSplitManager).getManagedLedgerFactory();
+        PulsarConnectorCache.instance = mock(PulsarConnectorCache.class);
+        when(PulsarConnectorCache.instance.getManagedLedgerFactory()).thenReturn(managedLedgerFactory);
 
         for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) {