You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/01 00:02:48 UTC

[incubator-pinot] branch time_boundary updated: address comments

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch time_boundary
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/time_boundary by this push:
     new 3c8ea8f  address comments
3c8ea8f is described below

commit 3c8ea8f4146c591a1907db4affbdc991044d8ff6
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Tue Apr 30 17:02:34 2019 -0700

    address comments
---
 .../HelixExternalViewBasedTimeBoundaryService.java | 39 +++++++++-------------
 ...ixExternalViewBasedTimeBoundaryServiceTest.java | 16 +++++----
 2 files changed, 25 insertions(+), 30 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
index d41b836..549818c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
@@ -66,26 +66,26 @@ public class HelixExternalViewBasedTimeBoundaryService implements TimeBoundarySe
     // TODO: when we start using dateTime, pick the time column from the retention config, and use the DateTimeFieldSpec
     //       from the schema to determine the time unit
     // TODO: support SDF
-    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-    assert tableConfig != null;
-    SegmentsValidationAndRetentionConfig retentionConfig = tableConfig.getValidationConfig();
-    String timeColumn = retentionConfig.getTimeColumnName();
-    TimeUnit tableTimeUnit = retentionConfig.getTimeType();
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType);
+    assert schema != null;
+    String timeColumn = schema.getTimeColumnName();
+    TimeUnit tableTimeUnit = schema.getOutgoingTimeUnit();
     if (timeColumn == null || tableTimeUnit == null) {
       LOGGER.error("Skipping updating time boundary for table: '{}' because time column/unit is not set",
           tableNameWithType);
       return;
     }
 
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType);
-    assert schema != null;
-    if (!timeColumn.equals(schema.getTimeColumnName())) {
-      LOGGER.error("Time column does not match in table config: '{}' and schema: '{}'", timeColumn,
-          schema.getTimeColumnName());
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+    assert tableConfig != null;
+    SegmentsValidationAndRetentionConfig retentionConfig = tableConfig.getValidationConfig();
+    if (!timeColumn.equals(retentionConfig.getTimeColumnName())) {
+      LOGGER.error("Time column does not match in schema: '{}' and table config: '{}'", timeColumn,
+          retentionConfig.getTimeColumnName());
     }
-    if (tableTimeUnit != schema.getOutgoingTimeUnit()) {
-      LOGGER.error("Time unit does not match in table config: '{}' and schema: '{}'", tableTimeUnit,
-          schema.getOutgoingTimeUnit());
+    if (tableTimeUnit != retentionConfig.getTimeType()) {
+      LOGGER.error("Time unit does not match in schema: '{}' and table config: '{}'", tableTimeUnit,
+          retentionConfig.getTimeType());
     }
 
     // Bulk reading all segment ZK metadata is more efficient than reading one at a time
@@ -103,15 +103,6 @@ public class HelixExternalViewBasedTimeBoundaryService implements TimeBoundarySe
         continue;
       }
 
-      // Check if time unit in segment ZK metadata matches table time unit
-      // NOTE: for now, time unit in segment ZK metadata should always match table time unit, but in the future we might
-      //       want to always use MILLISECONDS as the time unit in segment ZK metadata
-      TimeUnit segmentTimeUnit = segmentZKMetadata.getTimeUnit();
-      if (segmentTimeUnit != tableTimeUnit) {
-        LOGGER.warn("Time unit for table: '{}', segment: '{}' ZK metadata: {} does not match the table time unit: {}",
-            tableNameWithType, segmentName, segmentTimeUnit, tableTimeUnit);
-      }
-
       long segmentEndTime = segmentZKMetadata.getEndTime();
       if (segmentEndTime <= 0) {
         LOGGER
@@ -121,7 +112,9 @@ public class HelixExternalViewBasedTimeBoundaryService implements TimeBoundarySe
       }
 
       // Convert segment end time into table time unit
-      maxTimeValue = Math.max(maxTimeValue, tableTimeUnit.convert(segmentEndTime, segmentTimeUnit));
+      // NOTE: for now, time unit in segment ZK metadata should always match table time unit, but in the future we might
+      //       want to always use MILLISECONDS as the time unit in segment ZK metadata
+      maxTimeValue = Math.max(maxTimeValue, tableTimeUnit.convert(segmentEndTime, segmentZKMetadata.getTimeUnit()));
     }
 
     if (maxTimeValue == -1L) {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryServiceTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryServiceTest.java
index 4261965..20ccc9c 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryServiceTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryServiceTest.java
@@ -78,14 +78,16 @@ public class HelixExternalViewBasedTimeBoundaryServiceTest {
     HelixExternalViewBasedTimeBoundaryService timeBoundaryService =
         new HelixExternalViewBasedTimeBoundaryService(_propertyStore);
 
-    int tableIndex = 0;
+    int tableIndex = 1;
     for (TimeUnit timeUnit : TimeUnit.values()) {
       String rawTableName = "table" + tableIndex;
       String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
       String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
       addTableConfig(rawTableName, timeUnit, "daily");
       addSchema(rawTableName, timeUnit);
-      addSegmentZKMetadata(rawTableName, 10 + tableIndex, timeUnit);
+
+      int endTimeInDays = tableIndex;
+      addSegmentZKMetadata(rawTableName, endTimeInDays, timeUnit);
 
       // Should skip real-time external view
       ExternalView externalView = constructRealtimeExternalView(realtimeTableName);
@@ -101,7 +103,7 @@ public class HelixExternalViewBasedTimeBoundaryServiceTest {
       TimeBoundaryInfo timeBoundaryInfo = timeBoundaryService.getTimeBoundaryInfoFor(offlineTableName);
       assertNotNull(timeBoundaryInfo);
       assertEquals(timeBoundaryInfo.getTimeColumn(), TIME_COLUMN);
-      assertEquals(Long.parseLong(timeBoundaryInfo.getTimeValue()), timeUnit.convert(9 + tableIndex, TimeUnit.DAYS));
+      assertEquals(Long.parseLong(timeBoundaryInfo.getTimeValue()), timeUnit.convert(endTimeInDays - 1, TimeUnit.DAYS));
 
       // Test HOURLY push frequency
       addTableConfig(rawTableName, timeUnit, "hourly");
@@ -113,10 +115,10 @@ public class HelixExternalViewBasedTimeBoundaryServiceTest {
       assertEquals(timeBoundaryInfo.getTimeColumn(), TIME_COLUMN);
       long timeValue = Long.parseLong(timeBoundaryInfo.getTimeValue());
       if (timeUnit == TimeUnit.DAYS) {
-        assertEquals(timeValue, timeUnit.convert(9 + tableIndex, TimeUnit.DAYS));
+        assertEquals(timeValue, timeUnit.convert(endTimeInDays - 1, TimeUnit.DAYS));
       } else {
         assertEquals(timeValue,
-            timeUnit.convert(TimeUnit.HOURS.convert(10 + tableIndex, TimeUnit.DAYS) - 1, TimeUnit.HOURS));
+            timeUnit.convert(TimeUnit.HOURS.convert(endTimeInDays, TimeUnit.DAYS) - 1, TimeUnit.HOURS));
       }
 
       tableIndex++;
@@ -137,8 +139,8 @@ public class HelixExternalViewBasedTimeBoundaryServiceTest {
             .build());
   }
 
-  private void addSegmentZKMetadata(String rawTableName, int numSegments, TimeUnit timeUnit) {
-    for (int i = 1; i <= numSegments; i++) {
+  private void addSegmentZKMetadata(String rawTableName, int endTimeInDays, TimeUnit timeUnit) {
+    for (int i = 1; i <= endTimeInDays; i++) {
       OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata();
       offlineSegmentZKMetadata.setTableName(rawTableName);
       offlineSegmentZKMetadata.setSegmentName(rawTableName + i);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org