You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/19 01:13:20 UTC

[pulsar] branch master updated: Change type of publish_time to timestamp (#4757)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f5416e  Change type of publish_time to timestamp (#4757)
6f5416e is described below

commit 6f5416eea0e7010c26091d9cbe72122b1fd94268
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jul 19 09:13:14 2019 +0800

    Change type of publish_time to timestamp (#4757)
    
    Fixes #4734
    
    ### Motivation
    
    "publish_time" is an internal Pulsar SQL column. Pulsar stores only timestamps; it doesn’t store any timezone information. Using timestamp as the type of "publish_time" is therefore more correct in Pulsar SQL.
    
    ### Modifications
    
    Change type of publish_time to timestamp.
    
    ### Verifying this change
    
    The predicate on publish_time is pushed down to the connector.
    
    Use `__publish_time__` to trim messages:
    ```
    SELECT COUNT(*)
    FROM "sql-test-1"
    WHERE "__publish_time__" >= TIMESTAMP '2019-07-18 17:26:50.119'
    AND  "__publish_time__" < TIMESTAMP '2019-07-18 17:26:51.119';
    ```
    ![image](https://user-images.githubusercontent.com/12592133/61447301-43835080-a983-11e9-814b-bc2b378f02b9.png)
    
    Without `__publish_time__` predicate:
    ```
    SELECT COUNT(*)
    FROM "sql-test-1";
    ```
    ![image](https://user-images.githubusercontent.com/12592133/61447427-82190b00-a983-11e9-8d3f-3bf2a4798047.png)
---
 .../apache/pulsar/sql/presto/PulsarInternalColumn.java   |  3 +--
 .../org/apache/pulsar/sql/presto/PulsarSplitManager.java | 16 +++++++++++-----
 .../apache/pulsar/sql/presto/TestPulsarSplitManager.java | 14 +++++---------
 3 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index ca74e39..c18a0e2 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -134,8 +134,7 @@ public abstract class PulsarInternalColumn {
             .TIMESTAMP, "Application defined timestamp in milliseconds of when the event occurred");
 
     public static final PulsarInternalColumn PUBLISH_TIME = new PublishTimeColumn("__publish_time__",
-            TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE,
-            "The timestamp in milliseconds of when event as published");
+            TimestampType.TIMESTAMP, "The timestamp in milliseconds of when event as published");
 
     public static final PulsarInternalColumn MESSAGE_ID = new MessageIdColumn("__message_id__", VarcharType.VARCHAR,
             "The message ID of the message used to generate this row");
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index aa11a73..40f31c9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -51,6 +51,7 @@ import com.google.common.base.Predicate;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 
 import javax.inject.Inject;
+import java.sql.Timestamp;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -321,13 +322,13 @@ public class PulsarSplitManager implements ConnectorSplitManager {
                             Range range = domain.getValues().getRanges().getOrderedRanges().get(0);
 
                             if (!range.getHigh().isUpperUnbounded()) {
-                                upperBoundTs = new SqlTimestampWithTimeZone(range.getHigh().getValueBlock().get()
-                                        .getLong(0, 0)).getMillisUtc();
+                                upperBoundTs = new Timestamp(range.getHigh().getValueBlock().get()
+                                        .getLong(0, 0)).getTime();
                             }
 
                             if (!range.getLow().isLowerUnbounded()) {
-                                lowerBoundTs = new SqlTimestampWithTimeZone(range.getLow().getValueBlock().get()
-                                        .getLong(0, 0)).getMillisUtc();
+                                lowerBoundTs = new Timestamp(range.getLow().getValueBlock().get()
+                                        .getLong(0, 0)).getTime();
                             }
 
                             PositionImpl overallStartPos;
@@ -335,15 +336,20 @@ public class PulsarSplitManager implements ConnectorSplitManager {
                                 overallStartPos = (PositionImpl) readOnlyCursor.getReadPosition();
                             } else {
                                 overallStartPos = findPosition(readOnlyCursor, lowerBoundTs);
+                                if (overallStartPos == null) {
+                                    overallStartPos = (PositionImpl) readOnlyCursor.getReadPosition();
+                                }
                             }
 
                             PositionImpl overallEndPos;
                             if (upperBoundTs == null) {
-
                                 readOnlyCursor.skipEntries(Math.toIntExact(totalNumEntries));
                                 overallEndPos = (PositionImpl) readOnlyCursor.getReadPosition();
                             } else {
                                 overallEndPos = findPosition(readOnlyCursor, upperBoundTs);
+                                if (overallEndPos == null) {
+                                    overallEndPos = overallStartPos;
+                                }
                             }
 
                             // Just use a close bound since presto can always filter out the extra entries even if
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index c02df94..253803d 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -26,7 +26,6 @@ import com.facebook.presto.spi.predicate.Domain;
 import com.facebook.presto.spi.predicate.Range;
 import com.facebook.presto.spi.predicate.TupleDomain;
 import com.facebook.presto.spi.predicate.ValueSet;
-import com.facebook.presto.spi.type.TimeZoneKey;
 import io.airlift.log.Logger;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -40,8 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
-import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
+import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.doAnswer;
@@ -185,9 +183,8 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
 
 
         Map<ColumnHandle, Domain> domainMap = new HashMap<>();
-        Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE, packDateTimeWithZone
-                (currentTimeMs + 1L, TimeZoneKey.UTC_KEY), true, packDateTimeWithZone(currentTimeMs + 50L,
-                TimeZoneKey.UTC_KEY), true)), false);
+        Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
+                currentTimeMs + 50L, true)), false);
         domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
         TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
 
@@ -243,9 +240,8 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
 
 
         Map<ColumnHandle, Domain> domainMap = new HashMap<>();
-        Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE, packDateTimeWithZone
-                (currentTimeMs + 1L, TimeZoneKey.UTC_KEY), true, packDateTimeWithZone(currentTimeMs + 50L,
-                TimeZoneKey.UTC_KEY), true)), false);
+        Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
+                currentTimeMs + 50L, true)), false);
         domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
         TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);