You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/28 09:40:09 UTC
[incubator-pinot] branch master updated: Refactor HelixBrokerStarterTest (#4473)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9d64d8a Refactor HelixBrokerStarterTest (#4473)
9d64d8a is described below
commit 9d64d8a664defa355664110ca89f2a80c09052b1
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Jul 28 02:40:03 2019 -0700
Refactor HelixBrokerStarterTest (#4473)
- Use condition-based waits instead of sleeps to fix the flakiness of the test
- Fix the incorrect assertion that caused the test to last for more than 1 minute
- Use TestUtils.waitForCondition so that the test fails when the condition cannot be met
- Reduce the default interval for TestUtils.waitForCondition to accelerate the test
- Remove some unused classes
---
.../pinot/broker/routing/CfgBasedRouting.java | 64 ----
.../EmptyBrokerOnlineOfflineStateModelFactory.java | 70 ----
...EmptySegmentOnlineOfflineStateModelFactory.java | 71 -----
.../broker/broker/HelixBrokerStarterTest.java | 354 ++++++++-------------
.../api/PinotInstanceRestletResourceTest.java | 4 +-
.../test/java/org/apache/pinot/util/TestUtils.java | 64 +---
6 files changed, 135 insertions(+), 492 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/CfgBasedRouting.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/CfgBasedRouting.java
deleted file mode 100644
index c7c5d01..0000000
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/CfgBasedRouting.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.routing;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.pinot.transport.config.PerTableRoutingConfig;
-import org.apache.pinot.transport.config.RoutingTableConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class CfgBasedRouting implements RoutingTable {
- private static final Logger LOGGER = LoggerFactory.getLogger(CfgBasedRouting.class);
-
- private RoutingTableConfig _cfg;
-
- public CfgBasedRouting() {
- }
-
- public void init(RoutingTableConfig cfg) {
- _cfg = cfg;
- }
-
- @Override
- public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
- String tableName = request.getTableName();
- PerTableRoutingConfig cfg = _cfg.getPerTableRoutingCfg().get(tableName);
-
- if (cfg == null) {
- LOGGER.warn("Unable to find routing setting for table: {}", tableName);
- return null;
- }
-
- return cfg.buildRequestRoutingMap();
- }
-
- @Override
- public boolean routingTableExists(String tableName) {
- return _cfg.getPerTableRoutingCfg().containsKey(tableName);
- }
-
- @Override
- public String dumpSnapshot(String tableName)
- throws Exception {
- return null;
- }
-}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptyBrokerOnlineOfflineStateModelFactory.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptyBrokerOnlineOfflineStateModelFactory.java
deleted file mode 100644
index ffecb62..0000000
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptyBrokerOnlineOfflineStateModelFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class EmptyBrokerOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
-
- @Override
- public StateModel createNewStateModel(String partitionName) {
- final EmptyBrokerOnlineOfflineStateModel SegmentOnlineOfflineStateModel = new EmptyBrokerOnlineOfflineStateModel();
- return SegmentOnlineOfflineStateModel;
- }
-
- public EmptyBrokerOnlineOfflineStateModelFactory() {
- }
-
- public static String getStateModelDef() {
- return "BrokerResourceOnlineOfflineStateModel";
- }
-
- @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = "OFFLINE")
- public static class EmptyBrokerOnlineOfflineStateModel extends StateModel {
- private static final Logger LOGGER = LoggerFactory.getLogger(EmptyBrokerOnlineOfflineStateModel.class);
-
- @Transition(from = "OFFLINE", to = "ONLINE")
- public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
- LOGGER.debug("EmptyBrokerOnlineOfflineStateModel.onBecomeOnlineFromOffline() : {}", message);
- }
-
- @Transition(from = "ONLINE", to = "OFFLINE")
- public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
- LOGGER.debug("EmptyBrokerOnlineOfflineStateModel.onBecomeOfflineFromOnline() : {}", message);
- }
-
- @Transition(from = "OFFLINE", to = "DROPPED")
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
- LOGGER.debug("EmptyBrokerOnlineOfflineStateModel.onBecomeDroppedFromOffline() : {}", message);
- }
-
- @Transition(from = "ONLINE", to = "DROPPED")
- public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
- LOGGER.debug("EmptyBrokerOnlineOfflineStateModel.onBecomeDroppedFromOnline() : {}", message);
- }
- }
-}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptySegmentOnlineOfflineStateModelFactory.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptySegmentOnlineOfflineStateModelFactory.java
deleted file mode 100644
index 6920000..0000000
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/EmptySegmentOnlineOfflineStateModelFactory.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class EmptySegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
-
- @Override
- public StateModel createNewStateModel(String partitionName) {
- final EmptySegmentOnlineOfflineStateModel SegmentOnlineOfflineStateModel =
- new EmptySegmentOnlineOfflineStateModel();
- return SegmentOnlineOfflineStateModel;
- }
-
- public EmptySegmentOnlineOfflineStateModelFactory() {
- }
-
- public static String getStateModelDef() {
- return "SegmentOnlineOfflineStateModel";
- }
-
- @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = "OFFLINE")
- public static class EmptySegmentOnlineOfflineStateModel extends StateModel {
- private static final Logger LOGGER = LoggerFactory.getLogger(EmptySegmentOnlineOfflineStateModel.class);
-
- @Transition(from = "OFFLINE", to = "ONLINE")
- public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
- LOGGER.debug("EmptySegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : {}", message);
- }
-
- @Transition(from = "ONLINE", to = "OFFLINE")
- public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
- LOGGER.debug("EmptySegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : {}", message);
- }
-
- @Transition(from = "OFFLINE", to = "DROPPED")
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
- LOGGER.debug("EmptySegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : {}", message);
- }
-
- @Transition(from = "ONLINE", to = "DROPPED")
- public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
- LOGGER.debug("EmptySegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : {}", message);
- }
- }
-}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 49f5557..8ecff02 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -18,287 +18,197 @@
*/
package org.apache.pinot.broker.broker;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
import org.apache.pinot.broker.routing.TimeBoundaryService;
-import org.apache.pinot.broker.routing.builder.RoutingTableBuilder;
+import org.apache.pinot.broker.routing.TimeBoundaryService.TimeBoundaryInfo;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Broker;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
+import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
-import org.testng.Assert;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeTest;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
-public class HelixBrokerStarterTest extends ControllerTest {
- private static final int SEGMENT_COUNT = 6;
- private static final String RAW_DINING_TABLE_NAME = "dining";
- private static final String DINING_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_DINING_TABLE_NAME);
- private static final String COFFEE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType("coffee");
- private final Configuration _brokerConf = new BaseConfiguration();
+public class HelixBrokerStarterTest extends ControllerTest {
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+ private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+ private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
+ private static final int NUM_BROKERS = 3;
+ private static final int NUM_SERVERS = 1;
+ private static final int NUM_OFFLINE_SEGMENTS = 5;
- private ZkClient _zkClient;
- private HelixBrokerStarter _helixBrokerStarter;
- private ZkStarter.ZookeeperInstance _zookeeperInstance;
+ private HelixBrokerStarter _brokerStarter;
- @BeforeTest
+ @BeforeClass
public void setUp()
throws Exception {
- _zookeeperInstance = ZkStarter.startLocalZkServer();
- _zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
-
+ startZk();
startController();
- _brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943);
- _brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
- _helixBrokerStarter = new HelixBrokerStarter(_brokerConf, getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR);
- _helixBrokerStarter.start();
+ Configuration brokerConf = new BaseConfiguration();
+ brokerConf.addProperty(Helix.KEY_OF_BROKER_QUERY_PORT, 18099);
+ brokerConf.addProperty(Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
+ _brokerStarter = new HelixBrokerStarter(brokerConf, getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR);
+ _brokerStarter.start();
ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 5, true);
+ .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, NUM_BROKERS - 1,
+ true);
ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 1, true);
-
- while (_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_OFFLINE").size() == 0
- || _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size() == 0) {
- Thread.sleep(100);
- }
+ .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, NUM_SERVERS, true);
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+ .addTime(TIME_COLUMN_NAME, TimeUnit.DAYS, FieldSpec.DataType.INT).build();
+ _helixResourceManager.addOrUpdateSchema(schema);
TableConfig offlineTableConfig =
- new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_DINING_TABLE_NAME)
- .setTimeColumnName("timeColumn").setTimeType("DAYS").build();
+ new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+ .setTimeType(TimeUnit.DAYS.name()).build();
_helixResourceManager.addTable(offlineTableConfig);
- setupRealtimeTable();
+ TableConfig realtimeTimeConfig =
+ new TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+ .setTimeType(TimeUnit.DAYS.name()).
+ setStreamConfigs(getStreamConfigs()).build();
+ _helixResourceManager.addTable(realtimeTimeConfig);
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < NUM_OFFLINE_SEGMENTS; i++) {
_helixResourceManager
- .addNewSegment(DINING_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_DINING_TABLE_NAME),
+ .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME),
"downloadUrl");
}
- Thread.sleep(1000);
-
- ExternalView externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME);
- Assert.assertEquals(externalView.getPartitionSet().size(), 5);
+ TestUtils.waitForCondition(aVoid -> {
+ ExternalView offlineTableExternalView =
+ _helixAdmin.getResourceExternalView(getHelixClusterName(), OFFLINE_TABLE_NAME);
+ return offlineTableExternalView != null
+ && offlineTableExternalView.getPartitionSet().size() == NUM_OFFLINE_SEGMENTS;
+ }, 30_000L, "Failed to find all OFFLINE segments in the ExternalView");
}
- private void setupRealtimeTable()
- throws IOException {
- // Set up the realtime table.
+ private Map<String, String> getStreamConfigs() {
Map<String, String> streamConfigs = new HashMap<>();
streamConfigs.put("streamType", "kafka");
streamConfigs.put("stream.kafka.consumer.type", "highLevel");
streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
streamConfigs
.put("stream.kafka.decoder.class.name", "org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder");
- streamConfigs.put("stream.kafka.hlc.zk.connect.string", "localhost:1111/zkConnect");
- streamConfigs.put("stream.kafka.decoder.prop.schema.registry.rest.url", "http://localhost:2222/schemaRegistry");
- TableConfig realtimeTimeConfig =
- new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_DINING_TABLE_NAME)
- .setTimeColumnName("timeColumn").setTimeType("DAYS").
- setStreamConfigs(streamConfigs).build();
- Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_DINING_TABLE_NAME)
- .addTime("timeColumn", TimeUnit.DAYS, FieldSpec.DataType.INT).build();
- _helixResourceManager.addOrUpdateSchema(schema);
- _helixResourceManager.addTable(realtimeTimeConfig);
- _helixBrokerStarter.getHelixExternalViewBasedRouting()
- .markDataResourceOnline(realtimeTimeConfig, null, new ArrayList<>());
- }
-
- @AfterTest
- public void tearDown() {
- _helixResourceManager.stop();
- _zkClient.close();
- ZkStarter.stopLocalZkServer(_zookeeperInstance);
+ return streamConfigs;
}
@Test
public void testResourceAndTagAssignment()
throws Exception {
- IdealState idealState;
-
- Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
- 6);
- idealState =
- _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
- Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
-
- ExternalView externalView =
- _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
- Assert.assertEquals(externalView.getStateMap(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
-
- HelixExternalViewBasedRouting helixExternalViewBasedRouting =
- _helixBrokerStarter.getHelixExternalViewBasedRouting();
- Field brokerRoutingTableBuilderMapField;
- brokerRoutingTableBuilderMapField = HelixExternalViewBasedRouting.class.getDeclaredField("_routingTableBuilderMap");
- brokerRoutingTableBuilderMapField.setAccessible(true);
-
- final Map<String, RoutingTableBuilder> brokerRoutingTableBuilderMap =
- (Map<String, RoutingTableBuilder>) brokerRoutingTableBuilderMapField.get(helixExternalViewBasedRouting);
-
- // Wait up to 30s for routing table to reach the expected size
- waitForPredicate(new Callable<Boolean>() {
- @Override
- public Boolean call()
- throws Exception {
- return brokerRoutingTableBuilderMap.size() == 1;
- }
- }, 30000L);
-
- Assert.assertEquals(Arrays.toString(brokerRoutingTableBuilderMap.keySet().toArray()),
- "[dining_OFFLINE, dining_REALTIME]");
-
- final String tableName = "coffee";
- TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(tableName)
- .setBrokerTenant("testBroker").setServerTenant("testServer").build();
- _helixResourceManager.addTable(tableConfig);
-
- Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
- 6);
- idealState =
- _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
- Assert.assertEquals(idealState.getInstanceSet(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT);
- Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
-
- // Wait up to 30s for broker external view to reach the expected size
- waitForPredicate(new Callable<Boolean>() {
- @Override
- public Boolean call()
- throws Exception {
- return
- _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
- .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
- }
- }, 30000L);
-
- externalView =
- _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
- Assert.assertEquals(externalView.getStateMap(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT);
-
- // Wait up to 30s for routing table to reach the expected size
- waitForPredicate(new Callable<Boolean>() {
- @Override
- public Boolean call()
- throws Exception {
- return brokerRoutingTableBuilderMap.size() == 2;
- }
- }, 30000L);
-
- Object[] tableArray = brokerRoutingTableBuilderMap.keySet().toArray();
- Arrays.sort(tableArray);
- Assert.assertEquals(Arrays.toString(tableArray), "[coffee_OFFLINE, dining_OFFLINE, dining_REALTIME]");
-
- Assert.assertEquals(
- brokerRoutingTableBuilderMap.get(DINING_TABLE_NAME).getRoutingTables().get(0).values().iterator().next().size(),
- 5);
-
+ assertEquals(
+ _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getBrokerTagForTenant(null))
+ .size(), NUM_BROKERS);
+
+ IdealState brokerResourceIdealState =
+ _helixAdmin.getResourceIdealState(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE);
+ assertEquals(brokerResourceIdealState.getInstanceSet(OFFLINE_TABLE_NAME).size(), NUM_BROKERS);
+ assertEquals(brokerResourceIdealState.getInstanceSet(REALTIME_TABLE_NAME).size(), NUM_BROKERS);
+
+ ExternalView brokerResourceExternalView =
+ _helixAdmin.getResourceExternalView(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE);
+ assertEquals(brokerResourceExternalView.getStateMap(OFFLINE_TABLE_NAME).size(), NUM_BROKERS);
+ assertEquals(brokerResourceExternalView.getStateMap(REALTIME_TABLE_NAME).size(), NUM_BROKERS);
+
+ HelixExternalViewBasedRouting routing = _brokerStarter.getHelixExternalViewBasedRouting();
+ assertTrue(routing.routingTableExists(OFFLINE_TABLE_NAME));
+ assertTrue(routing.routingTableExists(REALTIME_TABLE_NAME));
+
+ RoutingTableLookupRequest routingTableLookupRequest = new RoutingTableLookupRequest(OFFLINE_TABLE_NAME);
+ Map<String, List<String>> routingTable = routing.getRoutingTable(routingTableLookupRequest);
+ assertEquals(routingTable.size(), NUM_SERVERS);
+ assertEquals(routingTable.values().iterator().next().size(), NUM_OFFLINE_SEGMENTS);
+
+ // Add a new segment into the OFFLINE table
_helixResourceManager
- .addNewSegment(DINING_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_DINING_TABLE_NAME),
- "downloadUrl");
-
- // Wait up to 30s for external view to reach the expected size
- waitForPredicate(new Callable<Boolean>() {
- @Override
- public Boolean call()
- throws Exception {
- return _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME).getPartitionSet().size()
- == SEGMENT_COUNT;
- }
- }, 30000L);
-
- externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME);
- Assert.assertEquals(externalView.getPartitionSet().size(), SEGMENT_COUNT);
- tableArray = brokerRoutingTableBuilderMap.keySet().toArray();
- Arrays.sort(tableArray);
- Assert.assertEquals(Arrays.toString(tableArray), "[coffee_OFFLINE, dining_OFFLINE, dining_REALTIME]");
-
- // Wait up to 30s for routing table to reach the expected size
- waitForPredicate(new Callable<Boolean>() {
- @Override
- public Boolean call()
- throws Exception {
- Map<String, List<String>> routingTable =
- brokerRoutingTableBuilderMap.get(DINING_TABLE_NAME).getRoutingTables().get(0);
- return routingTable.values().iterator().next().size() == SEGMENT_COUNT;
- }
- }, 30000L);
-
- Assert.assertEquals(
- brokerRoutingTableBuilderMap.get(DINING_TABLE_NAME).getRoutingTables().get(0).values().iterator().next().size(),
- SEGMENT_COUNT);
+ .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
+
+ TestUtils.waitForCondition(
+ aVoid -> routing.getRoutingTable(routingTableLookupRequest).values().iterator().next().size()
+ == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new segment into the routing table");
+
+ // Add a new table with different broker tenant
+ String newRawTableName = "newTable";
+ String newOfflineTableName = TableNameBuilder.OFFLINE.tableNameWithType(newRawTableName);
+ TableConfig newTableConfig =
+ new TableConfig.Builder(TableType.OFFLINE).setTableName(newRawTableName).setBrokerTenant("testBroker").build();
+ _helixResourceManager.addTable(newTableConfig);
+
+ // Broker tenant should be overridden to DefaultTenant
+ TableConfig newTableConfigInCluster = _helixResourceManager.getTableConfig(newOfflineTableName);
+ assertNotNull(newTableConfigInCluster);
+ assertEquals(newTableConfigInCluster.getTenantConfig().getBroker(), TagNameUtils.DEFAULT_TENANT_NAME);
+
+ brokerResourceIdealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE);
+ assertEquals(brokerResourceIdealState.getInstanceSet(newOfflineTableName).size(), NUM_BROKERS);
+
+ TestUtils.waitForCondition(aVoid -> {
+ Map<String, String> newTableStateMap =
+ _helixAdmin.getResourceExternalView(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE)
+ .getStateMap(newOfflineTableName);
+ return newTableStateMap != null && newTableStateMap.size() == NUM_BROKERS;
+ }, 30_000L, "Failed to find all brokers for the new table in the brokerResource ExternalView");
+
+ assertTrue(routing.routingTableExists(newOfflineTableName));
}
+ /**
+ * This test verifies that when the segments of an OFFLINE table are refreshed, the TimeBoundaryInfo is also updated.
+ */
@Test
- public void testTimeBoundaryUpdate()
- throws Exception {
- // This test verifies that when the segments of an offline table are refreshed, the TimeBoundaryInfo is also updated
- // to a newer timestamp.
- final long currentTimeBoundary = 10;
- TimeBoundaryService.TimeBoundaryInfo tbi = _helixBrokerStarter.getHelixExternalViewBasedRouting().
- getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
-
- Assert.assertEquals(tbi.getTimeValue(), Long.toString(currentTimeBoundary - 1));
-
- List<String> segmentNames = _helixResourceManager.getSegmentsFor(DINING_TABLE_NAME);
- long endTime = currentTimeBoundary + 10;
- // Refresh all 5 segments.
- for (String segment : segmentNames) {
- OfflineSegmentZKMetadata offlineSegmentZKMetadata =
- _helixResourceManager.getOfflineSegmentZKMetadata(RAW_DINING_TABLE_NAME, segment);
- Assert.assertNotNull(offlineSegmentZKMetadata);
- _helixResourceManager.refreshSegment(DINING_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_DINING_TABLE_NAME, segment, endTime++),
- offlineSegmentZKMetadata);
- }
- // Due to the asynchronous nature of the TimeboundaryInfo update and thread scheduling, the updated time boundary
- // may not always be the max endtime of segments. We do not expect such exact update either as long as the timestamp
- // is updated to some newer value.
- waitForPredicate(() -> {
- TimeBoundaryService.TimeBoundaryInfo timeBoundaryInfo = _helixBrokerStarter.getHelixExternalViewBasedRouting().
- getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
- return currentTimeBoundary < Long.parseLong(timeBoundaryInfo.getTimeValue());
- }, 5 * _brokerConf.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL));
- tbi = _helixBrokerStarter.getHelixExternalViewBasedRouting().
- getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
- Assert.assertTrue(currentTimeBoundary < Long.parseLong(tbi.getTimeValue()));
+ public void testTimeBoundaryUpdate() {
+ TimeBoundaryService timeBoundaryService = _brokerStarter.getHelixExternalViewBasedRouting().
+ getTimeBoundaryService();
+
+ // Time boundary should be 1 day smaller than the end time
+ int currentEndTime = 10;
+ TimeBoundaryInfo timeBoundaryInfo = timeBoundaryService.getTimeBoundaryInfoFor(OFFLINE_TABLE_NAME);
+ assertEquals(timeBoundaryInfo.getTimeValue(), Integer.toString(currentEndTime - 1));
+
+ // Refresh a segment with a new end time
+ String segmentToRefresh = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME).get(0);
+ int newEndTime = currentEndTime + 10;
+ OfflineSegmentZKMetadata segmentZKMetadata =
+ _helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME, segmentToRefresh);
+ _helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, segmentToRefresh, newEndTime),
+ segmentZKMetadata);
+
+ TestUtils.waitForCondition(aVoid -> timeBoundaryService.getTimeBoundaryInfoFor(OFFLINE_TABLE_NAME).getTimeValue()
+ .equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update the time boundary for refreshed segment");
}
- private void waitForPredicate(Callable<Boolean> predicate, long timeout) {
- long deadline = System.currentTimeMillis() + timeout;
- while (System.currentTimeMillis() < deadline) {
- try {
- if (predicate.call()) {
- return;
- }
- } catch (Exception e) {
- // Do nothing
- }
-
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- }
+ @AfterClass
+ public void tearDown() {
+ _brokerStarter.shutdown();
+ stopController();
+ stopZk();
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index bfe155b..cb06fc4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -69,7 +69,7 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
} catch (Exception e) {
throw new RuntimeException(e);
}
- }, 500L, 10_000L, "Expected three instances after creation of tagged instances");
+ }, 10_000L, "Expected three instances after creation of tagged instances");
// Create tagged broker and server instances
brokerInstance.put("tag", "someTag");
@@ -90,7 +90,7 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
} catch (Exception e) {
throw new RuntimeException(e);
}
- }, 500L, 10_000L, "Expected five instances after creation of tagged instances");
+ }, 10_000L, "Expected five instances after creation of tagged instances");
// Create duplicate broker and server instances (both calls should fail)
try {
diff --git a/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java b/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java
index 08d1de6..de17ff4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java
@@ -22,13 +22,9 @@ import com.google.common.base.Function;
import java.io.File;
import java.io.IOException;
import java.net.URL;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.response.broker.GroupByResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -40,10 +36,6 @@ import org.testng.Assert;
*/
public class TestUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(TestUtils.class);
- private static final int testThreshold = 1000;
-
- public static final double hllEstimationThreshold = 0.5;
- public static final double digestEstimationThreshold = 0.1;
public static String getFileFromResourceUrl(@Nonnull URL resourceUrl) {
// For maven cross package use case, we need to extract the resource from jar to a temporary directory.
@@ -65,60 +57,6 @@ public class TestUtils {
}
/**
- * assert estimation in error range
- * @param estimate
- * @param actual
- */
- public static void assertApproximation(double estimate, double actual, double precision) {
- estimate = Math.abs(estimate);
- actual = Math.abs(actual);
-
- if (estimate < testThreshold && actual < testThreshold) {
- double errorDiff = Math.abs(actual - estimate);
- LOGGER.debug("estimate: " + estimate + " actual: " + actual + " error (in difference): " + errorDiff);
- LOGGER.debug("small value comparison ignored!");
- //Assert.assertEquals(error < 3, true);
- } else {
- double errorRate = 1;
- if (actual > 0) {
- errorRate = Math.abs((actual - estimate) / actual);
- }
- LOGGER.debug("estimate: " + estimate + " actual: " + actual + " error (in rate): " + errorRate);
- Assert.assertTrue(errorRate < precision);
- }
- }
-
- public static void assertGroupByResultsApproximation(List<GroupByResult> estimateValues,
- List<GroupByResult> actualValues, double precision) {
- LOGGER.info("====== assertGroupByResultsApproximation ======");
- // estimation should not affect number of groups formed
- Assert.assertEquals(estimateValues.size(), actualValues.size());
-
- Map<List<String>, Double> mapEstimate = new HashMap<>();
- Map<List<String>, Double> mapActual = new HashMap<>();
- for (GroupByResult gby : estimateValues) {
- mapEstimate.put(gby.getGroup(), Double.parseDouble(gby.getValue().toString()));
- }
- for (GroupByResult gby : actualValues) {
- mapActual.put(gby.getGroup(), Double.parseDouble(gby.getValue().toString()));
- }
-
- LOGGER.info("estimate: " + mapEstimate.keySet());
- LOGGER.info("actual: " + mapActual.keySet());
-
- int cnt = 0;
- for (List<String> key : mapEstimate.keySet()) {
- // Not strictly enforced, since in quantile, top 100 groups from accurate maybe not be top 100 from estimate
- // Assert.assertEquals(mapActual.keySet().contains(key), true);
- if (mapActual.keySet().contains(key)) {
- assertApproximation(mapEstimate.get(key), mapActual.get(key), precision);
- cnt += 1;
- }
- }
- LOGGER.info("group overlap rate: " + (cnt + 0.0) / mapEstimate.keySet().size());
- }
-
- /**
* Ensure the given directories exist and are empty.
*
* @param dirs Directories to be cleared
@@ -159,6 +97,6 @@ public class TestUtils {
public static void waitForCondition(Function<Void, Boolean> condition, long timeoutMs,
@Nullable String errorMessage) {
- waitForCondition(condition, 1000L, timeoutMs, errorMessage);
+ waitForCondition(condition, 100L, timeoutMs, errorMessage);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org