You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2019/07/23 23:29:20 UTC

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #4446: Add support in the rebalancer for the user to provide minimum number of serving replicas

mcvsubbu commented on a change in pull request #4446: Add support in the rebalancer for the user to provide minimum number of serving replicas
URL: https://github.com/apache/incubator-pinot/pull/4446#discussion_r306562976
 
 

 ##########
 File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancerAdminToolClusterIntegrationTest.java
 ##########
 @@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.TagNameUtils;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.NetUtil;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.helix.core.TableRebalancer;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.server.realtime.ControllerLeaderLocator;
+import org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
+import org.apache.pinot.tools.PinotTableRebalancer;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Class to test {@link TableRebalancer} using {@link PinotTableRebalancer}
+ * as the entry point to test the end-to-end path from pinot-admin tool.
+ */
+public class TableRebalancerAdminToolClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  private static final String ZKSTR = ZkStarter.DEFAULT_ZK_STR;
+  private static final int NUM_INITIAL_SERVERS = 3;
+  private final List<HelixManager> _helixManagers = new ArrayList<>();
+  private final Set<String> servers = new HashSet<>();
+  private StateTransitionStats _stateTransitionStats;
+
+  @Override
+  protected boolean isUsingNewConfigFormat() {
+    return true;
+  }
+
+  @BeforeClass
+  public void setup()
+      throws Exception {
+    startZk();
+    startController();
+    startBroker();
+    startFakeServers(NUM_INITIAL_SERVERS, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT);
+    addOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, getInvertedIndexColumns(),
+        getBloomFilterIndexColumns(), getTaskConfig(), null, null);
+    completeTableConfiguration();
+  }
+
+  @AfterClass
+  public void tearDown() {
+    stopFakeServers();
+    stopBroker();
+    stopController();
+    stopZk();
+  }
+
+  private void startFakeServers(final int numServers, final int basePort)
+      throws Exception {
+    for (int i = 0; i < numServers; i++) {
+      final int nettyPort = basePort + i;
+      final String instanceId =
+          CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE + NetUtil.getHostAddress() + "_" + nettyPort;
+      if (servers.contains(instanceId)) {
+        continue;
+      }
+      servers.add(instanceId);
+      final HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(_clusterName, instanceId, InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
+      helixManager.getStateMachineEngine()
+          .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
+              new FakeServerSegmentStateModelFactory());
+      helixManager.connect();
+      helixManager.getClusterManagmentTool().addInstanceTag(_clusterName, instanceId,
+          TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
+      _helixManagers.add(helixManager);
+      ControllerLeaderLocator.create(helixManager);
+    }
+  }
+
+  private void stopFakeServers() {
+    for (HelixManager helixManager : _helixManagers) {
+      helixManager.disconnect();
+    }
+  }
+
+  /**
+   * Test in no-downtime mode when there are sufficient
+   * common hosts between current and target ideal states
+   * to keep up the min number of serving replicas. In this
+   * case, the algorithm in {@link TableRebalancer} that
+   * makes changes to ideal state to get to a target ideal
+   * state can do the transition in one go.
+   *
+   * Scenario:
+   *
+   * Current ideal state
+   *
+   * segment1: {host1:online, host2:online, host3:online}
+   * segment2: {host1:online, host2:online, host3:online}
+   *
+   * We add 2 additional hosts: host4 and host5
+   * The rebalancer will use the table config to get the
+   * appropriate rebalance strategy (default strategy in this case)
+   * and come up with the target ideal state
+   *
+   * segment1: {host1:online, host3:online, host5:online}
+   * segment2: {host1:online, host2:online, host4:online}
+   *
+   * The test specifies the min replicas to keep up as 2.
+   * With the above setup, the rebalancer can set to target
+   * partition state for each segment with a direct update
+   * (because of common hosts that still satisfy the min
+   * replica requirement). Since this happens once for each
+   * segment before we get to target ideal state, the
+   * number of direct transitions are 2.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPinotTableRebalancerWithDirectTransitions()
+      throws Exception {
+    _stateTransitionStats = new StateTransitionStats();
+
+    // write ideal state
+    createIdealState(2, NUM_INITIAL_SERVERS);
+
+    // add additional servers to trigger the rebalance strategy,
+    // otherwise rebalance will be a NO-OP
+    startFakeServers(2, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT + NUM_INITIAL_SERVERS);
+
+    // rebalance
+    final PinotTableRebalancer tableRebalancer = new PinotTableRebalancer(ZKSTR, _clusterName, false, true, false, 2);
+    tableRebalancer.rebalance(getTableName(), "OFFLINE");
+    final TableRebalancer.RebalancerStats stats = tableRebalancer.getRebalancerStats();
+
+    // check the algorithm stats on how we moved from current
+    // to target ideal state
+    Assert.assertEquals(0, stats.getDryRun());
+    Assert.assertEquals(2, stats.getDirectTransitions());
+    Assert.assertEquals(0, stats.getIncrementalTransitions());
+
+    // as part of rebalancing, host2 lost segment1 and host3
+    // lost segment2 -- so 2 transitions from ON to OFF and
+    // OFF to DROP
+    Assert.assertEquals(2, _stateTransitionStats.offFromOn);
+    Assert.assertEquals(2, _stateTransitionStats.dropFromOff);
+
+    // clear for next test to avoid mixing state transition stats
+    clearIdealState(2);
+  }
+
+  /**
+   * Test in no-downtime mode when there are not sufficient
+   * common hosts between current and target ideal states
+   * to keep up the min number of serving replicas. In this
+   * case, the algorithm in {@link TableRebalancer} that
+   * makes changes to ideal state to get to a target ideal
+   * state does the changes incrementally while satisfying
+   * the min replica requirement
+   *
+   * Scenario:
+   *
+   * Current ideal state
+   *
+   * segment1: {host1:online, host2:online, host3:online}
+   * segment2: {host1:online, host2:online, host3:online}
+   * segment3: {host1:online, host2:online, host3:online}
+   * segment4: {host1:online, host2:online, host3:online}
+   *
+   * We add 3 additional hosts: host4, host5 and host6
+   * The rebalancer will use the table config to get the
+   * appropriate rebalance strategy (default strategy in this case)
+   * and come up with the target ideal state
+   *
+   * segment1: {host1:online, host3:online, host5:online}
+   * segment2: {host2:online, host4:online, host6:online}
+   * segment3: {host1:online, host3:online, host5:online}
+   * segment4: {host2:online, host4:online, host6:online}
+   *
+   * The test specifies the min replicas to keep up as 2.
+   * With the above setup, the rebalancer can set to target
+   * partition state for each segment with a direct update
+   * (because of common hosts that still satisfy the min
+   * replica requirement) only for segments 1 and 3.
+   *
+   * so far direct transitions 2
+   *
+   * for segments 2 and 4, we have to do one incremental
+   * change each by removing a current server and adding
+   * a new server while keeping up with requirement of
+   * 2 min serving replicas
+   *
+   * so far incremental transitions 2
+   *
+   * Since we have looked at all segments once, the updated
+   * ideal state is persisted in ZK, we wait for external view
+   * to converge and next iteration of rebalancing begins by
+   * checking if we have reached target.
+   * We haven't so we go over each segment again.
+   *
+   * Now for each segment we can do direct update -- once for
+   * each of the 4 segments while still satisfying
+   * the min replica requirement
+   *
+   * So, direct transitions: 2+4 = 6, increment transitions = 2
+   * @throws Exception
+   */
+  @Test
+  public void testPinotTableRebalancerWithIncrementalTransitions()
+      throws Exception {
+    _stateTransitionStats = new StateTransitionStats();
+
+    // write ideal state
+    createIdealState(4, NUM_INITIAL_SERVERS);
+
+    // add additional servers to trigger the rebalance strategy,
+    // otherwise rebalance will be a NO-OP
+    startFakeServers(3, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT + NUM_INITIAL_SERVERS);
+
+    // rebalance
+    final PinotTableRebalancer tableRebalancer = new PinotTableRebalancer(ZKSTR, _clusterName, false, true, false, 2);
+    tableRebalancer.rebalance(getTableName(), "OFFLINE");
+    TableRebalancer.RebalancerStats stats = tableRebalancer.getRebalancerStats();
+
+    // verify the algorithm stats on how we moved from
+    // current to target ideal state
+    Assert.assertEquals(0, stats.getDryRun());
+    Assert.assertEquals(6, stats.getDirectTransitions());
+    Assert.assertEquals(2, stats.getIncrementalTransitions());
+
+    // as part of rebalancing, host2 lost segment1 and segment3,
+    // host1 and host3 lost segment2 and segment4 -- so 6
+    // transitions from ON to OFF and OFF to DROP
+    Assert.assertEquals(6, _stateTransitionStats.offFromOn);
+    Assert.assertEquals(6, _stateTransitionStats.dropFromOff);
+  }
+
+  private void clearIdealState(final int numSegments) throws Exception {
+    final HelixDataAccessor dataAccessor = _helixManagers.get(0).getHelixDataAccessor();
+    final String tableNameWithType = getTableName() + "_OFFLINE";
+    final PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(tableNameWithType);
+    final IdealState idealState = dataAccessor.getProperty(idealStateKey);
+    for (int i = 0; i < numSegments; i++) {
+      final String segmentID = "segment" + i;
+      if (idealState.getInstanceStateMap(segmentID) != null) {
+        idealState.getInstanceStateMap(segmentID).clear();
+      }
+    }
+    final ZkBaseDataAccessor zkBaseDataAccessor = (ZkBaseDataAccessor) dataAccessor.getBaseDataAccessor();
+    zkBaseDataAccessor.set(idealStateKey.getPath(), idealState.getRecord(), idealState.getRecord().getVersion(),
+        AccessOption.PERSISTENT);
+
+    // should be enough for the callbacks to come and go away
+    Thread.sleep(2000);
 
 Review comment:
   You can avoid the clearing of idealstate by just deleting the table. Much faster I think. Only thing is, you will need to use a new table for each test case, which is fine

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org