Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/07 21:41:47 UTC

[GitHub] merlimat closed pull request #3139: more optimizations for sql

URL: https://github.com/apache/pulsar/pull/3139
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a forked pull request once it is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 201269c491..64247687c2 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -390,6 +390,8 @@ The Apache Software License, Version 2.0
     - simpleclient_servlet-0.5.0.jar
   * LZ4
     - lz4-java-1.5.0.jar
+  * JCTools
+    - jctools-core-2.1.2.jar
 
 Protocol Buffers License
  * Protocol Buffers
diff --git a/pulsar-sql/presto-pulsar/pom.xml b/pulsar-sql/presto-pulsar/pom.xml
index a46d3eac52..e3b202137c 100644
--- a/pulsar-sql/presto-pulsar/pom.xml
+++ b/pulsar-sql/presto-pulsar/pom.xml
@@ -39,6 +39,7 @@
         <dep.javax-validation.version>1.1.0.Final</dep.javax-validation.version>
         <dep.javax-inject.version>1</dep.javax-inject.version>
         <dep.guava.version>24.1-jre</dep.guava.version>
+        <jctools.version>2.1.2</jctools.version>
     </properties>
 
     <dependencies>
@@ -89,6 +90,12 @@
             <artifactId>managed-ledger</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.jctools</groupId>
+            <artifactId>jctools-core</artifactId>
+            <version>${jctools.version}</version>
+        </dependency>
         <!-- Presto SPI -->
         <dependency>
             <groupId>com.facebook.presto</groupId>
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 4088868045..e1791b432e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -41,12 +41,12 @@
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -69,14 +69,14 @@
     private PulsarSplit pulsarSplit;
     private PulsarConnectorConfig pulsarConnectorConfig;
     private ReadOnlyCursor cursor;
-    private ArrayBlockingQueue<Message> messageQueue;
-    private ArrayBlockingQueue<Entry> entryQueue;
+    private SpscArrayQueue<Message> messageQueue;
+    private SpscArrayQueue<Entry> entryQueue;
     private Object currentRecord;
     private Message currentMessage;
     private Map<String, PulsarInternalColumn> internalColumnMap = PulsarInternalColumn.getInternalFieldsMap();
     private SchemaHandler schemaHandler;
     private int maxBatchSize;
-    private AtomicLong completedBytes = new AtomicLong(0L);
+    private long completedBytes = 0;
     private ReadEntries readEntries;
     private DeserializeEntries deserializeEntries;
     private TopicName topicName;
@@ -85,10 +85,18 @@
     // Stats total execution time of split
     private long startTime;
 
+    // Used to make sure we don't finish before all entries are processed since entries that have been dequeued
+    // but not been deserialized and added messages to the message queue can be missed if we just check if the queues
+    // are empty or not
+    private final long splitSize;
+    private long entriesProcessed = 0;
+
+
     private static final Logger log = Logger.get(PulsarRecordCursor.class);
 
     public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
                               PulsarConnectorConfig pulsarConnectorConfig) {
+        this.splitSize = pulsarSplit.getSplitSize();
         // Set start time for split
         this.startTime = System.nanoTime();
         PulsarConnectorCache pulsarConnectorCache;
@@ -107,6 +115,7 @@ public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pu
     // Exposed for testing purposes
     PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
             pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
+        this.splitSize = pulsarSplit.getSplitSize();
         initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, pulsarConnectorMetricsTracker);
     }
 
@@ -117,8 +126,8 @@ private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit puls
         this.pulsarSplit = pulsarSplit;
         this.pulsarConnectorConfig = pulsarConnectorConfig;
         this.maxBatchSize = pulsarConnectorConfig.getMaxEntryReadBatchSize();
-        this.messageQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
-        this.entryQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
+        this.messageQueue = new SpscArrayQueue(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
+        this.entryQueue = new SpscArrayQueue(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
         this.topicName = TopicName.get("persistent",
                 NamespaceName.get(pulsarSplit.getSchemaName()),
                 pulsarSplit.getTableName());
@@ -168,7 +177,7 @@ private ReadOnlyCursor getCursor(TopicName topicName, Position startPosition, Ma
 
     @Override
     public long getCompletedBytes() {
-        return this.completedBytes.get();
+        return this.completedBytes;
     }
 
     @Override
@@ -185,7 +194,7 @@ public Type getType(int field) {
     @VisibleForTesting
     class DeserializeEntries implements Runnable {
 
-    protected AtomicBoolean isRunning = new AtomicBoolean(false);
+    protected boolean isRunning = false;
 
         private final Thread thread;
 
@@ -194,7 +203,7 @@ public DeserializeEntries() {
         }
 
         public void interrupt() {
-            isRunning.set(false);
+            isRunning = false;
             thread.interrupt();
         }
 
@@ -204,62 +213,71 @@ public void start() {
 
         @Override
         public void run() {
-            isRunning.set(true);
-            while (isRunning.get()) {
-                Entry entry;
-                try {
-                    // start time for entry queue read
-                    metricsTracker.start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME();
-                    // read from entry queue and block if empty
-                    entry = entryQueue.take();
-                    // record entry queue wait time stats
-                    metricsTracker.end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME();
-                } catch (InterruptedException e) {
-                    break;
-                }
-                try {
-                    long bytes = entry.getDataBuffer().readableBytes();
-                    completedBytes.addAndGet(bytes);
-                    // register stats for bytes read
-                    metricsTracker.register_BYTES_READ(bytes);
+            isRunning = true;
+            while (isRunning) {
 
-                    // set start time for time deserializing entries for stats
-                    metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+                 int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() {
+                    @Override
+                    public void accept(Entry entry) {
 
-                    // filter entries that is not part of my split
-                    if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) {
                         try {
-                            MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
-                                    entry.getDataBuffer(), (messageId, message, byteBuf) -> {
-                                        try {
-
-                                            // start time for message queue read
-                                            metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
-
-                                            // enqueue deserialize message from this entry
-                                            messageQueue.put(message);
-
-                                            // stats for how long a read from message queue took
-                                            metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
-                                            // stats for number of messages read
-                                            metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
-
-                                        } catch (InterruptedException e) {
-                                            //no-op
-                                        }
-                                    });
-                        } catch (IOException e) {
-                            log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
-                            throw new RuntimeException(e);
+                            long bytes = entry.getDataBuffer().readableBytes();
+                            completedBytes += bytes;
+                            // register stats for bytes read
+                            metricsTracker.register_BYTES_READ(bytes);
+
+                            // check if we have processed all entries in this split
+                            if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0) {
+                                return;
+                            }
+
+                            // set start time for time deserializing entries for stats
+                            metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+
+                            try {
+                                MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
+                                        entry.getDataBuffer(), (messageId, message, byteBuf) -> {
+                                            try {
+                                                // start time for message queue read
+                                                metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+
+                                                // enqueue deserialize message from this entry
+                                                while (!messageQueue.offer(message)) {
+                                                    Thread.sleep(1);
+                                                }
+
+                                                // stats for how long a read from message queue took
+                                                metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+                                                // stats for number of messages read
+                                                metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
+
+                                            } catch (InterruptedException e) {
+                                                //no-op
+                                            }
+                                        });
+                            } catch (IOException e) {
+                                log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
+                                throw new RuntimeException(e);
+                            }
+                            // stats for time spend deserializing entries
+                            metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+
+                            // stats for num messages per entry
+                            metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
+
+                        } finally {
+                            entriesProcessed++;
+                            entry.release();
                         }
-                        // stats for time spend deserializing entries
-                        metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+                    }
+                });
 
-                        // stats for num messages per entry
-                        metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
+                if (read <= 0) {
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        return;
                     }
-                } finally {
-                    entry.release();
                 }
             }
         }
@@ -269,7 +287,7 @@ public void run() {
     class ReadEntries implements AsyncCallbacks.ReadEntriesCallback {
 
         // indicate whether there are any additional entries left to read
-        private final AtomicBoolean isDone = new AtomicBoolean(false);
+        private boolean isDone = false;
 
         //num of outstanding read requests
         // set to 1 because we can only read one batch a time
@@ -281,10 +299,10 @@ public void run() {
 
                 if (!cursor.hasMoreEntries() || ((PositionImpl) cursor.getReadPosition())
                         .compareTo(pulsarSplit.getEndPosition()) >= 0) {
-                    isDone.set(true);
+                    isDone = true;
 
                 } else {
-                    int batchSize = Math.min(maxBatchSize, entryQueue.remainingCapacity());
+                    int batchSize = Math.min(maxBatchSize, entryQueue.capacity() - entryQueue.size());
 
                     if (batchSize > 0) {
                         outstandingReadsRequests.decrementAndGet();
@@ -302,7 +320,17 @@ public void run() {
 
         @Override
         public void readEntriesComplete(List<Entry> entries, Object ctx) {
-            entryQueue.addAll(entries);
+
+            entryQueue.fill(new MessagePassingQueue.Supplier<Entry>() {
+                private int i = 0;
+                @Override
+                public Entry get() {
+                    Entry entry = entries.get(i);
+                    i++;
+                    return entry;
+                }
+            }, entries.size());
+
             outstandingReadsRequests.incrementAndGet();
 
             //set read latency stats for success
@@ -312,10 +340,9 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
         }
 
         public boolean hashFinished() {
-            return messageQueue.isEmpty() && entryQueue.isEmpty() && isDone.get() && outstandingReadsRequests.get() >=1;
+            return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >=1 && splitSize <= entriesProcessed;
         }
 
-
         @Override
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
             log.debug(exception, "Failed to read entries from topic %s", topicName.toString());
@@ -346,7 +373,7 @@ public boolean advanceNextPosition() {
                 return false;
             }
 
-            if (messageQueue.remainingCapacity() > 0) {
+            if ((messageQueue.capacity() - messageQueue.size()) > 0) {
                 readEntries.run();
             }
 
@@ -355,9 +382,9 @@ public boolean advanceNextPosition() {
                 break;
             } else {
                 try {
-                    Thread.sleep(5);
+                    Thread.sleep(1);
                     // stats for time spent wait to read from message queue because its empty
-                    metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(5);
+                    metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(1);
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
@@ -486,6 +513,7 @@ public void close() {
             this.metricsTracker.register_TOTAL_EXECUTION_TIME(System.nanoTime() - startTime);
             this.metricsTracker.close();
         }
+
     }
 
     private void checkFieldType(int field, Class<?> expected) {
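The heart of the change is the switch from java.util.concurrent.ArrayBlockingQueue to
JCTools' lock-free SpscArrayQueue for both the entry queue and the message queue. The
single-producer/single-consumer queue fits here because each queue has exactly one writer
and one reader: the read callback fills the entry queue, the deserialize thread drains it
and feeds the message queue, and the cursor thread consumes the messages. Since offer(),
drain() and fill() never block, the code now backs off with a 1 ms sleep instead of
parking on take()/put(), and the AtomicLong/AtomicBoolean fields are replaced by plain
fields. A minimal sketch of that hand-off pattern (assuming jctools-core 2.1.2 on the
classpath; the Entry and Message classes below are hypothetical stand-ins for the
BookKeeper and Pulsar types):

    import org.jctools.queues.MessagePassingQueue;
    import org.jctools.queues.SpscArrayQueue;

    import java.util.List;

    public class SpscHandoffSketch {

        // Hypothetical payloads standing in for the BookKeeper Entry and Pulsar Message types.
        static final class Entry { final byte[] data; Entry(byte[] d) { this.data = d; } }
        static final class Message { final byte[] payload; Message(byte[] p) { this.payload = p; } }

        // One producer and one consumer per queue, mirroring PulsarRecordCursor.
        private final SpscArrayQueue<Entry> entryQueue = new SpscArrayQueue<>(1024);
        private final SpscArrayQueue<Message> messageQueue = new SpscArrayQueue<>(1024);

        // Producer side: bulk-enqueue a freshly read batch, as readEntriesComplete() does with fill().
        void onEntriesRead(List<Entry> entries) {
            entryQueue.fill(new MessagePassingQueue.Supplier<Entry>() {
                private int i = 0;
                @Override
                public Entry get() {
                    return entries.get(i++);
                }
            }, entries.size());
        }

        // Consumer side: drain whatever is available, "deserialize", and hand off downstream.
        void drainOnce() throws InterruptedException {
            int read = entryQueue.drain(entry -> {
                Message msg = new Message(entry.data);   // stands in for MessageParser.parseMessage(...)
                // offer() never blocks; back off briefly while the downstream queue is full.
                while (!messageQueue.offer(msg)) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            if (read <= 0) {
                Thread.sleep(1);   // nothing read this round; avoid a hot spin
            }
        }
    }

Since SpscArrayQueue has no remainingCapacity(), the reader now computes the free space
as entryQueue.capacity() - entryQueue.size() before requesting the next batch, which
keeps fill() from ever being handed more entries than the queue can hold.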

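The other behavioural change is the split-completion check. With the blocking queues the
cursor stopped once both queues were empty and the reader had reached the end position,
but an entry that has already been drained from the entry queue and is still being parsed
makes both queues look empty while its messages have not yet been enqueued. The PR
therefore passes the split size into the cursor and counts entries as they are released,
finishing only after every entry of the split has been processed. A sketch of that
predicate (method and parameter names are illustrative, not the actual hashFinished()
signature):

    // Done only when the reader has passed the end position, the message queue has been
    // drained, and every entry of the split has gone through the deserialize callback
    // (entriesProcessed is incremented in its finally block).
    static boolean splitFinished(boolean readerDone, boolean messageQueueEmpty,
                                 long entriesProcessed, long splitSize) {
        return readerDone && messageQueueEmpty && entriesProcessed >= splitSize;
    }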

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services