You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/07/18 18:59:40 UTC
[pulsar] branch master updated: Fix: predicate pushdown for Pulsar
SQL NPE (#4744)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ac10b00 Fix: predicate pushdown for Pulsar SQL NPE (#4744)
ac10b00 is described below
commit ac10b006cf59308ae1a0bf8307ddcb2f5745cc11
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Jul 18 11:59:33 2019 -0700
Fix: predicate pushdown for Pulsar SQL NPE (#4744)
* Fix: predicate pushdown for Pulsar SQL NPE
* fix unit test
---
.../bookkeeper/mledger/impl/OpFindNewest.java | 2 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 7 +-
.../service/PersistentMessageFinderTest.java | 2 +-
tests/integration/pom.xml | 7 ++
.../containers/PrestoWorkerContainer.java | 5 ++
.../tests/integration/presto/TestBasicPresto.java | 92 +++++++++++++++++++++-
6 files changed, 109 insertions(+), 6 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 60ecacf..57e8044 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -64,7 +64,7 @@ class OpFindNewest implements ReadEntryCallback {
switch (state) {
case checkFirst:
if (!condition.apply(entry)) {
- callback.findEntryComplete(null, OpFindNewest.this.ctx);
+ callback.findEntryComplete(startPosition, OpFindNewest.this.ctx);
return;
} else {
lastMatchedPosition = position;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 4592e32..fc3a8cb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -1645,7 +1645,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
ledger.addEntry("not-expired".getBytes(Encoding));
ledger.addEntry("not-expired".getBytes(Encoding));
- assertNull(
+ assertEquals(c1.readPosition,
c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
}
@@ -2108,7 +2108,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
- ledger.addEntry(getEntryPublishTime("retained1"));
+ Position firstPosition = ledger.addEntry(getEntryPublishTime("retained1"));
// space apart message publish times
Thread.sleep(100);
ledger.addEntry(getEntryPublishTime("retained2"));
@@ -2135,6 +2135,9 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
PositionImpl found = (PositionImpl) findPositionFromAllEntries(c1, timestamp);
assertEquals(found.getLedgerId(), ledgerId);
assertEquals(found.getEntryId(), expectedEntryId);
+
+ found = (PositionImpl) findPositionFromAllEntries(c1, 0);
+ assertEquals(found, firstPosition);
}
@Test(timeOut = 20000)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index b2c2246..377e0ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -154,7 +154,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
future = findMessage(result, c1, beginTimestamp);
future.get();
assertEquals(result.exception, null);
- assertEquals(result.position, null);
+ assertEquals(result.position, c1.getFirstPosition());
result.reset();
future = findMessage(result, c1, endTimestamp);
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 9db5bf2..486bea0 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -137,6 +137,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-jdbc</artifactId>
+ <version>${presto.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
index 2dd4d4e..71ebb48 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
@@ -37,6 +37,7 @@ public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer
-1,
PRESTO_HTTP_PORT,
"/v1/node");
+
}
@Override
@@ -50,4 +51,8 @@ public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer
);
}
}
+
+ public String getUrl() {
+ return String.format("%s:%s", getContainerIpAddress(), getMappedPort(PrestoWorkerContainer.PRESTO_HTTP_PORT));
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index d759023..093f3eb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -32,6 +32,15 @@ import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.LinkedList;
+import java.util.List;
+
import static org.assertj.core.api.Assertions.assertThat;
@Slf4j
@@ -84,7 +93,12 @@ public class TestBasicPresto extends PulsarTestSuite {
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
- final String stocksTopic = "stocks";
+ String stocksTopic;
+ if (isBatched) {
+ stocksTopic = "stocks_batched";
+ } else {
+ stocksTopic = "stocks_nonbatched";
+ }
@Cleanup
Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
@@ -96,6 +110,7 @@ public class TestBasicPresto extends PulsarTestSuite {
final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
producer.send(stock);
}
+ producer.flush();
result = execQuery("show schemas in pulsar;");
assertThat(result.getExitCode()).isEqualTo(0);
@@ -105,7 +120,7 @@ public class TestBasicPresto extends PulsarTestSuite {
assertThat(result.getExitCode()).isEqualTo(0);
assertThat(result.getStdout()).contains("stocks");
- ContainerExecResult containerExecResult = execQuery("select * from pulsar.\"public/default\".stocks order by entryid;");
+ ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic));
assertThat(containerExecResult.getExitCode()).isEqualTo(0);
log.info("select sql query output \n{}", containerExecResult.getStdout());
String[] split = containerExecResult.getStdout().split("\n");
@@ -119,6 +134,67 @@ public class TestBasicPresto extends PulsarTestSuite {
assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
}
+ // test predicate pushdown
+
+ String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl());
+ Connection connection = DriverManager.getConnection(url, "test", null);
+
+ String query = String.format("select * from pulsar" +
+ ".\"public/default\".%s order by __publish_time__", stocksTopic);
+ log.info("Executing query: {}", query);
+ ResultSet res = connection.createStatement().executeQuery(query);
+
+ List<Timestamp> timestamps = new LinkedList<>();
+ while (res.next()) {
+ printCurrent(res);
+ timestamps.add(res.getTimestamp("__publish_time__"));
+ }
+
+ assertThat(timestamps.size()).isGreaterThan(NUM_OF_STOCKS - 2);
+
+ query = String.format("select * from pulsar" +
+ ".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2));
+ log.info("Executing query: {}", query);
+ res = connection.createStatement().executeQuery(query);
+
+ List<Timestamp> returnedTimestamps = new LinkedList<>();
+ while (res.next()) {
+ printCurrent(res);
+ returnedTimestamps.add(res.getTimestamp("__publish_time__"));
+ }
+
+ assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size() / 2);
+
+ // Try with a predicate that has a earlier time than any entry
+ // Should return all rows
+ query = String.format("select * from pulsar" +
+ ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 0);
+ log.info("Executing query: {}", query);
+ res = connection.createStatement().executeQuery(query);
+
+ returnedTimestamps = new LinkedList<>();
+ while (res.next()) {
+ printCurrent(res);
+ returnedTimestamps.add(res.getTimestamp("__publish_time__"));
+ }
+
+ assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size());
+
+ // Try with a predicate that has a latter time than any entry
+ // Should return no rows
+
+ query = String.format("select * from pulsar" +
+ ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 99999999999L);
+ log.info("Executing query: {}", query);
+ res = connection.createStatement().executeQuery(query);
+
+ returnedTimestamps = new LinkedList<>();
+ while (res.next()) {
+ printCurrent(res);
+ returnedTimestamps.add(res.getTimestamp("__publish_time__"));
+ }
+
+ assertThat(returnedTimestamps.size()).isEqualTo(0);
}
@AfterSuite
@@ -137,4 +213,16 @@ public class TestBasicPresto extends PulsarTestSuite {
}
+ private static void printCurrent(ResultSet rs) throws SQLException {
+ ResultSetMetaData rsmd = rs.getMetaData();
+ int columnsNumber = rsmd.getColumnCount();
+ for (int i = 1; i <= columnsNumber; i++) {
+ if (i > 1) System.out.print(", ");
+ String columnValue = rs.getString(i);
+ System.out.print(columnValue + " " + rsmd.getColumnName(i));
+ }
+ System.out.println("");
+
+ }
+
}