You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/01 21:02:59 UTC

[incubator-pinot] branch master updated: Refactor HelixExternalViewBasedTimeBoundaryService to support all time units (#4156)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0680437  Refactor HelixExternalViewBasedTimeBoundaryService to support all time units (#4156)
0680437 is described below

commit 0680437c7619da3da1acd23445c0661d6a113f96
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed May 1 14:02:53 2019 -0700

    Refactor HelixExternalViewBasedTimeBoundaryService to support all time units (#4156)
    
    Currently we pick the segment end time as the time boundary, and
    append filter 'timeColumn < boundary' to offline table and filter
    'timeColumn >= boundary' to realtime table to achieve the hybrid
    table federation. The problem with this is that, if the time unit
    is not DAYS (for example, MILLISECONDS), and the offline table has
    multiple daily segments to push, then we might get incomplete
    result before all offline segments are pushed.
    
    The solution is: always use (end time - 1 HOUR/DAY) as the time
    boundary, append filter 'timeColumn <= boundary' to offline table
    and 'timeColumn > boundary' to realtime table. This can ensure
    all HOURLY and DAILY pushed segments be covered regardless of the
    time unit, but also use as much offline side data as possible.
    
    For HOURLY push table with time unit other than DAYS, use
    (maxTimeValue - 1 HOUR) as the time boundary; otherwise, use
    (maxTimeValue - 1 DAY)
    
    Also, add checks for the time consistency between table config
    and schema. We should use the time spec in schema as the source of
    truth for time column because data is generated based on the
    schema. In the future we might remove the timeType field from the
    SegmentsValidationAndRetentionConfig.
---
 .../requesthandler/BaseBrokerRequestHandler.java   |   5 +-
 .../HelixExternalViewBasedTimeBoundaryService.java | 118 +++++++++------
 .../pinot/broker/routing/TimeBoundaryService.java  |  15 +-
 .../broker/broker/HelixBrokerStarterTest.java      |   7 +-
 ...ixExternalViewBasedTimeBoundaryServiceTest.java | 166 +++++++++++++++++++++
 .../broker/routing/TimeBoundaryServiceTest.java    | 130 ----------------
 .../pinot/common/metadata/ZKMetadataProvider.java  |   5 +
 7 files changed, 255 insertions(+), 191 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 6981280..7bc689f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -428,8 +428,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   private void attachTimeBoundary(String rawTableName, BrokerRequest brokerRequest, boolean isOfflineRequest) {
     TimeBoundaryService.TimeBoundaryInfo timeBoundaryInfo =
         _timeBoundaryService.getTimeBoundaryInfoFor(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName));
-    if (timeBoundaryInfo == null || timeBoundaryInfo.getTimeColumn() == null
-        || timeBoundaryInfo.getTimeValue() == null) {
+    if (timeBoundaryInfo == null) {
       LOGGER.warn("Failed to find time boundary info for hybrid table: {}", rawTableName);
       return;
     }
@@ -440,7 +439,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     timeFilterQuery.setId(-1);
     timeFilterQuery.setColumn(timeBoundaryInfo.getTimeColumn());
     String timeValue = timeBoundaryInfo.getTimeValue();
-    String filterValue = isOfflineRequest ? "(*\t\t" + timeValue + ")" : "[" + timeValue + "\t\t*)";
+    String filterValue = isOfflineRequest ? "(*\t\t" + timeValue + "]" : "(" + timeValue + "\t\t*)";
     timeFilterQuery.setValue(Collections.singletonList(filterValue));
     timeFilterQuery.setOperator(FilterOperator.RANGE);
     timeFilterQuery.setNestedFilterQueryIds(Collections.emptyList());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
index d6e1adf..549818c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
@@ -27,8 +27,10 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
@@ -48,77 +50,99 @@ public class HelixExternalViewBasedTimeBoundaryService implements TimeBoundarySe
   }
 
   public void updateTimeBoundaryService(ExternalView externalView) {
-    if (_propertyStore == null) {
-      return;
-    }
-    String tableName = externalView.getResourceName();
-    // Do nothing for realtime table.
-    if (TableNameBuilder.getTableTypeFromTableName(tableName) == TableType.REALTIME) {
+    String tableNameWithType = externalView.getResourceName();
+
+    // Skip real-time table, only use offline table to update the time boundary
+    if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.REALTIME) {
       return;
     }
 
     Set<String> offlineSegmentsServing = externalView.getPartitionSet();
     if (offlineSegmentsServing.isEmpty()) {
-      LOGGER.info("Skipping updating time boundary service for table '{}' with no offline segments.", tableName);
+      LOGGER.warn("Skipping updating time boundary for table: '{}' with no offline segment", tableNameWithType);
       return;
     }
 
-    TableConfig offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, tableName);
-    assert offlineTableConfig != null;
-    TimeUnit tableTimeUnit = offlineTableConfig.getValidationConfig().getTimeType();
-    if (tableTimeUnit == null) {
-      LOGGER.info("Skipping updating time boundary service for table '{}' because time unit is not set", tableName);
+    // TODO: when we start using dateTime, pick the time column from the retention config, and use the DateTimeFieldSpec
+    //       from the schema to determine the time unit
+    // TODO: support SDF
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType);
+    assert schema != null;
+    String timeColumn = schema.getTimeColumnName();
+    TimeUnit tableTimeUnit = schema.getOutgoingTimeUnit();
+    if (timeColumn == null || tableTimeUnit == null) {
+      LOGGER.error("Skipping updating time boundary for table: '{}' because time column/unit is not set",
+          tableNameWithType);
       return;
     }
 
-    // Bulk reading all segment zk-metadata at once is more efficient than reading one at a time.
-    List<OfflineSegmentZKMetadata> segmentZKMetadataList =
-        ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, tableName);
-
-    long maxTimeValue = computeMaxSegmentEndTimeForTable(segmentZKMetadataList, tableTimeUnit);
-    TimeBoundaryInfo timeBoundaryInfo = new TimeBoundaryInfo();
-    timeBoundaryInfo.setTimeColumn(offlineTableConfig.getValidationConfig().getTimeColumnName());
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+    assert tableConfig != null;
+    SegmentsValidationAndRetentionConfig retentionConfig = tableConfig.getValidationConfig();
+    if (!timeColumn.equals(retentionConfig.getTimeColumnName())) {
+      LOGGER.error("Time column does not match in schema: '{}' and table config: '{}'", timeColumn,
+          retentionConfig.getTimeColumnName());
+    }
+    if (tableTimeUnit != retentionConfig.getTimeType()) {
+      LOGGER.error("Time unit does not match in schema: '{}' and table config: '{}'", tableTimeUnit,
+          retentionConfig.getTimeType());
+    }
 
-    timeBoundaryInfo.setTimeValue(Long.toString(maxTimeValue));
-    _timeBoundaryInfoMap.put(tableName, timeBoundaryInfo);
+    // Bulk reading all segment ZK metadata is more efficient than reading one at a time
+    List<OfflineSegmentZKMetadata> segmentZKMetadataList =
+        ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, tableNameWithType);
 
-    LOGGER.info("Updated time boundary service for table '{}', maxTime: {}", tableName, maxTimeValue);
-  }
+    long maxTimeValue = -1L;
+    for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      String segmentName = segmentZKMetadata.getSegmentName();
 
-  /**
-   * Compute maximum end time across a list of segment zk-metadata.
-   *
-   * @param segmentZKMetadataList List of Segment zk metadata for which to compute the max end time.
-   * @param tableTimeUnit Time Unit for table
-   * @return Max end time across all segments.
-   */
-  private long computeMaxSegmentEndTimeForTable(List<OfflineSegmentZKMetadata> segmentZKMetadataList,
-      TimeUnit tableTimeUnit) {
-    long maxTimeValue = -1;
-
-    for (OfflineSegmentZKMetadata metadata : segmentZKMetadataList) {
-      long endTime = metadata.getEndTime();
-      if (endTime <= 0) {
+      // Only consider segments in the external view
+      if (!offlineSegmentsServing.contains(segmentName)) {
+        LOGGER.warn("Skipping processing segment: '{}' for table: '{}' because it does not exist in the external view",
+            segmentName, tableNameWithType);
         continue;
       }
 
-      // Convert all segment times to table's time unit, before comparison.
-      TimeUnit segmentTimeUnit = metadata.getTimeUnit();
-      if (segmentTimeUnit != null) {
-        endTime = tableTimeUnit.convert(endTime, segmentTimeUnit);
+      long segmentEndTime = segmentZKMetadata.getEndTime();
+      if (segmentEndTime <= 0) {
+        LOGGER
+            .error("Skipping processing segment: '{}' for table: '{}' because the end time: {} is illegal", segmentName,
+                tableNameWithType, segmentEndTime);
+        continue;
       }
-      maxTimeValue = Math.max(maxTimeValue, endTime);
+
+      // Convert segment end time into table time unit
+      // NOTE: for now, time unit in segment ZK metadata should always match table time unit, but in the future we might
+      //       want to always use MILLISECONDS as the time unit in segment ZK metadata
+      maxTimeValue = Math.max(maxTimeValue, tableTimeUnit.convert(segmentEndTime, segmentZKMetadata.getTimeUnit()));
     }
-    return maxTimeValue;
-  }
 
-  @Override
-  public void remove(String tableName) {
-    _timeBoundaryInfoMap.remove(tableName);
+    if (maxTimeValue == -1L) {
+      LOGGER.error("Skipping updating time boundary for table: '{}' because no segment contains valid end time",
+          tableNameWithType);
+      return;
+    }
+
+    // For HOURLY push table with time unit other than DAYS, use (maxTimeValue - 1 HOUR) as the time boundary
+    // Otherwise, use (maxTimeValue - 1 DAY)
+    long timeBoundary;
+    if ("HOURLY".equalsIgnoreCase(retentionConfig.getSegmentPushFrequency()) && tableTimeUnit != TimeUnit.DAYS) {
+      timeBoundary = maxTimeValue - tableTimeUnit.convert(1L, TimeUnit.HOURS);
+    } else {
+      timeBoundary = maxTimeValue - tableTimeUnit.convert(1L, TimeUnit.DAYS);
+    }
+
+    LOGGER.info("Updated time boundary for table: '{}' to: {} {}", tableNameWithType, timeBoundary, tableTimeUnit);
+    _timeBoundaryInfoMap.put(tableNameWithType, new TimeBoundaryInfo(timeColumn, Long.toString(timeBoundary)));
   }
 
   @Override
   public TimeBoundaryInfo getTimeBoundaryInfoFor(String table) {
     return _timeBoundaryInfoMap.get(table);
   }
+
+  @Override
+  public void remove(String tableName) {
+    _timeBoundaryInfoMap.remove(tableName);
+  }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/TimeBoundaryService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/TimeBoundaryService.java
index cbe6db1..382c7bb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/TimeBoundaryService.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/TimeBoundaryService.java
@@ -30,18 +30,25 @@ public interface TimeBoundaryService {
    * @param table
    * @return
    */
+  // TODO: pass in raw table name instead of offline table name
   TimeBoundaryInfo getTimeBoundaryInfoFor(String table);
 
   /**
    * Remove a table from TimeBoundaryService
    * @param tableName
    */
+  // TODO: pass in raw table name instead of offline table name
   void remove(String tableName);
 
   class TimeBoundaryInfo {
     private String _timeColumn;
     private String _timeValue;
 
+    public TimeBoundaryInfo(String timeColumn, String timeValue) {
+      _timeColumn = timeColumn;
+      _timeValue = timeValue;
+    }
+
     public String getTimeColumn() {
       return _timeColumn;
     }
@@ -50,14 +57,6 @@ public interface TimeBoundaryService {
       return _timeValue;
     }
 
-    public void setTimeColumn(String timeColumn) {
-      _timeColumn = timeColumn;
-    }
-
-    public void setTimeValue(String timeValue) {
-      _timeValue = timeValue;
-    }
-
     public String toJsonString()
         throws JsonProcessingException {
       return JsonUtils.objectToPrettyString(this);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 1dfa1d0..4ac704f 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -39,6 +39,7 @@ import org.apache.pinot.broker.routing.TimeBoundaryService;
 import org.apache.pinot.broker.routing.builder.RoutingTableBuilder;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
@@ -119,8 +120,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
         new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_DINING_TABLE_NAME)
             .setTimeColumnName("timeColumn").setTimeType("DAYS").
             setStreamConfigs(streamConfigs).build();
-    Schema schema = new Schema();
-    schema.setSchemaName(RAW_DINING_TABLE_NAME);
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_DINING_TABLE_NAME)
+        .addTime("timeColumn", TimeUnit.DAYS, FieldSpec.DataType.INT).build();
     _helixResourceManager.addOrUpdateSchema(schema);
     _helixResourceManager.addTable(realtimeTimeConfig);
     _helixBrokerStarter.getHelixExternalViewBasedRouting()
@@ -258,7 +259,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
     TimeBoundaryService.TimeBoundaryInfo tbi = _helixBrokerStarter.getHelixExternalViewBasedRouting().
         getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
 
-    Assert.assertEquals(tbi.getTimeValue(), Long.toString(currentTimeBoundary));
+    Assert.assertEquals(tbi.getTimeValue(), Long.toString(currentTimeBoundary - 1));
 
     List<String> segmentNames = _helixResourceManager.getSegmentsFor(DINING_TABLE_NAME);
     long endTime = currentTimeBoundary + 10;
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryServiceTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryServiceTest.java
new file mode 100644
index 0000000..20ccc9c
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryServiceTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.broker.routing.TimeBoundaryService.TimeBoundaryInfo;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+
+public class HelixExternalViewBasedTimeBoundaryServiceTest {
+  private static final String TIME_COLUMN = "time";
+
+  private ZkStarter.ZookeeperInstance _zookeeperInstance;
+  private ZkClient _zkClient;
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+
+  @BeforeClass
+  public void setUp() {
+    _zookeeperInstance = ZkStarter.startLocalZkServer();
+
+    _zkClient = new ZkClient(StringUtil.join("/", StringUtils.chomp(ZkStarter.DEFAULT_ZK_STR, "/")),
+        ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    String helixClusterName = "TestTimeBoundaryService";
+    _zkClient.deleteRecursively("/" + helixClusterName + "/PROPERTYSTORE");
+    _zkClient.createPersistent("/" + helixClusterName + "/PROPERTYSTORE", true);
+    _propertyStore = new ZkHelixPropertyStore<>(new ZkBaseDataAccessor<ZNRecord>(_zkClient),
+        "/" + helixClusterName + "/PROPERTYSTORE", null);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _zkClient.close();
+    ZkStarter.stopLocalZkServer(_zookeeperInstance);
+  }
+
+  @Test
+  public void testExternalViewBasedTimeBoundaryService()
+      throws Exception {
+    HelixExternalViewBasedTimeBoundaryService timeBoundaryService =
+        new HelixExternalViewBasedTimeBoundaryService(_propertyStore);
+
+    int tableIndex = 1;
+    for (TimeUnit timeUnit : TimeUnit.values()) {
+      String rawTableName = "table" + tableIndex;
+      String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+      addTableConfig(rawTableName, timeUnit, "daily");
+      addSchema(rawTableName, timeUnit);
+
+      int endTimeInDays = tableIndex;
+      addSegmentZKMetadata(rawTableName, endTimeInDays, timeUnit);
+
+      // Should skip real-time external view
+      ExternalView externalView = constructRealtimeExternalView(realtimeTableName);
+      timeBoundaryService.updateTimeBoundaryService(externalView);
+      assertNull(timeBoundaryService.getTimeBoundaryInfoFor(rawTableName));
+      assertNull(timeBoundaryService.getTimeBoundaryInfoFor(realtimeTableName));
+      assertNull(timeBoundaryService.getTimeBoundaryInfoFor(offlineTableName));
+
+      externalView = constructOfflineExternalView(offlineTableName);
+      timeBoundaryService.updateTimeBoundaryService(externalView);
+      assertNull(timeBoundaryService.getTimeBoundaryInfoFor(rawTableName));
+      assertNull(timeBoundaryService.getTimeBoundaryInfoFor(realtimeTableName));
+      TimeBoundaryInfo timeBoundaryInfo = timeBoundaryService.getTimeBoundaryInfoFor(offlineTableName);
+      assertNotNull(timeBoundaryInfo);
+      assertEquals(timeBoundaryInfo.getTimeColumn(), TIME_COLUMN);
+      assertEquals(Long.parseLong(timeBoundaryInfo.getTimeValue()), timeUnit.convert(endTimeInDays - 1, TimeUnit.DAYS));
+
+      // Test HOURLY push frequency
+      addTableConfig(rawTableName, timeUnit, "hourly");
+      timeBoundaryService.updateTimeBoundaryService(externalView);
+      assertNull(timeBoundaryService.getTimeBoundaryInfoFor(rawTableName));
+      assertNull(timeBoundaryService.getTimeBoundaryInfoFor(realtimeTableName));
+      timeBoundaryInfo = timeBoundaryService.getTimeBoundaryInfoFor(offlineTableName);
+      assertNotNull(timeBoundaryInfo);
+      assertEquals(timeBoundaryInfo.getTimeColumn(), TIME_COLUMN);
+      long timeValue = Long.parseLong(timeBoundaryInfo.getTimeValue());
+      if (timeUnit == TimeUnit.DAYS) {
+        assertEquals(timeValue, timeUnit.convert(endTimeInDays - 1, TimeUnit.DAYS));
+      } else {
+        assertEquals(timeValue,
+            timeUnit.convert(TimeUnit.HOURS.convert(endTimeInDays, TimeUnit.DAYS) - 1, TimeUnit.HOURS));
+      }
+
+      tableIndex++;
+    }
+  }
+
+  private void addTableConfig(String rawTableName, TimeUnit timeUnit, String pushFrequency)
+      throws Exception {
+    ZKMetadataProvider.setOfflineTableConfig(_propertyStore, TableNameBuilder.OFFLINE.tableNameWithType(rawTableName),
+        new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(rawTableName)
+            .setTimeColumnName(TIME_COLUMN).setTimeType(timeUnit.name()).setSegmentPushFrequency(pushFrequency).build()
+            .toZNRecord());
+  }
+
+  private void addSchema(String rawTableName, TimeUnit timeUnit) {
+    ZKMetadataProvider.setSchema(_propertyStore,
+        new Schema.SchemaBuilder().setSchemaName(rawTableName).addTime(TIME_COLUMN, timeUnit, FieldSpec.DataType.LONG)
+            .build());
+  }
+
+  private void addSegmentZKMetadata(String rawTableName, int endTimeInDays, TimeUnit timeUnit) {
+    for (int i = 1; i <= endTimeInDays; i++) {
+      OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata();
+      offlineSegmentZKMetadata.setTableName(rawTableName);
+      offlineSegmentZKMetadata.setSegmentName(rawTableName + i);
+      offlineSegmentZKMetadata.setEndTime(i * timeUnit.convert(1L, TimeUnit.DAYS));
+      offlineSegmentZKMetadata.setTimeUnit(timeUnit);
+      ZKMetadataProvider.setOfflineSegmentZKMetadata(_propertyStore, offlineSegmentZKMetadata);
+    }
+  }
+
+  private ExternalView constructOfflineExternalView(String offlineTableName) {
+    ExternalView externalView = new ExternalView(offlineTableName);
+    List<OfflineSegmentZKMetadata> segmentZKMetadataList =
+        ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, offlineTableName);
+    for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      externalView.setState(segmentZKMetadata.getSegmentName(), "localhost", "ONLINE");
+    }
+    return externalView;
+  }
+
+  private ExternalView constructRealtimeExternalView(String realtimeTableName) {
+    return new ExternalView(realtimeTableName);
+  }
+}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/TimeBoundaryServiceTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/TimeBoundaryServiceTest.java
deleted file mode 100644
index 31e6839..0000000
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/TimeBoundaryServiceTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.routing;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.StringUtils;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.broker.routing.TimeBoundaryService.TimeBoundaryInfo;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.CommonConstants.Segment.SegmentType;
-import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.testng.Assert;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeTest;
-import org.testng.annotations.Test;
-
-
-public class TimeBoundaryServiceTest {
-  private ZkClient _zkClient;
-  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private ZkStarter.ZookeeperInstance _zookeeperInstance;
-
-  @BeforeTest
-  public void beforeTest() {
-    _zookeeperInstance = ZkStarter.startLocalZkServer();
-
-    _zkClient = new ZkClient(StringUtil.join("/", StringUtils.chomp(ZkStarter.DEFAULT_ZK_STR, "/")),
-        ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
-    String helixClusterName = "TestTimeBoundaryService";
-    _zkClient.deleteRecursively("/" + helixClusterName + "/PROPERTYSTORE");
-    _zkClient.createPersistent("/" + helixClusterName + "/PROPERTYSTORE", true);
-    _propertyStore = new ZkHelixPropertyStore<>(new ZkBaseDataAccessor<ZNRecord>(_zkClient),
-        "/" + helixClusterName + "/PROPERTYSTORE", null);
-  }
-
-  @AfterTest
-  public void afterTest() {
-    _zkClient.close();
-    ZkStarter.stopLocalZkServer(_zookeeperInstance);
-  }
-
-  @Test
-  public void testExternalViewBasedTimeBoundaryService()
-      throws Exception {
-    addingTableToPropertyStore("testResource0");
-    addingTableToPropertyStore("testResource1");
-    HelixExternalViewBasedTimeBoundaryService tbs = new HelixExternalViewBasedTimeBoundaryService(_propertyStore);
-    addingSegmentsToPropertyStore(5, "testResource0");
-    ExternalView externalView = constructExternalView("testResource0");
-
-    tbs.updateTimeBoundaryService(externalView);
-    TimeBoundaryInfo tbi = tbs.getTimeBoundaryInfoFor("testResource0");
-    Assert.assertEquals(tbi.getTimeColumn(), "timestamp");
-    Assert.assertEquals(tbi.getTimeValue(), "4");
-
-    addingSegmentsToPropertyStore(50, "testResource1");
-    externalView = constructExternalView("testResource1");
-    tbs.updateTimeBoundaryService(externalView);
-    tbi = tbs.getTimeBoundaryInfoFor("testResource1");
-    Assert.assertEquals(tbi.getTimeColumn(), "timestamp");
-    Assert.assertEquals(tbi.getTimeValue(), "49");
-
-    addingSegmentsToPropertyStore(50, "testResource0");
-    externalView = constructExternalView("testResource0");
-    tbs.updateTimeBoundaryService(externalView);
-    tbi = tbs.getTimeBoundaryInfoFor("testResource0");
-    Assert.assertEquals(tbi.getTimeColumn(), "timestamp");
-    Assert.assertEquals(tbi.getTimeValue(), "49");
-  }
-
-  private ExternalView constructExternalView(String tableName) {
-    ExternalView externalView = new ExternalView(tableName);
-    List<OfflineSegmentZKMetadata> offlineResourceZKMetadataListForResource =
-        ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, tableName);
-    for (OfflineSegmentZKMetadata segmentMetadata : offlineResourceZKMetadataListForResource) {
-      externalView.setState(segmentMetadata.getSegmentName(), "localhost", "ONLINE");
-    }
-    return externalView;
-  }
-
-  private void addingSegmentsToPropertyStore(int numSegments, String tableName) {
-    for (int i = 0; i < numSegments; ++i) {
-      OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata();
-      offlineSegmentZKMetadata.setSegmentName(tableName + "_" + System.currentTimeMillis() + "_" + i);
-      offlineSegmentZKMetadata.setTableName(tableName);
-      offlineSegmentZKMetadata.setStartTime(i - 1);
-      offlineSegmentZKMetadata.setEndTime(i);
-      offlineSegmentZKMetadata.setTimeUnit(TimeUnit.DAYS);
-      offlineSegmentZKMetadata.setCrc(-1);
-      offlineSegmentZKMetadata.setCreationTime(-1);
-      offlineSegmentZKMetadata.setIndexVersion("0");
-      offlineSegmentZKMetadata.setPushTime(i + 5);
-      offlineSegmentZKMetadata.setSegmentType(SegmentType.OFFLINE);
-      ZKMetadataProvider.setOfflineSegmentZKMetadata(_propertyStore, offlineSegmentZKMetadata);
-    }
-  }
-
-  private void addingTableToPropertyStore(String tableName)
-      throws Exception {
-    TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(tableName)
-        .setTimeColumnName("timestamp").setTimeType("DAYS").build();
-    ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableConfig.getTableName(), tableConfig.toZNRecord());
-  }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 6634b27..bb9d1c2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -240,6 +240,11 @@ public class ZKMetadataProvider {
     return getTableConfig(propertyStore, TableNameBuilder.REALTIME.tableNameWithType(tableName));
   }
 
+  public static void setSchema(ZkHelixPropertyStore<ZNRecord> propertyStore, Schema schema) {
+    propertyStore.set(constructPropertyStorePathForSchema(schema.getSchemaName()), SchemaUtils.toZNRecord(schema),
+        AccessOption.PERSISTENT);
+  }
+
   @Nullable
   public static Schema getSchema(@Nonnull ZkHelixPropertyStore<ZNRecord> propertyStore, @Nonnull String schemaName) {
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org