You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/19 01:13:20 UTC
[pulsar] branch master updated: Change type of publish_time to
timestamp (#4757)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6f5416e Change type of publish_time to timestamp (#4757)
6f5416e is described below
commit 6f5416eea0e7010c26091d9cbe72122b1fd94268
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jul 19 09:13:14 2019 +0800
Change type of publish_time to timestamp (#4757)
Fixes #4734
### Motivation
"publish_time" is an internal Pulsar SQL column. Because Pulsar stores only timestamps and does not store any time zone information, `TIMESTAMP` is a more correct type for "publish_time" in Pulsar SQL than `TIMESTAMP WITH TIME ZONE`.
### Modifications
Change the type of publish_time from `TIMESTAMP WITH TIME ZONE` to `TIMESTAMP`.
### Verifying this change
The predicate on publish_time is pushed down.
Use `__publish_time__` to restrict which messages are scanned:
```
SELECT COUNT(*)
FROM "sql-test-1"
WHERE "__publish_time__" >= TIMESTAMP '2019-07-18 17:26:50.119'
AND "__publish_time__" < TIMESTAMP '2019-07-18 17:26:51.119';
```
![image](https://user-images.githubusercontent.com/12592133/61447301-43835080-a983-11e9-814b-bc2b378f02b9.png)
Without a `__publish_time__` predicate:
```
SELECT COUNT(*)
FROM "sql-test-1";
```
![image](https://user-images.githubusercontent.com/12592133/61447427-82190b00-a983-11e9-8d3f-3bf2a4798047.png)
---
.../apache/pulsar/sql/presto/PulsarInternalColumn.java | 3 +--
.../org/apache/pulsar/sql/presto/PulsarSplitManager.java | 16 +++++++++++-----
.../apache/pulsar/sql/presto/TestPulsarSplitManager.java | 14 +++++---------
3 files changed, 17 insertions(+), 16 deletions(-)
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index ca74e39..c18a0e2 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -134,8 +134,7 @@ public abstract class PulsarInternalColumn {
.TIMESTAMP, "Application defined timestamp in milliseconds of when the event occurred");
public static final PulsarInternalColumn PUBLISH_TIME = new PublishTimeColumn("__publish_time__",
- TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE,
- "The timestamp in milliseconds of when event as published");
+ TimestampType.TIMESTAMP, "The timestamp in milliseconds of when event as published");
public static final PulsarInternalColumn MESSAGE_ID = new MessageIdColumn("__message_id__", VarcharType.VARCHAR,
"The message ID of the message used to generate this row");
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index aa11a73..40f31c9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -51,6 +51,7 @@ import com.google.common.base.Predicate;
import org.apache.bookkeeper.conf.ClientConfiguration;
import javax.inject.Inject;
+import java.sql.Timestamp;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
@@ -321,13 +322,13 @@ public class PulsarSplitManager implements ConnectorSplitManager {
Range range = domain.getValues().getRanges().getOrderedRanges().get(0);
if (!range.getHigh().isUpperUnbounded()) {
- upperBoundTs = new SqlTimestampWithTimeZone(range.getHigh().getValueBlock().get()
- .getLong(0, 0)).getMillisUtc();
+ upperBoundTs = new Timestamp(range.getHigh().getValueBlock().get()
+ .getLong(0, 0)).getTime();
}
if (!range.getLow().isLowerUnbounded()) {
- lowerBoundTs = new SqlTimestampWithTimeZone(range.getLow().getValueBlock().get()
- .getLong(0, 0)).getMillisUtc();
+ lowerBoundTs = new Timestamp(range.getLow().getValueBlock().get()
+ .getLong(0, 0)).getTime();
}
PositionImpl overallStartPos;
@@ -335,15 +336,20 @@ public class PulsarSplitManager implements ConnectorSplitManager {
overallStartPos = (PositionImpl) readOnlyCursor.getReadPosition();
} else {
overallStartPos = findPosition(readOnlyCursor, lowerBoundTs);
+ if (overallStartPos == null) {
+ overallStartPos = (PositionImpl) readOnlyCursor.getReadPosition();
+ }
}
PositionImpl overallEndPos;
if (upperBoundTs == null) {
-
readOnlyCursor.skipEntries(Math.toIntExact(totalNumEntries));
overallEndPos = (PositionImpl) readOnlyCursor.getReadPosition();
} else {
overallEndPos = findPosition(readOnlyCursor, upperBoundTs);
+ if (overallEndPos == null) {
+ overallEndPos = overallStartPos;
+ }
}
// Just use a close bound since presto can always filter out the extra entries even if
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index c02df94..253803d 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -26,7 +26,6 @@ import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.Range;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.predicate.ValueSet;
-import com.facebook.presto.spi.type.TimeZoneKey;
import io.airlift.log.Logger;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.naming.TopicName;
@@ -40,8 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
-import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
+import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doAnswer;
@@ -185,9 +183,8 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
Map<ColumnHandle, Domain> domainMap = new HashMap<>();
- Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE, packDateTimeWithZone
- (currentTimeMs + 1L, TimeZoneKey.UTC_KEY), true, packDateTimeWithZone(currentTimeMs + 50L,
- TimeZoneKey.UTC_KEY), true)), false);
+ Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
+ currentTimeMs + 50L, true)), false);
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
@@ -243,9 +240,8 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
Map<ColumnHandle, Domain> domainMap = new HashMap<>();
- Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE, packDateTimeWithZone
- (currentTimeMs + 1L, TimeZoneKey.UTC_KEY), true, packDateTimeWithZone(currentTimeMs + 50L,
- TimeZoneKey.UTC_KEY), true)), false);
+ Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
+ currentTimeMs + 50L, true)), false);
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);