You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/06/27 06:01:05 UTC

[incubator-pinot] branch time_boundary created (now 57dc511)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch time_boundary
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 57dc511  Enhance the time boundary service for backward-compatibility

This branch includes the following new commits:

     new 57dc511  Enhance the time boundary service for backward-compatibility

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Enhance the time boundary service for backward-compatibility

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch time_boundary
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 57dc511bdadf3832ed323135fa90ce40d8e5e060
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Wed Jun 26 22:53:15 2019 -0700

    Enhance the time boundary service for backward-compatibility
    
    Handle the time boundary with the following logic, based on the latest
    offline segment (the one with the largest end time):
    Case 1 - the segment has the same start and end time:
      Directly use the end time as the time boundary for backward-compatibility.
      Some use cases have a time unit other than DAYS, but their offline time
      values are rounded to the start of the day (so offline segments have the
      same start and end time) while their real-time time values are not rounded.
      To get the correct result, the filter 'time < endTime' must be attached to
      the offline side and the filter 'time >= endTime' to the real-time side.
    Case 2 - the segment has different start and end times:
      Rewind the end time by the push interval (1 hour for HOURLY push tables
      whose time unit is not DAYS, 1 day otherwise) and use the result as the
      time boundary, so that query results stay consistent while multiple
      offline segments are being pushed. If the end time were used directly as
      the time boundary, the first pushed offline segment would move the time
      boundary forward before the remaining offline segments have arrived,
      causing inconsistent results.
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  5 +--
 .../HelixExternalViewBasedTimeBoundaryService.java | 48 ++++++++++++++++------
 ...ixExternalViewBasedTimeBoundaryServiceTest.java | 35 ++++++++++++----
 3 files changed, 64 insertions(+), 24 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 89d3470..6188630 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -307,8 +307,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
     if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
       // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
-      LOGGER.info(
-          "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{},"
+      LOGGER.info("RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{},"
               + " segments(queried/processed/matched/consuming):{}/{}/{}/{}, consumingFreshnessTimeMs:{},"
               + " servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
           brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
@@ -441,7 +440,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     timeFilterQuery.setId(-1);
     timeFilterQuery.setColumn(timeBoundaryInfo.getTimeColumn());
     String timeValue = timeBoundaryInfo.getTimeValue();
-    String filterValue = isOfflineRequest ? "(*\t\t" + timeValue + "]" : "(" + timeValue + "\t\t*)";
+    String filterValue = isOfflineRequest ? "(*\t\t" + timeValue + ")" : "[" + timeValue + "\t\t*)";
     timeFilterQuery.setValue(Collections.singletonList(filterValue));
     timeFilterQuery.setOperator(FilterOperator.RANGE);
     timeFilterQuery.setNestedFilterQueryIds(Collections.emptyList());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
index 549818c..8e5b16b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
@@ -92,7 +92,7 @@ public class HelixExternalViewBasedTimeBoundaryService implements TimeBoundarySe
     List<OfflineSegmentZKMetadata> segmentZKMetadataList =
         ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, tableNameWithType);
 
-    long maxTimeValue = -1L;
+    OfflineSegmentZKMetadata latestSegmentZKMetadata = null;
     for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
       String segmentName = segmentZKMetadata.getSegmentName();
 
@@ -111,25 +111,47 @@ public class HelixExternalViewBasedTimeBoundaryService implements TimeBoundarySe
         continue;
       }
 
-      // Convert segment end time into table time unit
-      // NOTE: for now, time unit in segment ZK metadata should always match table time unit, but in the future we might
-      //       want to always use MILLISECONDS as the time unit in segment ZK metadata
-      maxTimeValue = Math.max(maxTimeValue, tableTimeUnit.convert(segmentEndTime, segmentZKMetadata.getTimeUnit()));
+      if (latestSegmentZKMetadata == null) {
+        latestSegmentZKMetadata = segmentZKMetadata;
+      } else {
+        long segmentEndTimeMs = segmentZKMetadata.getTimeUnit().toMillis(segmentEndTime);
+        long latestSegmentEndTimeMs =
+            latestSegmentZKMetadata.getTimeUnit().toMillis(latestSegmentZKMetadata.getEndTime());
+        if (segmentEndTimeMs > latestSegmentEndTimeMs) {
+          latestSegmentZKMetadata = segmentZKMetadata;
+        }
+      }
     }
 
-    if (maxTimeValue == -1L) {
+    if (latestSegmentZKMetadata == null) {
       LOGGER.error("Skipping updating time boundary for table: '{}' because no segment contains valid end time",
           tableNameWithType);
       return;
     }
 
-    // For HOURLY push table with time unit other than DAYS, use (maxTimeValue - 1 HOUR) as the time boundary
-    // Otherwise, use (maxTimeValue - 1 DAY)
-    long timeBoundary;
-    if ("HOURLY".equalsIgnoreCase(retentionConfig.getSegmentPushFrequency()) && tableTimeUnit != TimeUnit.DAYS) {
-      timeBoundary = maxTimeValue - tableTimeUnit.convert(1L, TimeUnit.HOURS);
-    } else {
-      timeBoundary = maxTimeValue - tableTimeUnit.convert(1L, TimeUnit.DAYS);
+    // Convert segment end time into table time unit
+    // NOTE: for now, time unit in segment ZK metadata should always match table time unit, but in the future we might
+    //       want to always use MILLISECONDS as the time unit in segment ZK metadata
+    long latestSegmentEndTime = latestSegmentZKMetadata.getEndTime();
+    long timeBoundary = tableTimeUnit.convert(latestSegmentEndTime, latestSegmentZKMetadata.getTimeUnit());
+
+    // Case 1 - segment has the same start and end time: directly use the end time as the time boundary for
+    //   backward-compatibility. There are use cases with time unit other than DAYS, but have time value rounded
+    //   to the start of the day for offline table (offline segments have the same start and end time), and un-rounded
+    //   time value for real-time table. In order to get the correct result, must attach filter 'time < endTime' to the
+    //   offline side and filter 'time >= endTime' to the real-time side.
+    // Case 2 - segment has different start and end time: rewind the end time with the push interval and use it as the
+    //   time boundary to get consistent result during the push of multiple offline segments. If directly use the end
+    //   time as the time boundary, then the first pushed offline segment will push forward the time boundary, while the
+    //   remaining offline segments are not arrived yet, which will cause inconsistent result.
+    if (latestSegmentEndTime != latestSegmentZKMetadata.getStartTime()) {
+      // For HOURLY push table with time unit other than DAYS, use {latestSegmentEndTime - (1 HOUR) + 1} as the time
+      // boundary; otherwise, use {latestSegmentEndTime - (1 DAY) + 1} as the time boundary
+      if ("HOURLY".equalsIgnoreCase(retentionConfig.getSegmentPushFrequency()) && tableTimeUnit != TimeUnit.DAYS) {
+        timeBoundary = timeBoundary - tableTimeUnit.convert(1L, TimeUnit.HOURS) + 1;
+      } else {
+        timeBoundary = timeBoundary - tableTimeUnit.convert(1L, TimeUnit.DAYS) + 1;
+      }
     }
 
     LOGGER.info("Updated time boundary for table: '{}' to: {} {}", tableNameWithType, timeBoundary, tableTimeUnit);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryServiceTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryServiceTest.java
index 74add38..3437a48 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryServiceTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryServiceTest.java
@@ -62,8 +62,9 @@ public class HelixExternalViewBasedTimeBoundaryServiceTest {
     String helixClusterName = "TestTimeBoundaryService";
     _zkClient.deleteRecursively("/" + helixClusterName + "/PROPERTYSTORE");
     _zkClient.createPersistent("/" + helixClusterName + "/PROPERTYSTORE", true);
-    _propertyStore = new ZkHelixPropertyStore<>(new ZkBaseDataAccessor<ZNRecord>(_zkClient),
-        "/" + helixClusterName + "/PROPERTYSTORE", null);
+    _propertyStore =
+        new ZkHelixPropertyStore<>(new ZkBaseDataAccessor<>(_zkClient), "/" + helixClusterName + "/PROPERTYSTORE",
+            null);
   }
 
   @AfterClass
@@ -87,7 +88,7 @@ public class HelixExternalViewBasedTimeBoundaryServiceTest {
       addSchema(rawTableName, timeUnit);
 
       int endTimeInDays = tableIndex;
-      addSegmentZKMetadata(rawTableName, endTimeInDays, timeUnit);
+      addSegmentZKMetadata(rawTableName, endTimeInDays, timeUnit, false);
 
       // Should skip real-time external view
       ExternalView externalView = constructRealtimeExternalView(realtimeTableName);
@@ -103,7 +104,8 @@ public class HelixExternalViewBasedTimeBoundaryServiceTest {
       TimeBoundaryInfo timeBoundaryInfo = timeBoundaryService.getTimeBoundaryInfoFor(offlineTableName);
       assertNotNull(timeBoundaryInfo);
       assertEquals(timeBoundaryInfo.getTimeColumn(), TIME_COLUMN);
-      assertEquals(Long.parseLong(timeBoundaryInfo.getTimeValue()), timeUnit.convert(endTimeInDays - 1, TimeUnit.DAYS));
+      assertEquals(Long.parseLong(timeBoundaryInfo.getTimeValue()),
+          timeUnit.convert(endTimeInDays - 1, TimeUnit.DAYS) + 1);
 
       // Test HOURLY push frequency
       addTableConfig(rawTableName, timeUnit, "hourly");
@@ -115,12 +117,22 @@ public class HelixExternalViewBasedTimeBoundaryServiceTest {
       assertEquals(timeBoundaryInfo.getTimeColumn(), TIME_COLUMN);
       long timeValue = Long.parseLong(timeBoundaryInfo.getTimeValue());
       if (timeUnit == TimeUnit.DAYS) {
-        assertEquals(timeValue, timeUnit.convert(endTimeInDays - 1, TimeUnit.DAYS));
+        assertEquals(timeValue, timeUnit.convert(endTimeInDays - 1, TimeUnit.DAYS) + 1);
       } else {
         assertEquals(timeValue,
-            timeUnit.convert(TimeUnit.HOURS.convert(endTimeInDays, TimeUnit.DAYS) - 1, TimeUnit.HOURS));
+            timeUnit.convert(TimeUnit.HOURS.convert(endTimeInDays, TimeUnit.DAYS) - 1, TimeUnit.HOURS) + 1);
       }
 
+      // Test the same start/end time
+      addSegmentZKMetadata(rawTableName, endTimeInDays, timeUnit, true);
+      timeBoundaryService.updateTimeBoundaryService(externalView);
+      assertNull(timeBoundaryService.getTimeBoundaryInfoFor(rawTableName));
+      assertNull(timeBoundaryService.getTimeBoundaryInfoFor(realtimeTableName));
+      timeBoundaryInfo = timeBoundaryService.getTimeBoundaryInfoFor(offlineTableName);
+      assertNotNull(timeBoundaryInfo);
+      assertEquals(timeBoundaryInfo.getTimeColumn(), TIME_COLUMN);
+      assertEquals(Long.parseLong(timeBoundaryInfo.getTimeValue()), timeUnit.convert(endTimeInDays, TimeUnit.DAYS));
+
       tableIndex++;
     }
   }
@@ -139,11 +151,18 @@ public class HelixExternalViewBasedTimeBoundaryServiceTest {
             .build());
   }
 
-  private void addSegmentZKMetadata(String rawTableName, int endTimeInDays, TimeUnit timeUnit) {
+  private void addSegmentZKMetadata(String rawTableName, int endTimeInDays, TimeUnit timeUnit,
+      boolean sameStartEndTime) {
     for (int i = 1; i <= endTimeInDays; i++) {
       OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata();
       offlineSegmentZKMetadata.setSegmentName(rawTableName + i);
-      offlineSegmentZKMetadata.setEndTime(i * timeUnit.convert(1L, TimeUnit.DAYS));
+      long segmentEndTime = i * timeUnit.convert(1L, TimeUnit.DAYS);
+      if (sameStartEndTime) {
+        offlineSegmentZKMetadata.setStartTime(segmentEndTime);
+      } else {
+        offlineSegmentZKMetadata.setStartTime(0L);
+      }
+      offlineSegmentZKMetadata.setEndTime(segmentEndTime);
       offlineSegmentZKMetadata.setTimeUnit(timeUnit);
       ZKMetadataProvider
           .setOfflineSegmentZKMetadata(_propertyStore, TableNameBuilder.OFFLINE.tableNameWithType(rawTableName),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org