You are viewing a plain-text version of this content. The link to the canonical version (originally the hyperlink "here") was not preserved in this plain-text rendering.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/07/18 18:59:40 UTC

[pulsar] branch master updated: Fix: predicate pushdown for Pulsar SQL NPE (#4744)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ac10b00  Fix: predicate pushdown for Pulsar SQL NPE (#4744)
ac10b00 is described below

commit ac10b006cf59308ae1a0bf8307ddcb2f5745cc11
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Jul 18 11:59:33 2019 -0700

    Fix: predicate pushdown for Pulsar SQL NPE (#4744)
    
    * Fix: predicate pushdown for Pulsar SQL NPE
    
    * fix unit test
---
 .../bookkeeper/mledger/impl/OpFindNewest.java      |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  7 +-
 .../service/PersistentMessageFinderTest.java       |  2 +-
 tests/integration/pom.xml                          |  7 ++
 .../containers/PrestoWorkerContainer.java          |  5 ++
 .../tests/integration/presto/TestBasicPresto.java  | 92 +++++++++++++++++++++-
 6 files changed, 109 insertions(+), 6 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 60ecacf..57e8044 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -64,7 +64,7 @@ class OpFindNewest implements ReadEntryCallback {
         switch (state) {
         case checkFirst:
             if (!condition.apply(entry)) {
-                callback.findEntryComplete(null, OpFindNewest.this.ctx);
+                callback.findEntryComplete(startPosition, OpFindNewest.this.ctx);
                 return;
             } else {
                 lastMatchedPosition = position;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 4592e32..fc3a8cb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -1645,7 +1645,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ledger.addEntry("not-expired".getBytes(Encoding));
         ledger.addEntry("not-expired".getBytes(Encoding));
 
-        assertNull(
+        assertEquals(c1.readPosition,
                 c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
     }
 
@@ -2108,7 +2108,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
         ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
 
-        ledger.addEntry(getEntryPublishTime("retained1"));
+        Position firstPosition = ledger.addEntry(getEntryPublishTime("retained1"));
         // space apart message publish times
         Thread.sleep(100);
         ledger.addEntry(getEntryPublishTime("retained2"));
@@ -2135,6 +2135,9 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         PositionImpl found = (PositionImpl) findPositionFromAllEntries(c1, timestamp);
         assertEquals(found.getLedgerId(), ledgerId);
         assertEquals(found.getEntryId(), expectedEntryId);
+
+        found = (PositionImpl) findPositionFromAllEntries(c1, 0);
+        assertEquals(found, firstPosition);
     }
 
     @Test(timeOut = 20000)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index b2c2246..377e0ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -154,7 +154,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
         future = findMessage(result, c1, beginTimestamp);
         future.get();
         assertEquals(result.exception, null);
-        assertEquals(result.position, null);
+        assertEquals(result.position, c1.getFirstPosition());
 
         result.reset();
         future = findMessage(result, c1, endTimestamp);
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 9db5bf2..486bea0 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -137,6 +137,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>com.facebook.presto</groupId>
+      <artifactId>presto-jdbc</artifactId>
+      <version>${presto.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
index 2dd4d4e..71ebb48 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
@@ -37,6 +37,7 @@ public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer
                 -1,
                 PRESTO_HTTP_PORT,
                 "/v1/node");
+
     }
 
     @Override
@@ -50,4 +51,8 @@ public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer
             );
         }
     }
+
+    public String getUrl() {
+        return String.format("%s:%s",  getContainerIpAddress(), getMappedPort(PrestoWorkerContainer.PRESTO_HTTP_PORT));
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index d759023..093f3eb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -32,6 +32,15 @@ import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.LinkedList;
+import java.util.List;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 @Slf4j
@@ -84,7 +93,12 @@ public class TestBasicPresto extends PulsarTestSuite {
                                     .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
                                     .build();
 
-        final String stocksTopic = "stocks";
+        String stocksTopic;
+        if (isBatched) {
+            stocksTopic = "stocks_batched";
+        } else {
+            stocksTopic = "stocks_nonbatched";
+        }
 
         @Cleanup
         Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
@@ -96,6 +110,7 @@ public class TestBasicPresto extends PulsarTestSuite {
             final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
             producer.send(stock);
         }
+        producer.flush();
 
         result = execQuery("show schemas in pulsar;");
         assertThat(result.getExitCode()).isEqualTo(0);
@@ -105,7 +120,7 @@ public class TestBasicPresto extends PulsarTestSuite {
         assertThat(result.getExitCode()).isEqualTo(0);
         assertThat(result.getStdout()).contains("stocks");
 
-        ContainerExecResult containerExecResult = execQuery("select * from pulsar.\"public/default\".stocks order by entryid;");
+        ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic));
         assertThat(containerExecResult.getExitCode()).isEqualTo(0);
         log.info("select sql query output \n{}", containerExecResult.getStdout());
         String[] split = containerExecResult.getStdout().split("\n");
@@ -119,6 +134,67 @@ public class TestBasicPresto extends PulsarTestSuite {
             assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
         }
 
+        // test predicate pushdown
+
+        String url = String.format("jdbc:presto://%s",  pulsarCluster.getPrestoWorkerContainer().getUrl());
+        Connection connection = DriverManager.getConnection(url, "test", null);
+
+        String query = String.format("select * from pulsar" +
+                ".\"public/default\".%s order by __publish_time__", stocksTopic);
+        log.info("Executing query: {}", query);
+        ResultSet res = connection.createStatement().executeQuery(query);
+
+        List<Timestamp> timestamps = new LinkedList<>();
+        while (res.next()) {
+            printCurrent(res);
+            timestamps.add(res.getTimestamp("__publish_time__"));
+        }
+
+        assertThat(timestamps.size()).isGreaterThan(NUM_OF_STOCKS - 2);
+
+        query = String.format("select * from pulsar" +
+                ".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2));
+        log.info("Executing query: {}", query);
+        res = connection.createStatement().executeQuery(query);
+
+        List<Timestamp> returnedTimestamps = new LinkedList<>();
+        while (res.next()) {
+            printCurrent(res);
+            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
+        }
+
+        assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size() / 2);
+
+        // Try with a predicate that has an earlier time than any entry
+        // Should return all rows
+        query = String.format("select * from pulsar" +
+                ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 0);
+        log.info("Executing query: {}", query);
+        res = connection.createStatement().executeQuery(query);
+
+        returnedTimestamps = new LinkedList<>();
+        while (res.next()) {
+            printCurrent(res);
+            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
+        }
+
+        assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size());
+
+        // Try with a predicate that has a later time than any entry
+        // Should return no rows
+
+        query = String.format("select * from pulsar" +
+                ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 99999999999L);
+        log.info("Executing query: {}", query);
+        res = connection.createStatement().executeQuery(query);
+
+        returnedTimestamps = new LinkedList<>();
+        while (res.next()) {
+            printCurrent(res);
+            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
+        }
+
+        assertThat(returnedTimestamps.size()).isEqualTo(0);
     }
 
     @AfterSuite
@@ -137,4 +213,16 @@ public class TestBasicPresto extends PulsarTestSuite {
 
     }
 
+    private static void printCurrent(ResultSet rs) throws SQLException {
+        ResultSetMetaData rsmd = rs.getMetaData();
+        int columnsNumber = rsmd.getColumnCount();
+        for (int i = 1; i <= columnsNumber; i++) {
+            if (i > 1) System.out.print(",  ");
+            String columnValue = rs.getString(i);
+            System.out.print(columnValue + " " + rsmd.getColumnName(i));
+        }
+        System.out.println("");
+
+    }
+
 }