You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/28 09:40:09 UTC

[incubator-pinot] branch master updated: Refactor HelixBrokerStarterTest (#4473)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d64d8a  Refactor HelixBrokerStarterTest (#4473)
9d64d8a is described below

commit 9d64d8a664defa355664110ca89f2a80c09052b1
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Jul 28 02:40:03 2019 -0700

    Refactor HelixBrokerStarterTest (#4473)
    
    - Use condition based wait instead of sleep to fix the flakiness of the test
    - Fix the wrong assert which causes the test to last for more than 1 minute
    - Use the TestUtils.waitForCondition so that the test can fail when condition cannot be met
    - Reduce the default interval for TestUtils.waitForCondition to accelerate the test
    - Remove some unused classes
---
 .../pinot/broker/routing/CfgBasedRouting.java      |  64 ----
 .../EmptyBrokerOnlineOfflineStateModelFactory.java |  70 ----
 ...EmptySegmentOnlineOfflineStateModelFactory.java |  71 -----
 .../broker/broker/HelixBrokerStarterTest.java      | 354 ++++++++-------------
 .../api/PinotInstanceRestletResourceTest.java      |   4 +-
 .../test/java/org/apache/pinot/util/TestUtils.java |  64 +---
 6 files changed, 135 insertions(+), 492 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/CfgBasedRouting.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/CfgBasedRouting.java
deleted file mode 100644
index c7c5d01..0000000
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/CfgBasedRouting.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.routing;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.pinot.transport.config.PerTableRoutingConfig;
-import org.apache.pinot.transport.config.RoutingTableConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class CfgBasedRouting implements RoutingTable {
-  private static final Logger LOGGER = LoggerFactory.getLogger(CfgBasedRouting.class);
-
-  private RoutingTableConfig _cfg;
-
-  public CfgBasedRouting() {
-  }
-
-  public void init(RoutingTableConfig cfg) {
-    _cfg = cfg;
-  }
-
-  @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
-    String tableName = request.getTableName();
-    PerTableRoutingConfig cfg = _cfg.getPerTableRoutingCfg().get(tableName);
-
-    if (cfg == null) {
-      LOGGER.warn("Unable to find routing setting for table: {}", tableName);
-      return null;
-    }
-
-    return cfg.buildRequestRoutingMap();
-  }
-
-  @Override
-  public boolean routingTableExists(String tableName) {
-    return _cfg.getPerTableRoutingCfg().containsKey(tableName);
-  }
-
-  @Override
-  public String dumpSnapshot(String tableName)
-      throws Exception {
-    return null;
-  }
-}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptyBrokerOnlineOfflineStateModelFactory.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptyBrokerOnlineOfflineStateModelFactory.java
deleted file mode 100644
index ffecb62..0000000
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptyBrokerOnlineOfflineStateModelFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class EmptyBrokerOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
-
-  @Override
-  public StateModel createNewStateModel(String partitionName) {
-    final EmptyBrokerOnlineOfflineStateModel SegmentOnlineOfflineStateModel = new EmptyBrokerOnlineOfflineStateModel();
-    return SegmentOnlineOfflineStateModel;
-  }
-
-  public EmptyBrokerOnlineOfflineStateModelFactory() {
-  }
-
-  public static String getStateModelDef() {
-    return "BrokerResourceOnlineOfflineStateModel";
-  }
-
-  @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = "OFFLINE")
-  public static class EmptyBrokerOnlineOfflineStateModel extends StateModel {
-    private static final Logger LOGGER = LoggerFactory.getLogger(EmptyBrokerOnlineOfflineStateModel.class);
-
-    @Transition(from = "OFFLINE", to = "ONLINE")
-    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
-      LOGGER.debug("EmptyBrokerOnlineOfflineStateModel.onBecomeOnlineFromOffline() : {}", message);
-    }
-
-    @Transition(from = "ONLINE", to = "OFFLINE")
-    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
-      LOGGER.debug("EmptyBrokerOnlineOfflineStateModel.onBecomeOfflineFromOnline() : {}", message);
-    }
-
-    @Transition(from = "OFFLINE", to = "DROPPED")
-    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
-      LOGGER.debug("EmptyBrokerOnlineOfflineStateModel.onBecomeDroppedFromOffline() : {}", message);
-    }
-
-    @Transition(from = "ONLINE", to = "DROPPED")
-    public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
-      LOGGER.debug("EmptyBrokerOnlineOfflineStateModel.onBecomeDroppedFromOnline() : {}", message);
-    }
-  }
-}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptySegmentOnlineOfflineStateModelFactory.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptySegmentOnlineOfflineStateModelFactory.java
deleted file mode 100644
index 6920000..0000000
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptySegmentOnlineOfflineStateModelFactory.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class EmptySegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
-
-  @Override
-  public StateModel createNewStateModel(String partitionName) {
-    final EmptySegmentOnlineOfflineStateModel SegmentOnlineOfflineStateModel =
-        new EmptySegmentOnlineOfflineStateModel();
-    return SegmentOnlineOfflineStateModel;
-  }
-
-  public EmptySegmentOnlineOfflineStateModelFactory() {
-  }
-
-  public static String getStateModelDef() {
-    return "SegmentOnlineOfflineStateModel";
-  }
-
-  @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = "OFFLINE")
-  public static class EmptySegmentOnlineOfflineStateModel extends StateModel {
-    private static final Logger LOGGER = LoggerFactory.getLogger(EmptySegmentOnlineOfflineStateModel.class);
-
-    @Transition(from = "OFFLINE", to = "ONLINE")
-    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
-      LOGGER.debug("EmptySegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : {}", message);
-    }
-
-    @Transition(from = "ONLINE", to = "OFFLINE")
-    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
-      LOGGER.debug("EmptySegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : {}", message);
-    }
-
-    @Transition(from = "OFFLINE", to = "DROPPED")
-    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
-      LOGGER.debug("EmptySegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : {}", message);
-    }
-
-    @Transition(from = "ONLINE", to = "DROPPED")
-    public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
-      LOGGER.debug("EmptySegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : {}", message);
-    }
-  }
-}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 49f5557..8ecff02 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -18,287 +18,197 @@
  */
 package org.apache.pinot.broker.broker;
 
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
 import org.apache.pinot.broker.routing.TimeBoundaryService;
-import org.apache.pinot.broker.routing.builder.RoutingTableBuilder;
+import org.apache.pinot.broker.routing.TimeBoundaryService.TimeBoundaryInfo;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Broker;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
+import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
-import org.testng.Assert;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeTest;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
-public class HelixBrokerStarterTest extends ControllerTest {
-  private static final int SEGMENT_COUNT = 6;
-  private static final String RAW_DINING_TABLE_NAME = "dining";
-  private static final String DINING_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_DINING_TABLE_NAME);
-  private static final String COFFEE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType("coffee");
 
-  private final Configuration _brokerConf = new BaseConfiguration();
+public class HelixBrokerStarterTest extends ControllerTest {
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+  private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
+  private static final int NUM_BROKERS = 3;
+  private static final int NUM_SERVERS = 1;
+  private static final int NUM_OFFLINE_SEGMENTS = 5;
 
-  private ZkClient _zkClient;
-  private HelixBrokerStarter _helixBrokerStarter;
-  private ZkStarter.ZookeeperInstance _zookeeperInstance;
+  private HelixBrokerStarter _brokerStarter;
 
-  @BeforeTest
+  @BeforeClass
   public void setUp()
       throws Exception {
-    _zookeeperInstance = ZkStarter.startLocalZkServer();
-    _zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
-
+    startZk();
     startController();
 
-    _brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943);
-    _brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
-    _helixBrokerStarter = new HelixBrokerStarter(_brokerConf, getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR);
-    _helixBrokerStarter.start();
+    Configuration brokerConf = new BaseConfiguration();
+    brokerConf.addProperty(Helix.KEY_OF_BROKER_QUERY_PORT, 18099);
+    brokerConf.addProperty(Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
+    _brokerStarter = new HelixBrokerStarter(brokerConf, getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR);
+    _brokerStarter.start();
 
     ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 5, true);
+        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, NUM_BROKERS - 1,
+            true);
     ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 1, true);
-
-    while (_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_OFFLINE").size() == 0
-        || _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size() == 0) {
-      Thread.sleep(100);
-    }
+        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, NUM_SERVERS, true);
 
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addTime(TIME_COLUMN_NAME, TimeUnit.DAYS, FieldSpec.DataType.INT).build();
+    _helixResourceManager.addOrUpdateSchema(schema);
     TableConfig offlineTableConfig =
-        new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_DINING_TABLE_NAME)
-            .setTimeColumnName("timeColumn").setTimeType("DAYS").build();
+        new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .setTimeType(TimeUnit.DAYS.name()).build();
     _helixResourceManager.addTable(offlineTableConfig);
-    setupRealtimeTable();
+    TableConfig realtimeTimeConfig =
+        new TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .setTimeType(TimeUnit.DAYS.name()).
+            setStreamConfigs(getStreamConfigs()).build();
+    _helixResourceManager.addTable(realtimeTimeConfig);
 
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < NUM_OFFLINE_SEGMENTS; i++) {
       _helixResourceManager
-          .addNewSegment(DINING_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_DINING_TABLE_NAME),
+          .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME),
               "downloadUrl");
     }
 
-    Thread.sleep(1000);
-
-    ExternalView externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME);
-    Assert.assertEquals(externalView.getPartitionSet().size(), 5);
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView offlineTableExternalView =
+          _helixAdmin.getResourceExternalView(getHelixClusterName(), OFFLINE_TABLE_NAME);
+      return offlineTableExternalView != null
+          && offlineTableExternalView.getPartitionSet().size() == NUM_OFFLINE_SEGMENTS;
+    }, 30_000L, "Failed to find all OFFLINE segments in the ExternalView");
   }
 
-  private void setupRealtimeTable()
-      throws IOException {
-    // Set up the realtime table.
+  private Map<String, String> getStreamConfigs() {
     Map<String, String> streamConfigs = new HashMap<>();
     streamConfigs.put("streamType", "kafka");
     streamConfigs.put("stream.kafka.consumer.type", "highLevel");
     streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
     streamConfigs
         .put("stream.kafka.decoder.class.name", "org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder");
-    streamConfigs.put("stream.kafka.hlc.zk.connect.string", "localhost:1111/zkConnect");
-    streamConfigs.put("stream.kafka.decoder.prop.schema.registry.rest.url", "http://localhost:2222/schemaRegistry");
-    TableConfig realtimeTimeConfig =
-        new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_DINING_TABLE_NAME)
-            .setTimeColumnName("timeColumn").setTimeType("DAYS").
-            setStreamConfigs(streamConfigs).build();
-    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_DINING_TABLE_NAME)
-        .addTime("timeColumn", TimeUnit.DAYS, FieldSpec.DataType.INT).build();
-    _helixResourceManager.addOrUpdateSchema(schema);
-    _helixResourceManager.addTable(realtimeTimeConfig);
-    _helixBrokerStarter.getHelixExternalViewBasedRouting()
-        .markDataResourceOnline(realtimeTimeConfig, null, new ArrayList<>());
-  }
-
-  @AfterTest
-  public void tearDown() {
-    _helixResourceManager.stop();
-    _zkClient.close();
-    ZkStarter.stopLocalZkServer(_zookeeperInstance);
+    return streamConfigs;
   }
 
   @Test
   public void testResourceAndTagAssignment()
       throws Exception {
-    IdealState idealState;
-
-    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
-        6);
-    idealState =
-        _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
-    Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
-
-    ExternalView externalView =
-        _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
-    Assert.assertEquals(externalView.getStateMap(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
-
-    HelixExternalViewBasedRouting helixExternalViewBasedRouting =
-        _helixBrokerStarter.getHelixExternalViewBasedRouting();
-    Field brokerRoutingTableBuilderMapField;
-    brokerRoutingTableBuilderMapField = HelixExternalViewBasedRouting.class.getDeclaredField("_routingTableBuilderMap");
-    brokerRoutingTableBuilderMapField.setAccessible(true);
-
-    final Map<String, RoutingTableBuilder> brokerRoutingTableBuilderMap =
-        (Map<String, RoutingTableBuilder>) brokerRoutingTableBuilderMapField.get(helixExternalViewBasedRouting);
-
-    // Wait up to 30s for routing table to reach the expected size
-    waitForPredicate(new Callable<Boolean>() {
-      @Override
-      public Boolean call()
-          throws Exception {
-        return brokerRoutingTableBuilderMap.size() == 1;
-      }
-    }, 30000L);
-
-    Assert.assertEquals(Arrays.toString(brokerRoutingTableBuilderMap.keySet().toArray()),
-        "[dining_OFFLINE, dining_REALTIME]");
-
-    final String tableName = "coffee";
-    TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(tableName)
-        .setBrokerTenant("testBroker").setServerTenant("testServer").build();
-    _helixResourceManager.addTable(tableConfig);
-
-    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
-        6);
-    idealState =
-        _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
-    Assert.assertEquals(idealState.getInstanceSet(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT);
-    Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
-
-    // Wait up to 30s for broker external view to reach the expected size
-    waitForPredicate(new Callable<Boolean>() {
-      @Override
-      public Boolean call()
-          throws Exception {
-        return
-            _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
-                .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
-      }
-    }, 30000L);
-
-    externalView =
-        _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
-    Assert.assertEquals(externalView.getStateMap(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT);
-
-    // Wait up to 30s for routing table to reach the expected size
-    waitForPredicate(new Callable<Boolean>() {
-      @Override
-      public Boolean call()
-          throws Exception {
-        return brokerRoutingTableBuilderMap.size() == 2;
-      }
-    }, 30000L);
-
-    Object[] tableArray = brokerRoutingTableBuilderMap.keySet().toArray();
-    Arrays.sort(tableArray);
-    Assert.assertEquals(Arrays.toString(tableArray), "[coffee_OFFLINE, dining_OFFLINE, dining_REALTIME]");
-
-    Assert.assertEquals(
-        brokerRoutingTableBuilderMap.get(DINING_TABLE_NAME).getRoutingTables().get(0).values().iterator().next().size(),
-        5);
-
+    assertEquals(
+        _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getBrokerTagForTenant(null))
+            .size(), NUM_BROKERS);
+
+    IdealState brokerResourceIdealState =
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE);
+    assertEquals(brokerResourceIdealState.getInstanceSet(OFFLINE_TABLE_NAME).size(), NUM_BROKERS);
+    assertEquals(brokerResourceIdealState.getInstanceSet(REALTIME_TABLE_NAME).size(), NUM_BROKERS);
+
+    ExternalView brokerResourceExternalView =
+        _helixAdmin.getResourceExternalView(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE);
+    assertEquals(brokerResourceExternalView.getStateMap(OFFLINE_TABLE_NAME).size(), NUM_BROKERS);
+    assertEquals(brokerResourceExternalView.getStateMap(REALTIME_TABLE_NAME).size(), NUM_BROKERS);
+
+    HelixExternalViewBasedRouting routing = _brokerStarter.getHelixExternalViewBasedRouting();
+    assertTrue(routing.routingTableExists(OFFLINE_TABLE_NAME));
+    assertTrue(routing.routingTableExists(REALTIME_TABLE_NAME));
+
+    RoutingTableLookupRequest routingTableLookupRequest = new RoutingTableLookupRequest(OFFLINE_TABLE_NAME);
+    Map<String, List<String>> routingTable = routing.getRoutingTable(routingTableLookupRequest);
+    assertEquals(routingTable.size(), NUM_SERVERS);
+    assertEquals(routingTable.values().iterator().next().size(), NUM_OFFLINE_SEGMENTS);
+
+    // Add a new segment into the OFFLINE table
     _helixResourceManager
-        .addNewSegment(DINING_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_DINING_TABLE_NAME),
-            "downloadUrl");
-
-    // Wait up to 30s for external view to reach the expected size
-    waitForPredicate(new Callable<Boolean>() {
-      @Override
-      public Boolean call()
-          throws Exception {
-        return _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME).getPartitionSet().size()
-            == SEGMENT_COUNT;
-      }
-    }, 30000L);
-
-    externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME);
-    Assert.assertEquals(externalView.getPartitionSet().size(), SEGMENT_COUNT);
-    tableArray = brokerRoutingTableBuilderMap.keySet().toArray();
-    Arrays.sort(tableArray);
-    Assert.assertEquals(Arrays.toString(tableArray), "[coffee_OFFLINE, dining_OFFLINE, dining_REALTIME]");
-
-    // Wait up to 30s for routing table to reach the expected size
-    waitForPredicate(new Callable<Boolean>() {
-      @Override
-      public Boolean call()
-          throws Exception {
-        Map<String, List<String>> routingTable =
-            brokerRoutingTableBuilderMap.get(DINING_TABLE_NAME).getRoutingTables().get(0);
-        return routingTable.values().iterator().next().size() == SEGMENT_COUNT;
-      }
-    }, 30000L);
-
-    Assert.assertEquals(
-        brokerRoutingTableBuilderMap.get(DINING_TABLE_NAME).getRoutingTables().get(0).values().iterator().next().size(),
-        SEGMENT_COUNT);
+        .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
+
+    TestUtils.waitForCondition(
+        aVoid -> routing.getRoutingTable(routingTableLookupRequest).values().iterator().next().size()
+            == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new segment into the routing table");
+
+    // Add a new table with different broker tenant
+    String newRawTableName = "newTable";
+    String newOfflineTableName = TableNameBuilder.OFFLINE.tableNameWithType(newRawTableName);
+    TableConfig newTableConfig =
+        new TableConfig.Builder(TableType.OFFLINE).setTableName(newRawTableName).setBrokerTenant("testBroker").build();
+    _helixResourceManager.addTable(newTableConfig);
+
+    // Broker tenant should be overridden to DefaultTenant
+    TableConfig newTableConfigInCluster = _helixResourceManager.getTableConfig(newOfflineTableName);
+    assertNotNull(newTableConfigInCluster);
+    assertEquals(newTableConfigInCluster.getTenantConfig().getBroker(), TagNameUtils.DEFAULT_TENANT_NAME);
+
+    brokerResourceIdealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE);
+    assertEquals(brokerResourceIdealState.getInstanceSet(newOfflineTableName).size(), NUM_BROKERS);
+
+    TestUtils.waitForCondition(aVoid -> {
+      Map<String, String> newTableStateMap =
+          _helixAdmin.getResourceExternalView(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE)
+              .getStateMap(newOfflineTableName);
+      return newTableStateMap != null && newTableStateMap.size() == NUM_BROKERS;
+    }, 30_000L, "Failed to find all brokers for the new table in the brokerResource ExternalView");
+
+    assertTrue(routing.routingTableExists(newOfflineTableName));
   }
 
+  /**
+   * Verifies that when the segments of an OFFLINE table are refreshed, the TimeBoundaryInfo is also updated.
+   */
   @Test
-  public void testTimeBoundaryUpdate()
-      throws Exception {
-    // This test verifies that when the segments of an offline table are refreshed, the TimeBoundaryInfo is also updated
-    // to a newer timestamp.
-    final long currentTimeBoundary = 10;
-    TimeBoundaryService.TimeBoundaryInfo tbi = _helixBrokerStarter.getHelixExternalViewBasedRouting().
-        getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
-
-    Assert.assertEquals(tbi.getTimeValue(), Long.toString(currentTimeBoundary - 1));
-
-    List<String> segmentNames = _helixResourceManager.getSegmentsFor(DINING_TABLE_NAME);
-    long endTime = currentTimeBoundary + 10;
-    // Refresh all 5 segments.
-    for (String segment : segmentNames) {
-      OfflineSegmentZKMetadata offlineSegmentZKMetadata =
-          _helixResourceManager.getOfflineSegmentZKMetadata(RAW_DINING_TABLE_NAME, segment);
-      Assert.assertNotNull(offlineSegmentZKMetadata);
-      _helixResourceManager.refreshSegment(DINING_TABLE_NAME,
-          SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_DINING_TABLE_NAME, segment, endTime++),
-          offlineSegmentZKMetadata);
-    }
-    // Due to the asynchronous nature of the TimeboundaryInfo update and thread scheduling, the updated time boundary
-    // may not always be the max endtime of segments. We do not expect such exact update either as long as the timestamp
-    // is updated to some newer value.
-    waitForPredicate(() -> {
-      TimeBoundaryService.TimeBoundaryInfo timeBoundaryInfo = _helixBrokerStarter.getHelixExternalViewBasedRouting().
-          getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
-      return currentTimeBoundary < Long.parseLong(timeBoundaryInfo.getTimeValue());
-    }, 5 * _brokerConf.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL));
-    tbi = _helixBrokerStarter.getHelixExternalViewBasedRouting().
-        getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
-    Assert.assertTrue(currentTimeBoundary < Long.parseLong(tbi.getTimeValue()));
+  public void testTimeBoundaryUpdate() {
+    TimeBoundaryService timeBoundaryService = _brokerStarter.getHelixExternalViewBasedRouting().
+        getTimeBoundaryService();
+
+    // Time boundary should be 1 day smaller than the end time
+    int currentEndTime = 10;
+    TimeBoundaryInfo timeBoundaryInfo = timeBoundaryService.getTimeBoundaryInfoFor(OFFLINE_TABLE_NAME);
+    assertEquals(timeBoundaryInfo.getTimeValue(), Integer.toString(currentEndTime - 1));
+
+    // Refresh a segment with a new end time
+    String segmentToRefresh = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME).get(0);
+    int newEndTime = currentEndTime + 10;
+    OfflineSegmentZKMetadata segmentZKMetadata =
+        _helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME, segmentToRefresh);
+    _helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, segmentToRefresh, newEndTime),
+        segmentZKMetadata);
+
+    TestUtils.waitForCondition(aVoid -> timeBoundaryService.getTimeBoundaryInfoFor(OFFLINE_TABLE_NAME).getTimeValue()
+        .equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update the time boundary for refreshed segment");
   }
 
-  private void waitForPredicate(Callable<Boolean> predicate, long timeout) {
-    long deadline = System.currentTimeMillis() + timeout;
-    while (System.currentTimeMillis() < deadline) {
-      try {
-        if (predicate.call()) {
-          return;
-        }
-      } catch (Exception e) {
-        // Do nothing
-      }
-
-      Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-    }
+  @AfterClass
+  public void tearDown() {
+    _brokerStarter.shutdown();
+    stopController();
+    stopZk();
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index bfe155b..cb06fc4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -69,7 +69,7 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-    }, 500L, 10_000L, "Expected three instances after creation of tagged instances");
+    }, 10_000L, "Expected three instances after creation of tagged instances");
 
     // Create tagged broker and server instances
     brokerInstance.put("tag", "someTag");
@@ -90,7 +90,7 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-    }, 500L, 10_000L, "Expected five instances after creation of tagged instances");
+    }, 10_000L, "Expected five instances after creation of tagged instances");
 
     // Create duplicate broker and server instances (both calls should fail)
     try {
diff --git a/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java b/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java
index 08d1de6..de17ff4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java
@@ -22,13 +22,9 @@ import com.google.common.base.Function;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.response.broker.GroupByResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -40,10 +36,6 @@ import org.testng.Assert;
  */
 public class TestUtils {
   private static final Logger LOGGER = LoggerFactory.getLogger(TestUtils.class);
-  private static final int testThreshold = 1000;
-
-  public static final double hllEstimationThreshold = 0.5;
-  public static final double digestEstimationThreshold = 0.1;
 
   public static String getFileFromResourceUrl(@Nonnull URL resourceUrl) {
     // For maven cross package use case, we need to extract the resource from jar to a temporary directory.
@@ -65,60 +57,6 @@ public class TestUtils {
   }
 
   /**
-   * assert estimation in error range
-   * @param estimate
-   * @param actual
-   */
-  public static void assertApproximation(double estimate, double actual, double precision) {
-    estimate = Math.abs(estimate);
-    actual = Math.abs(actual);
-
-    if (estimate < testThreshold && actual < testThreshold) {
-      double errorDiff = Math.abs(actual - estimate);
-      LOGGER.debug("estimate: " + estimate + " actual: " + actual + " error (in difference): " + errorDiff);
-      LOGGER.debug("small value comparison ignored!");
-      //Assert.assertEquals(error < 3, true);
-    } else {
-      double errorRate = 1;
-      if (actual > 0) {
-        errorRate = Math.abs((actual - estimate) / actual);
-      }
-      LOGGER.debug("estimate: " + estimate + " actual: " + actual + " error (in rate): " + errorRate);
-      Assert.assertTrue(errorRate < precision);
-    }
-  }
-
-  public static void assertGroupByResultsApproximation(List<GroupByResult> estimateValues,
-      List<GroupByResult> actualValues, double precision) {
-    LOGGER.info("====== assertGroupByResultsApproximation ======");
-    // estimation should not affect number of groups formed
-    Assert.assertEquals(estimateValues.size(), actualValues.size());
-
-    Map<List<String>, Double> mapEstimate = new HashMap<>();
-    Map<List<String>, Double> mapActual = new HashMap<>();
-    for (GroupByResult gby : estimateValues) {
-      mapEstimate.put(gby.getGroup(), Double.parseDouble(gby.getValue().toString()));
-    }
-    for (GroupByResult gby : actualValues) {
-      mapActual.put(gby.getGroup(), Double.parseDouble(gby.getValue().toString()));
-    }
-
-    LOGGER.info("estimate: " + mapEstimate.keySet());
-    LOGGER.info("actual: " + mapActual.keySet());
-
-    int cnt = 0;
-    for (List<String> key : mapEstimate.keySet()) {
-      // Not strictly enforced, since in quantile, top 100 groups from accurate maybe not be top 100 from estimate
-      // Assert.assertEquals(mapActual.keySet().contains(key), true);
-      if (mapActual.keySet().contains(key)) {
-        assertApproximation(mapEstimate.get(key), mapActual.get(key), precision);
-        cnt += 1;
-      }
-    }
-    LOGGER.info("group overlap rate: " + (cnt + 0.0) / mapEstimate.keySet().size());
-  }
-
-  /**
    * Ensure the given directories exist and are empty.
    *
    * @param dirs Directories to be cleared
@@ -159,6 +97,6 @@ public class TestUtils {
 
   public static void waitForCondition(Function<Void, Boolean> condition, long timeoutMs,
       @Nullable String errorMessage) {
-    waitForCondition(condition, 1000L, timeoutMs, errorMessage);
+    waitForCondition(condition, 100L, timeoutMs, errorMessage);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org