Posted to commits@pinot.apache.org by "shounakmk219 (via GitHub)" <gi...@apache.org> on 2023/07/18 11:49:29 UTC

[GitHub] [pinot] shounakmk219 opened a new pull request, #11128: Tenant rebalance and status tracking APIs

shounakmk219 opened a new pull request, #11128:
URL: https://github.com/apache/pinot/pull/11128

   ### Tenant Rebalance API
   
   This API addresses the task of rebalancing all the tables of a given tenant. When new servers are tagged to a tenant, it is tedious to rebalance each table under that tenant one by one so that it uses the newly added servers, and the operation becomes practically unmanageable if the tenant has a large number of tables. The proposed utility lets users rebalance a whole tenant and track its progress with minimal operational overhead.
   
   More details are captured in this doc : [Tenant Rebalance](https://docs.google.com/document/d/1s00wFOq2Ox9V8sOUhbWX2VjGkmb9s0IsRgGjpW_O6mE/edit?usp=sharing)
   
   Path : `/tenants/{tenantName}/rebalance`
   Method : `POST`
   Sample request body : 
   ```
   {
     "tenantName": "DefaultTenant",
     "degreeOfParallelism": 2,
     "parallelWhitelist": [
       "airlineStats_OFFLINE"
     ],
     "parallelBlacklist": [
       "airlineStats_REALTIME"
     ],
     "verboseResult": false,
     "dryRun": false,
     "downtime": false,
     "reassignInstances": false,
     "includeConsuming": false,
     "bootstrap": false,
     "minAvailableReplicas": 1,
     "bestEfforts": false,
     "updateTargetTier": false,
     "externalViewCheckIntervalInMs": 1000,
     "externalViewStabilizationTimeoutInMs": 3600000
   }
   ```
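For illustration, a request like the one above could be submitted from Java 11+ with `java.net.http.HttpClient`. This is a minimal sketch, not part of the PR: the controller address `http://localhost:9000` is an assumption (adjust it for your deployment), the class and method names are hypothetical, and the body sets `dryRun` to `true` so the first call only previews the plan. Fields omitted from the body are assumed to take their default values.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TenantRebalanceRequestExample {

  // Builds a POST /tenants/{tenantName}/rebalance request with a JSON body.
  static HttpRequest buildRebalanceRequest(String controllerBaseUrl, String tenantName, String jsonBody) {
    return HttpRequest.newBuilder()
        .uri(URI.create(controllerBaseUrl + "/tenants/" + tenantName + "/rebalance"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build();
  }

  // Sends the request and returns the raw JSON response (the jobId plus per-table results).
  static String send(HttpRequest request) throws IOException, InterruptedException {
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    return response.body();
  }

  public static void main(String[] args) {
    // Preview first: dryRun=true computes the target state without moving any segments.
    String body = "{\"tenantName\": \"DefaultTenant\", \"degreeOfParallelism\": 2, \"dryRun\": true}";
    HttpRequest request = buildRebalanceRequest("http://localhost:9000", "DefaultTenant", body);
    System.out.println(request.method() + " " + request.uri());
    // Against a running controller: String responseJson = send(request);
  }
}
```

Once the dry-run result looks right, the same call with `"dryRun": false` would start the actual rebalance.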
   Sample response :
   ```
   {
     "jobId": "dfbbebb7-1f62-497d-82a7-ded6e0d855e1",
     "rebalanceTableResults": {
       "airlineStats_OFFLINE": {
         "jobId": "2d4dc2da-1071-42b5-a20c-ac38a6d53fc4",
         "status": "NO_OP",
         "description": "Table is already balanced"
       },
       "airlineStats_REALTIME": {
         "jobId": "9284f137-29c1-4c5a-a113-17b90a484403",
         "status": "NO_OP",
         "description": "Table is already balanced"
       }
     }
   }
   ```
   
   ### Tenant Rebalance Status tracking API
   Path : `/tenants/rebalanceStatus/{jobId}`
   Method : `GET`
   Sample response body : 
   ```
   {
     "tenantRebalanceProgressStats": {
       "startTimeMs": 1689679866904,
       "timeToFinishInSeconds": 0,
       "completionStatusMsg": "Successfully rebalanced tenant DefaultTenant.",
       "tableStatusMap": {
         "airlineStats_OFFLINE": "Table is already balanced",
         "airlineStats_REALTIME": "Table is already balanced"
       },
       "totalTables": 2,
       "remainingTables": 0,
       "tableRebalanceJobIdMap": {
         "airlineStats_OFFLINE": "2d4dc2da-1071-42b5-a20c-ac38a6d53fc4",
         "airlineStats_REALTIME": "9284f137-29c1-4c5a-a113-17b90a484403"
       }
     },
     "timeElapsedSinceStartInSeconds": 0
   }
   ```
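A client could poll this endpoint until the job completes. The sketch below is hypothetical and assumes that `remainingTables` drops to 0 once every table has been processed, as in the sample response. It uses a regular expression only to stay dependency-free (a real client would parse the response with a JSON library such as Jackson), and `fetchStatus` stands in for a GET on `/tenants/rebalanceStatus/{jobId}`.

```java
import java.util.OptionalInt;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TenantRebalanceStatusExample {

  // Matches e.g. "remainingTables": 0 anywhere in the status response.
  private static final Pattern REMAINING_TABLES = Pattern.compile("\"remainingTables\"\\s*:\\s*(\\d+)");

  // Extracts remainingTables, or empty if the field is absent.
  static OptionalInt remainingTables(String statusJson) {
    Matcher matcher = REMAINING_TABLES.matcher(statusJson);
    return matcher.find() ? OptionalInt.of(Integer.parseInt(matcher.group(1))) : OptionalInt.empty();
  }

  // Treats the job as finished once no tables remain; a missing field counts as "not finished".
  static boolean isFinished(String statusJson) {
    OptionalInt remaining = remainingTables(statusJson);
    return remaining.isPresent() && remaining.getAsInt() == 0;
  }

  // Calls fetchStatus until the job finishes or maxAttempts is reached.
  // Returns true if the job finished within the allowed attempts.
  static boolean waitForCompletion(Supplier<String> fetchStatus, long intervalMs, int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      if (isFinished(fetchStatus.get())) {
        return true;
      }
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        // Preserve the interrupt for the caller and stop polling.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    String sample = "{\"tenantRebalanceProgressStats\": {\"totalTables\": 2, \"remainingTables\": 0}}";
    System.out.println(isFinished(sample)); // true
  }
}
```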
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on pull request #11128: Tenant rebalance and status tracking APIs

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on PR #11128:
URL: https://github.com/apache/pinot/pull/11128#issuecomment-1643661309

   also adding @swaminathanmanish for review 


[GitHub] [pinot] shounakmk219 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "shounakmk219 (via GitHub)" <gi...@apache.org>.
shounakmk219 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1276125063


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -103,6 +112,9 @@ public class PinotTenantRestletResource {
   @Inject
   ControllerMetrics _controllerMetrics;
 
+  @Inject
+  ExecutorService _executorService;

Review Comment:
   Makes sense. I also referred to the single table rebalance implementation, but I agree with your point that it will affect the shared executorService in cases where the tenant has a lot of tables and the user runs the rebalance with high parallelism.
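One way to act on this, sketched under assumptions: give each tenant rebalance job its own fixed-size pool, sized by `degreeOfParallelism`, instead of the injected shared `ExecutorService`. The class and method names are hypothetical, and `rebalanceTable` stands in for the per-table rebalance call.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DedicatedRebalanceExecutorExample {

  // Runs one task per table on a pool owned by this job, so a large degreeOfParallelism
  // cannot tie up the controller's shared ExecutorService.
  // Returns true if all tasks finished within the timeout.
  static boolean rebalanceOnDedicatedPool(List<String> tables, int degreeOfParallelism,
      Consumer<String> rebalanceTable, long timeoutSeconds) {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(degreeOfParallelism, 1));
    for (String table : tables) {
      // Exceptions thrown by rebalanceTable are captured in the discarded Future; a real
      // implementation would record them as per-table failures.
      pool.submit(() -> rebalanceTable.accept(table));
    }
    // Stop accepting new work; tasks already submitted keep running.
    pool.shutdown();
    try {
      return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      pool.shutdownNow();
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    Set<String> done = ConcurrentHashMap.newKeySet();
    boolean finished = rebalanceOnDedicatedPool(
        List.of("airlineStats_OFFLINE", "airlineStats_REALTIME"), 2, done::add, 30);
    System.out.println(finished + " " + done.size()); // true 2
  }
}
```

This sketch blocks until the pool drains. Since the tenant rebalance API returns a job ID and reports progress asynchronously, a real version would more likely await termination on a background thread.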



[GitHub] [pinot] saurabhd336 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1269243313


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultTenantRebalancer implements TenantRebalancer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  ExecutorService _executorService;
+
+  public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  @Override
+  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+    Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
+    Set<String> tables = getTenantTables(context.getTenantName());
+    tables.forEach(table -> {
+      try {
+        Configuration config = extractRebalanceConfig(context);
+        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false));

Review Comment:
   Why are we rebalancing twice in this function?



[GitHub] [pinot] snleee commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "snleee (via GitHub)" <gi...@apache.org>.
snleee commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1285399188


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java:
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+@ApiModel
+public class RebalanceContext {
+  @JsonProperty("dryRun")
+  @ApiModelProperty(example = "false")
+  private Boolean _dryRun = false;

Review Comment:
   Should the default be `dryRun=true` as a safeguard? What's the current behavior of the existing rebalance API?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java:
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+@ApiModel
+public class RebalanceContext {
+  @JsonProperty("dryRun")
+  @ApiModelProperty(example = "false")
+  private Boolean _dryRun = false;
+  @JsonProperty("reassignInstances")

Review Comment:
   Are the default values for these configs the same as in the existing API? If so, let's add a `TODO` comment to see if we can simplify the configs.
   
   Ideally, these configurations should be picked up intelligently wherever possible. For instance, `reassignInstances` should be triggered when we detect the need (i.e. there is an extra server in the tenant that is not part of the instance assignment, or there is a server in the instance assignment that is no longer part of the tenant).
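The detection described here amounts to comparing two sets of servers in both directions. A minimal sketch, with hypothetical method names; obtaining the two sets (the tenant's tagged servers and the table's current instance assignment) is left out.

```java
import java.util.Set;
import java.util.TreeSet;

public class ReassignInstancesCheckExample {

  // Servers present in `from` but missing from `other`, sorted for readable logs.
  static Set<String> missingFrom(Set<String> from, Set<String> other) {
    Set<String> diff = new TreeSet<>(from);
    diff.removeAll(other);
    return diff;
  }

  // Reassignment is needed if the tenant has servers the assignment does not use,
  // or the assignment still references servers no longer tagged to the tenant.
  static boolean needsInstanceReassignment(Set<String> tenantServers, Set<String> assignedServers) {
    Set<String> unusedTenantServers = missingFrom(tenantServers, assignedServers);
    Set<String> staleAssignedServers = missingFrom(assignedServers, tenantServers);
    return !unusedTenantServers.isEmpty() || !staleAssignedServers.isEmpty();
  }

  public static void main(String[] args) {
    // Hypothetical server names: Server_3 was just tagged to the tenant.
    Set<String> tenant = Set.of("Server_1", "Server_2", "Server_3");
    Set<String> assigned = Set.of("Server_1", "Server_2");
    System.out.println(missingFrom(tenant, assigned));               // [Server_3]
    System.out.println(needsInstanceReassignment(tenant, assigned)); // true
  }
}
```

For plain sets the boolean result is equivalent to `!tenantServers.equals(assignedServers)`; computing the two differences separately also tells the caller which servers triggered the decision.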



[GitHub] [pinot] saurabhd336 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1274817803


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultTenantRebalancer implements TenantRebalancer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  ExecutorService _executorService;
+
+  public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  @Override
+  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+    Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
+    Set<String> tables = getTenantTables(context.getTenantName());
+    tables.forEach(table -> {
+      try {
+        Configuration config = extractRebalanceConfig(context);
+        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false));
+      } catch (TableNotFoundException exception) {
+        rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null));
+      }
+    });
+    if (context.isDryRun() || context.isDowntime()) {
+      return new TenantRebalanceResult(null, rebalanceResult, context.isVerboseResult());
+    } else {
+      for (String table : rebalanceResult.keySet()) {
+        RebalanceResult result = rebalanceResult.get(table);
+        if (result.getStatus() == RebalanceResult.Status.DONE) {
+          rebalanceResult.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS,
+              "In progress, check controller task status for the", result.getInstanceAssignment(),
+              result.getTierInstanceAssignment(), result.getSegmentAssignment()));
+        }
+      }
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+    Set<String> parallelTables;
+    Set<String> sequentialTables;
+    final ConcurrentLinkedQueue<String> parallelQueue;
+    // ensure atleast 1 thread is created to run the sequential table rebalance operations
+    int parallelism = Math.max(context.getDegreeOfParallelism(), 1);
+    AtomicInteger activeThreads = new AtomicInteger(parallelism);
+    try {
+      if (parallelism > 1) {
+        if (!context.getParallelWhitelist().isEmpty()) {
+          parallelTables = new HashSet<>(context.getParallelWhitelist());
+        } else {
+          parallelTables = new HashSet<>(tables);
+        }
+        if (!context.getParallelBlacklist().isEmpty()) {
+          parallelTables = Sets.difference(parallelTables, context.getParallelBlacklist());
+        }
+        parallelQueue = new ConcurrentLinkedQueue<>(parallelTables);
+        sequentialTables = Sets.difference(tables, parallelTables);
+      } else {
+        sequentialTables = new HashSet<>(tables);
+        parallelQueue = new ConcurrentLinkedQueue<>();
+      }
+      for (int i = 0; i < parallelism; i++) {
+        _executorService.submit(() -> {
+          try {
+            while (!parallelQueue.isEmpty()) {
+              String table = parallelQueue.remove();

Review Comment:
   This is not atomic. `table` can be null after line 108.
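A common fix is to replace the `isEmpty()`/`remove()` pair with a single `poll()`, which atomically removes the head of the queue or returns `null` when it is empty. With `remove()` as written, losing the race between the two calls surfaces as a `NoSuchElementException`. The sketch below is illustrative; `drain` and `drainInParallel` are hypothetical names, not code from the PR.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AtomicQueueDrainExample {

  // Each worker takes tables until poll() returns null. Checking and removing happen in one
  // atomic call, so no other worker can empty the queue in between.
  static void drain(ConcurrentLinkedQueue<String> queue, Consumer<String> rebalanceTable) {
    String table;
    while ((table = queue.poll()) != null) {
      rebalanceTable.accept(table);
    }
  }

  // Runs `workers` drainers over one shared queue and counts how often each table was handled.
  static Map<String, Integer> drainInParallel(List<String> tables, int workers) {
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(tables);
    Map<String, Integer> counts = new ConcurrentHashMap<>();
    ExecutorService pool = Executors.newFixedThreadPool(workers);
    for (int i = 0; i < workers; i++) {
      pool.submit(() -> drain(queue, t -> counts.merge(t, 1, Integer::sum)));
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return counts;
  }

  public static void main(String[] args) {
    // Every table is handled exactly once, however the workers interleave.
    System.out.println(drainInParallel(List.of("t1", "t2", "t3", "t4", "t5"), 3).values()); // [1, 1, 1, 1, 1]
  }
}
```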



[GitHub] [pinot] shounakmk219 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "shounakmk219 (via GitHub)" <gi...@apache.org>.
shounakmk219 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1269556890


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultTenantRebalancer implements TenantRebalancer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  ExecutorService _executorService;
+
+  public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  @Override
+  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+    Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
+    Set<String> tables = getTenantTables(context.getTenantName());
+    tables.forEach(table -> {
+      try {
+        Configuration config = extractRebalanceConfig(context);
+        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false));

Review Comment:
   I am trying to keep the behaviour equivalent to the table rebalance endpoint. The first rebalance runs in DRY_RUN mode, so the table is not actually rebalanced, but we get the target state the user can expect once the async operations complete. The subsequent rebalance calls do the actual work.
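The two-phase flow described here can be sketched as follows. This is a simplified, synchronous illustration under stated assumptions: `Status` is a stand-in for `RebalanceResult.Status`, `rebalanceTable` stands in for the per-table rebalance call with the dry-run flag passed as a parameter, and skipping the second phase for tables whose dry run did not return `DONE` is a choice made in this sketch, not necessarily what the PR does.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class DryRunThenExecuteExample {

  // Stand-in for RebalanceResult.Status.
  enum Status { NO_OP, DONE, IN_PROGRESS, FAILED }

  static Map<String, Status> rebalanceTenant(List<String> tables, boolean dryRun,
      BiFunction<String, Boolean, Status> rebalanceTable) {
    Map<String, Status> results = new LinkedHashMap<>();
    // Phase 1: dry run for every table to compute the expected outcome without moving segments.
    for (String table : tables) {
      results.put(table, rebalanceTable.apply(table, true));
    }
    if (dryRun) {
      return results;
    }
    // Phase 2: real rebalance for tables whose dry run reported work to do.
    // (In the PR this phase runs asynchronously on worker threads.)
    for (Map.Entry<String, Status> entry : results.entrySet()) {
      if (entry.getValue() == Status.DONE) {
        entry.setValue(rebalanceTable.apply(entry.getKey(), false));
      }
    }
    return results;
  }

  public static void main(String[] args) {
    // Fake per-table rebalance: airlineStats_OFFLINE is already balanced.
    BiFunction<String, Boolean, Status> fake = (table, dry) -> {
      System.out.println((dry ? "dry run: " : "rebalance: ") + table);
      return table.equals("airlineStats_OFFLINE") ? Status.NO_OP : Status.DONE;
    };
    System.out.println(rebalanceTenant(List.of("airlineStats_OFFLINE", "airlineStats_REALTIME"), false, fake));
  }
}
```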



[GitHub] [pinot] shounakmk219 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "shounakmk219 (via GitHub)" <gi...@apache.org>.
shounakmk219 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1285724684


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java:
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+@ApiModel
+public class RebalanceContext {
+  @JsonProperty("dryRun")
+  @ApiModelProperty(example = "false")
+  private Boolean _dryRun = false;
+  @JsonProperty("reassignInstances")

Review Comment:
   Yes, I am referring to the existing table rebalance API configs and their default values.
   Great point on auto-populating the configs wherever possible! To keep the scope of the current PR limited, I can take this up as a follow-up item. Will add a `TODO`.
   These config enhancements should be applied to table rebalance and tenant rebalance at the same time so that we maintain consistency.



[GitHub] [pinot] shounakmk219 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "shounakmk219 (via GitHub)" <gi...@apache.org>.
shounakmk219 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1287412875


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+@ApiModel
+public class RebalanceContext {
+  // TODO : simplify the rebalance configs wherever possible
+  @JsonProperty("dryRun")
+  @ApiModelProperty(example = "false")
+  private Boolean _dryRun = false;
+  @JsonProperty("reassignInstances")
+  @ApiModelProperty(example = "false")
+  private Boolean _reassignInstances = false;
+  @JsonProperty("includeConsuming")
+  @ApiModelProperty(example = "false")
+  private Boolean _includeConsuming = false;

Review Comment:
   make it true by default



[GitHub] [pinot] shounakmk219 closed pull request #11128: Tenant rebalance and status tracking APIs

Posted by "shounakmk219 (via GitHub)" <gi...@apache.org>.
shounakmk219 closed pull request #11128: Tenant rebalance and status tracking APIs
URL: https://github.com/apache/pinot/pull/11128


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1269254465


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultTenantRebalancer implements TenantRebalancer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  ExecutorService _executorService;
+
+  public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  @Override
+  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+    Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
+    Set<String> tables = getTenantTables(context.getTenantName());
+    tables.forEach(table -> {
+      try {
+        Configuration config = extractRebalanceConfig(context);
+        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false));
+      } catch (TableNotFoundException exception) {
+        rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null));
+      }
+    });
+    if (context.isDryRun() || context.isDowntime()) {
+      return new TenantRebalanceResult(null, rebalanceResult, context.isVerboseResult());
+    } else {
+      for (String table : rebalanceResult.keySet()) {
+        RebalanceResult result = rebalanceResult.get(table);
+        if (result.getStatus() == RebalanceResult.Status.DONE) {
+          rebalanceResult.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS,
+              "In progress, check controller task status for the", result.getInstanceAssignment(),
+              result.getTierInstanceAssignment(), result.getSegmentAssignment()));
+        }
+      }
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+    Set<String> parallelTables;
+    Set<String> sequentialTables;
+    final ConcurrentLinkedQueue<String> parallelQueue;
+    try {
+      if (context.getDegreeOfParallelism() > 1) {
+        if (!context.getParallelWhitelist().isEmpty()) {
+          parallelTables = new HashSet<>(context.getParallelWhitelist());
+        } else {
+          parallelTables = new HashSet<>(tables);
+        }
+        if (!context.getParallelBlacklist().isEmpty()) {
+          parallelTables = Sets.difference(parallelTables, context.getParallelBlacklist());
+        }
+        parallelQueue = new ConcurrentLinkedQueue<>(parallelTables);
+        for (int i = 0; i < context.getDegreeOfParallelism(); i++) {
+          _executorService.submit(() -> {
+            try {
+              while (true) {
+                String table = parallelQueue.remove();
+                Configuration config = extractRebalanceConfig(context);
+                config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+                config.setProperty(RebalanceConfigConstants.JOB_ID, rebalanceResult.get(table).getJobId());
+                rebalanceTable(table, config, observer);
+              }
+            } catch (NoSuchElementException ignore) {
+            }
+          });
+        }
+        sequentialTables = Sets.difference(tables, parallelTables);
+      } else {
+        sequentialTables = new HashSet<>(tables);
+        parallelQueue = new ConcurrentLinkedQueue<>();
+      }
+      _executorService.submit(() -> {
+        while (!parallelQueue.isEmpty()) {
+          try {
+            Thread.sleep(5000);

Review Comment:
   This will hog a thread from the shared executorService. We should yield the thread to other tasks while waiting for the condition.
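A minimal sketch of one non-blocking alternative, assuming Java's `CompletableFuture` is acceptable here. Instead of a task that sleeps until `parallelQueue` is empty, the sequential tables are chained onto the completion of the parallel workers, so no thread is parked while waiting. The class and method names are hypothetical, and `Consumer<String> rebalanceTable` stands in for the PR's `rebalanceTable(table, config, observer)`. This differs from the fix the author describes later in the thread, which hands the sequential tables to the last active parallel worker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class NonBlockingRebalanceSketch {
  // Hypothetical helper: starts `parallelism` workers that drain the parallel queue,
  // then schedules the sequential tables as a continuation of all workers.
  static CompletableFuture<Void> rebalanceAll(List<String> parallelTables, List<String> sequentialTables,
      int parallelism, ExecutorService executor, Consumer<String> rebalanceTable) {
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(parallelTables);
    List<CompletableFuture<Void>> workers = new ArrayList<>();
    for (int i = 0; i < parallelism; i++) {
      workers.add(CompletableFuture.runAsync(() -> {
        String table;
        while ((table = queue.poll()) != null) {
          rebalanceTable.accept(table);
        }
      }, executor));
    }
    // allOf(...) completes when every worker has finished; thenRunAsync submits the
    // sequential work only at that point, so no thread sleeps or polls in the meantime.
    return CompletableFuture.allOf(workers.toArray(new CompletableFuture<?>[0]))
        .thenRunAsync(() -> sequentialTables.forEach(rebalanceTable), executor);
  }

  // Usage example: three parallel tables on two workers, then two sequential tables.
  static List<String> demo() {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    List<String> order = new CopyOnWriteArrayList<>();
    try {
      rebalanceAll(List.of("a", "b", "c"), List.of("s1", "s2"), 2, executor, order::add).join();
    } finally {
      executor.shutdown();
    }
    return order;
  }
}
```

If any worker throws, `allOf` completes exceptionally and the sequential continuation is skipped, so per-table failures would likely need to be caught inside the worker loop.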



[GitHub] [pinot] saurabhd336 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1269249674


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultTenantRebalancer implements TenantRebalancer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  ExecutorService _executorService;
+
+  public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  @Override
+  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+    Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
+    Set<String> tables = getTenantTables(context.getTenantName());
+    tables.forEach(table -> {
+      try {
+        Configuration config = extractRebalanceConfig(context);
+        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false));
+      } catch (TableNotFoundException exception) {
+        rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null));
+      }
+    });
+    if (context.isDryRun() || context.isDowntime()) {
+      return new TenantRebalanceResult(null, rebalanceResult, context.isVerboseResult());
+    } else {
+      for (String table : rebalanceResult.keySet()) {
+        RebalanceResult result = rebalanceResult.get(table);
+        if (result.getStatus() == RebalanceResult.Status.DONE) {
+          rebalanceResult.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS,
+              "In progress, check controller task status for the", result.getInstanceAssignment(),
+              result.getTierInstanceAssignment(), result.getSegmentAssignment()));
+        }
+      }
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+    Set<String> parallelTables;
+    Set<String> sequentialTables;
+    final ConcurrentLinkedQueue<String> parallelQueue;
+    try {
+      if (context.getDegreeOfParallelism() > 1) {
+        if (!context.getParallelWhitelist().isEmpty()) {
+          parallelTables = new HashSet<>(context.getParallelWhitelist());
+        } else {
+          parallelTables = new HashSet<>(tables);
+        }
+        if (!context.getParallelBlacklist().isEmpty()) {
+          parallelTables = Sets.difference(parallelTables, context.getParallelBlacklist());
+        }
+        parallelQueue = new ConcurrentLinkedQueue<>(parallelTables);
+        for (int i = 0; i < context.getDegreeOfParallelism(); i++) {
+          _executorService.submit(() -> {
+            try {
+              while (true) {
+                String table = parallelQueue.remove();
+                Configuration config = extractRebalanceConfig(context);
+                config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+                config.setProperty(RebalanceConfigConstants.JOB_ID, rebalanceResult.get(table).getJobId());
+                rebalanceTable(table, config, observer);
+              }
+            } catch (NoSuchElementException ignore) {

Review Comment:
   We should break out of the while loop explicitly rather than relying on an exception to exit it.
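A minimal sketch of the suggested change, assuming the workers keep draining a `ConcurrentLinkedQueue`. `poll()` returns `null` when the queue is empty (whereas `remove()` throws `NoSuchElementException`), so the loop can exit with an explicit `break`. `QueueDrainSketch` is a hypothetical name, and adding to a list stands in for the actual per-table rebalance call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class QueueDrainSketch {
  // Drains the queue with poll(), which returns null once the queue is empty,
  // instead of looping on remove() and catching NoSuchElementException.
  static List<String> drain(ConcurrentLinkedQueue<String> queue) {
    List<String> processed = new ArrayList<>();
    while (true) {
      String table = queue.poll();
      if (table == null) {
        break; // explicit exit: no more tables to rebalance
      }
      processed.add(table); // stand-in for rebalanceTable(table, config, observer)
    }
    return processed;
  }
}
```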



[GitHub] [pinot] saurabhd336 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1274842035


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -103,6 +112,9 @@ public class PinotTenantRestletResource {
   @Inject
   ControllerMetrics _controllerMetrics;
 
+  @Inject
+  ExecutorService _executorService;

Review Comment:
   I'm not sure that using the same shared executorService (which is also used to handle multi-get API call responses) to run multiple blocking operations like table rebalance in parallel is a good idea, although I do see the same executorService being used to run the rebalance for a single table.
   Thoughts?
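One possible direction, sketched under assumptions: give tenant rebalance its own bounded pool so long-running, blocking rebalances cannot starve the shared executor that serves API requests. The factory name, thread-name prefix, and pool size are hypothetical, and how such a pool would be registered for `@Inject` in the controller is not shown.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RebalanceExecutorSketch {
  // Hypothetical factory: a fixed-size pool reserved for long-running rebalance work,
  // kept separate from the shared executor used by API handlers.
  static ExecutorService newRebalanceExecutor(int maxThreads) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      // Named threads make rebalance work easy to spot in thread dumps.
      Thread thread = new Thread(runnable, "tenant-rebalance-" + counter.incrementAndGet());
      thread.setDaemon(true); // do not keep the JVM alive on shutdown
      return thread;
    };
    // core == max, so at most maxThreads rebalances run at once; extra tasks wait in the queue.
    return new ThreadPoolExecutor(maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(), factory);
  }
}
```

Because the queue is unbounded, extra rebalance tasks wait in line rather than being rejected; a bounded queue with a rejection policy would be an alternative if callers should get back-pressure.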



[GitHub] [pinot] snleee merged pull request #11128: Tenant rebalance and status tracking APIs

Posted by "snleee (via GitHub)" <gi...@apache.org>.
snleee merged PR #11128:
URL: https://github.com/apache/pinot/pull/11128


[GitHub] [pinot] shounakmk219 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "shounakmk219 (via GitHub)" <gi...@apache.org>.
shounakmk219 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1287412875


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+@ApiModel
+public class RebalanceContext {
+  // TODO : simplify the rebalance configs wherever possible
+  @JsonProperty("dryRun")
+  @ApiModelProperty(example = "false")
+  private Boolean _dryRun = false;
+  @JsonProperty("reassignInstances")
+  @ApiModelProperty(example = "false")
+  private Boolean _reassignInstances = false;
+  @JsonProperty("includeConsuming")
+  @ApiModelProperty(example = "false")
+  private Boolean _includeConsuming = false;

Review Comment:
   Make it true by default.



[GitHub] [pinot] shounakmk219 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "shounakmk219 (via GitHub)" <gi...@apache.org>.
shounakmk219 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1285714926


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java:
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+@ApiModel
+public class RebalanceContext {
+  @JsonProperty("dryRun")
+  @ApiModelProperty(example = "false")
+  private Boolean _dryRun = false;

Review Comment:
   On the table rebalance API it's false by default. I am keeping all the configs and their defaults in line with the table rebalance API so that users get consistent behaviour across similar APIs. But I agree with your point, since a tenant rebalance operation has a bigger impact radius. For now I am inclined to keep it consistent with the table rebalance API, but let me know if you think otherwise and we can make `dryRun=true` by default.



[GitHub] [pinot] shounakmk219 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "shounakmk219 (via GitHub)" <gi...@apache.org>.
shounakmk219 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1274574539


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultTenantRebalancer implements TenantRebalancer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  ExecutorService _executorService;
+
+  public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  @Override
+  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+    Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
+    Set<String> tables = getTenantTables(context.getTenantName());
+    tables.forEach(table -> {
+      try {
+        Configuration config = extractRebalanceConfig(context);
+        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false));
+      } catch (TableNotFoundException exception) {
+        rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null));
+      }
+    });
+    if (context.isDryRun() || context.isDowntime()) {
+      return new TenantRebalanceResult(null, rebalanceResult, context.isVerboseResult());
+    } else {
+      for (String table : rebalanceResult.keySet()) {
+        RebalanceResult result = rebalanceResult.get(table);
+        if (result.getStatus() == RebalanceResult.Status.DONE) {
+          rebalanceResult.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS,
+              "In progress, check controller task status for the", result.getInstanceAssignment(),
+              result.getTierInstanceAssignment(), result.getSegmentAssignment()));
+        }
+      }
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+    Set<String> parallelTables;
+    Set<String> sequentialTables;
+    final ConcurrentLinkedQueue<String> parallelQueue;
+    try {
+      if (context.getDegreeOfParallelism() > 1) {
+        if (!context.getParallelWhitelist().isEmpty()) {
+          parallelTables = new HashSet<>(context.getParallelWhitelist());
+        } else {
+          parallelTables = new HashSet<>(tables);
+        }
+        if (!context.getParallelBlacklist().isEmpty()) {
+          parallelTables = Sets.difference(parallelTables, context.getParallelBlacklist());
+        }
+        parallelQueue = new ConcurrentLinkedQueue<>(parallelTables);
+        for (int i = 0; i < context.getDegreeOfParallelism(); i++) {
+          _executorService.submit(() -> {
+            try {
+              while (true) {
+                String table = parallelQueue.remove();
+                Configuration config = extractRebalanceConfig(context);
+                config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+                config.setProperty(RebalanceConfigConstants.JOB_ID, rebalanceResult.get(table).getJobId());
+                rebalanceTable(table, config, observer);
+              }
+            } catch (NoSuchElementException ignore) {
+            }
+          });
+        }
+        sequentialTables = Sets.difference(tables, parallelTables);
+      } else {
+        sequentialTables = new HashSet<>(tables);
+        parallelQueue = new ConcurrentLinkedQueue<>();
+      }
+      _executorService.submit(() -> {
+        while (!parallelQueue.isEmpty()) {
+          try {
+            Thread.sleep(5000);

Review Comment:
   Hey @saurabhd336, thanks for pointing this out. I have made changes to avoid the thread hogging: now the last active thread among the parallel threads picks up the job of rebalancing the sequential tables.
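A minimal sketch of the scheme described in the reply, assuming an `AtomicInteger` tracks how many parallel workers are still running. Each worker drains the shared queue, and the one whose decrement brings the count to zero runs the sequential tables, so no separate thread sleeps while waiting. The names are hypothetical, `Consumer<String>` stands in for the per-table rebalance, and this is not the code from the PR.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class LastWorkerSketch {
  // Hypothetical helper: `parallelism` workers drain the parallel queue; whichever
  // worker exits last (brings the counter to zero) rebalances the sequential tables.
  static void rebalance(List<String> parallelTables, List<String> sequentialTables, int parallelism,
      ExecutorService executor, Consumer<String> rebalanceTable) {
    if (parallelism < 1) {
      throw new IllegalArgumentException("parallelism must be at least 1");
    }
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(parallelTables);
    AtomicInteger activeWorkers = new AtomicInteger(parallelism);
    for (int i = 0; i < parallelism; i++) {
      executor.submit(() -> {
        try {
          String table;
          while ((table = queue.poll()) != null) {
            rebalanceTable.accept(table);
          }
        } finally {
          // Only the last worker to exit sees zero, so the sequential tables run exactly once.
          if (activeWorkers.decrementAndGet() == 0) {
            sequentialTables.forEach(rebalanceTable);
          }
        }
      });
    }
  }

  // Usage example: four parallel tables on three workers, then two sequential tables.
  static List<String> demo() {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    List<String> order = new CopyOnWriteArrayList<>();
    rebalance(List.of("a", "b", "c", "d"), List.of("s1", "s2"), 3, executor, order::add);
    executor.shutdown(); // already-submitted workers still run to completion
    try {
      executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return order;
  }
}
```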



[GitHub] [pinot] codecov-commenter commented on pull request #11128: Tenant rebalance and status tracking APIs

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #11128:
URL: https://github.com/apache/pinot/pull/11128#issuecomment-1640116971

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#11128](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (b695686) into [master](https://app.codecov.io/gh/apache/pinot/commit/f7a076496ae6e07a42207cb2c978dc74562cf0cf?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (f7a0764) will **increase** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@            Coverage Diff            @@
   ##           master   #11128     +/-   ##
   =========================================
     Coverage    0.11%    0.11%             
   =========================================
     Files        2200     2156     -44     
     Lines      118829   115977   -2852     
     Branches    18023    17596    -427     
   =========================================
     Hits          137      137             
   + Misses     118672   115820   -2852     
     Partials       20       20             
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1temurin11 | `?` | |
   | integration1temurin17 | `?` | |
   | integration1temurin20 | `?` | |
   | integration2temurin17 | `?` | |
   | integration2temurin20 | `?` | |
   | unittests1temurin11 | `?` | |
   | unittests1temurin17 | `?` | |
   | unittests1temurin20 | `?` | |
   | unittests2temurin11 | `?` | |
   | unittests2temurin17 | `0.11% <0.00%> (+<0.01%)` | :arrow_up: |
   | unittests2temurin20 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...mmon/metadata/controllerjob/ControllerJobType.java](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0YWRhdGEvY29udHJvbGxlcmpvYi9Db250cm9sbGVySm9iVHlwZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ller/api/resources/PinotTenantRestletResource.java](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1Bpbm90VGVuYW50UmVzdGxldFJlc291cmNlLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...pi/resources/TenantRebalanceJobStatusResponse.java](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1RlbmFudFJlYmFsYW5jZUpvYlN0YXR1c1Jlc3BvbnNlLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...troller/helix/core/rebalance/RebalanceContext.java](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlYmFsYW5jZS9SZWJhbGFuY2VDb250ZXh0LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...ntroller/helix/core/rebalance/RebalanceResult.java](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlYmFsYW5jZS9SZWJhbGFuY2VSZXN1bHQuamF2YQ==) | `0.00% <ø> (ø)` | |
   | [...core/rebalance/tenant/DefaultTenantRebalancer.java](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlYmFsYW5jZS90ZW5hbnQvRGVmYXVsdFRlbmFudFJlYmFsYW5jZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [.../core/rebalance/tenant/TenantRebalanceContext.java](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlYmFsYW5jZS90ZW5hbnQvVGVuYW50UmViYWxhbmNlQ29udGV4dC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...core/rebalance/tenant/TenantRebalanceObserver.java](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlYmFsYW5jZS90ZW5hbnQvVGVuYW50UmViYWxhbmNlT2JzZXJ2ZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...rebalance/tenant/TenantRebalanceProgressStats.java](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlYmFsYW5jZS90ZW5hbnQvVGVuYW50UmViYWxhbmNlUHJvZ3Jlc3NTdGF0cy5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...x/core/rebalance/tenant/TenantRebalanceResult.java](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlYmFsYW5jZS90ZW5hbnQvVGVuYW50UmViYWxhbmNlUmVzdWx0LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | ... and [2 more](https://app.codecov.io/gh/apache/pinot/pull/11128?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | |
   
   ... and [114 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/11128/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] shounakmk219 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "shounakmk219 (via GitHub)" <gi...@apache.org>.
shounakmk219 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1269557502


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultTenantRebalancer implements TenantRebalancer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  ExecutorService _executorService;
+
+  public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  @Override
+  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+    Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
+    Set<String> tables = getTenantTables(context.getTenantName());
+    tables.forEach(table -> {
+      try {
+        Configuration config = extractRebalanceConfig(context);
+        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false));
+      } catch (TableNotFoundException exception) {
+        rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null));
+      }
+    });
+    if (context.isDryRun() || context.isDowntime()) {
+      return new TenantRebalanceResult(null, rebalanceResult, context.isVerboseResult());
+    } else {
+      for (String table : rebalanceResult.keySet()) {
+        RebalanceResult result = rebalanceResult.get(table);
+        if (result.getStatus() == RebalanceResult.Status.DONE) {
+          rebalanceResult.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS,
+              "In progress, check controller task status for the", result.getInstanceAssignment(),
+              result.getTierInstanceAssignment(), result.getSegmentAssignment()));
+        }
+      }
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+    Set<String> parallelTables;
+    Set<String> sequentialTables;
+    final ConcurrentLinkedQueue<String> parallelQueue;
+    try {
+      if (context.getDegreeOfParallelism() > 1) {
+        if (!context.getParallelWhitelist().isEmpty()) {
+          parallelTables = new HashSet<>(context.getParallelWhitelist());
+        } else {
+          parallelTables = new HashSet<>(tables);
+        }
+        if (!context.getParallelBlacklist().isEmpty()) {
+          parallelTables = Sets.difference(parallelTables, context.getParallelBlacklist());
+        }
+        parallelQueue = new ConcurrentLinkedQueue<>(parallelTables);
+        for (int i = 0; i < context.getDegreeOfParallelism(); i++) {
+          _executorService.submit(() -> {
+            try {
+              while (true) {
+                String table = parallelQueue.remove();
+                Configuration config = extractRebalanceConfig(context);
+                config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+                config.setProperty(RebalanceConfigConstants.JOB_ID, rebalanceResult.get(table).getJobId());
+                rebalanceTable(table, config, observer);
+              }
+            } catch (NoSuchElementException ignore) {

Review Comment:
   Will change it
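   For readers following the whitelist/blacklist logic in the hunk above, here is a minimal sketch of how a tenant's tables could be split into a parallel set and a sequential set using only `java.util`. `TablePartitionSketch` and its `Partition` type are hypothetical names, not Pinot classes. One assumption differs from the diff: whitelisted names that are not tables of this tenant are dropped (`retainAll`), because, as far as this excerpt shows, `rebalanceResult.get(table)` would otherwise return null for them.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not part of Pinot): decides which tables of a tenant may be
// rebalanced in parallel and which must run one at a time afterwards.
final class TablePartitionSketch {

  static final class Partition {
    final Set<String> parallel;
    final Set<String> sequential;

    Partition(Set<String> parallel, Set<String> sequential) {
      this.parallel = parallel;
      this.sequential = sequential;
    }
  }

  private TablePartitionSketch() {
  }

  static Partition partition(Set<String> tenantTables, Set<String> whitelist, Set<String> blacklist,
      int degreeOfParallelism) {
    Set<String> parallel = new HashSet<>();
    if (degreeOfParallelism > 1) {
      // An empty whitelist means every table of the tenant is a parallel candidate.
      parallel.addAll(whitelist.isEmpty() ? tenantTables : whitelist);
      // Assumption: ignore whitelisted names that are not tables of this tenant.
      parallel.retainAll(tenantTables);
      // The blacklist takes precedence over the whitelist.
      parallel.removeAll(blacklist);
    }
    // Everything not picked for the parallel phase runs sequentially.
    Set<String> sequential = new HashSet<>(tenantTables);
    sequential.removeAll(parallel);
    return new Partition(parallel, sequential);
  }
}
```

   For instance, `partition(tables, Set.of("airlineStats_OFFLINE"), Set.of("airlineStats_REALTIME"), 2)` puts the OFFLINE table in the parallel set and the REALTIME table, plus any other table of the tenant, in the sequential set.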





[GitHub] [pinot] saurabhd336 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1274817803


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultTenantRebalancer implements TenantRebalancer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  ExecutorService _executorService;
+
+  public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  @Override
+  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+    Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
+    Set<String> tables = getTenantTables(context.getTenantName());
+    tables.forEach(table -> {
+      try {
+        Configuration config = extractRebalanceConfig(context);
+        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false));
+      } catch (TableNotFoundException exception) {
+        rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null));
+      }
+    });
+    if (context.isDryRun() || context.isDowntime()) {
+      return new TenantRebalanceResult(null, rebalanceResult, context.isVerboseResult());
+    } else {
+      for (String table : rebalanceResult.keySet()) {
+        RebalanceResult result = rebalanceResult.get(table);
+        if (result.getStatus() == RebalanceResult.Status.DONE) {
+          rebalanceResult.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS,
+              "In progress, check controller task status for the", result.getInstanceAssignment(),
+              result.getTierInstanceAssignment(), result.getSegmentAssignment()));
+        }
+      }
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+    Set<String> parallelTables;
+    Set<String> sequentialTables;
+    final ConcurrentLinkedQueue<String> parallelQueue;
+    // ensure at least 1 thread is created to run the sequential table rebalance operations
+    int parallelism = Math.max(context.getDegreeOfParallelism(), 1);
+    AtomicInteger activeThreads = new AtomicInteger(parallelism);
+    try {
+      if (parallelism > 1) {
+        if (!context.getParallelWhitelist().isEmpty()) {
+          parallelTables = new HashSet<>(context.getParallelWhitelist());
+        } else {
+          parallelTables = new HashSet<>(tables);
+        }
+        if (!context.getParallelBlacklist().isEmpty()) {
+          parallelTables = Sets.difference(parallelTables, context.getParallelBlacklist());
+        }
+        parallelQueue = new ConcurrentLinkedQueue<>(parallelTables);
+        sequentialTables = Sets.difference(tables, parallelTables);
+      } else {
+        sequentialTables = new HashSet<>(tables);
+        parallelQueue = new ConcurrentLinkedQueue<>();
+      }
+      for (int i = 0; i < parallelism; i++) {
+        _executorService.submit(() -> {
+          try {
+            while (!parallelQueue.isEmpty()) {
+              String table = parallelQueue.remove();

Review Comment:
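   This is not atomic: another thread can dequeue the last element between the `isEmpty()` check and `remove()`, so `String table` can be null after line 108.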
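   The concern is a check-then-act on the shared queue. A minimal sketch of the usual alternative, using only `java.util.concurrent`: `ConcurrentLinkedQueue.poll()` removes the head atomically and returns `null` when the queue is empty, so the emptiness check and the dequeue become a single step. `QueueDrainSketch` is a hypothetical name, and the per-table action is only a stand-in for the PR's `rebalanceTable(...)`.

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper (not part of Pinot): workers drain a shared queue without an
// isEmpty()/remove() pair and without catching NoSuchElementException.
final class QueueDrainSketch {
  private QueueDrainSketch() {
  }

  // poll() atomically removes the head, or returns null once the queue is empty,
  // so no other worker can take the element between the check and the dequeue.
  static void drain(Queue<String> queue, Consumer<String> action) {
    String table;
    while ((table = queue.poll()) != null) {
      action.accept(table);
    }
  }

  // Runs `parallelism` workers over one queue and reports how many times each table
  // was handed to a worker (every count should be exactly 1).
  static Map<String, Integer> drainInParallel(List<String> tables, int parallelism) {
    Queue<String> queue = new ConcurrentLinkedQueue<>(tables);
    Map<String, Integer> timesProcessed = new ConcurrentHashMap<>();
    ExecutorService executor = Executors.newFixedThreadPool(parallelism);
    try {
      for (int i = 0; i < parallelism; i++) {
        // Stand-in for rebalanceTable(...): just count the hand-offs.
        executor.submit(() -> drain(queue, table -> timesProcessed.merge(table, 1, Integer::sum)));
      }
      executor.shutdown();
      if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    } finally {
      executor.shutdownNow();
    }
    return timesProcessed;
  }
}
```

   Because the emptiness test and the dequeue are one operation, a `catch (NoSuchElementException ignore)` block around the loop is no longer needed.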





[GitHub] [pinot] saurabhd336 commented on a diff in pull request #11128: Tenant rebalance and status tracking APIs

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #11128:
URL: https://github.com/apache/pinot/pull/11128#discussion_r1274829847


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultTenantRebalancer implements TenantRebalancer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  ExecutorService _executorService;
+
+  public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  @Override
+  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+    Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
+    Set<String> tables = getTenantTables(context.getTenantName());
+    tables.forEach(table -> {
+      try {
+        Configuration config = extractRebalanceConfig(context);
+        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false));
+      } catch (TableNotFoundException exception) {
+        rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null));
+      }
+    });
+    if (context.isDryRun() || context.isDowntime()) {
+      return new TenantRebalanceResult(null, rebalanceResult, context.isVerboseResult());
+    } else {
+      for (String table : rebalanceResult.keySet()) {
+        RebalanceResult result = rebalanceResult.get(table);
+        if (result.getStatus() == RebalanceResult.Status.DONE) {
+          rebalanceResult.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS,
+              "In progress, check controller task status for the", result.getInstanceAssignment(),
+              result.getTierInstanceAssignment(), result.getSegmentAssignment()));
+        }
+      }
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+    Set<String> parallelTables;
+    Set<String> sequentialTables;
+    final ConcurrentLinkedQueue<String> parallelQueue;
+    // ensure at least 1 thread is created to run the sequential table rebalance operations
+    int parallelism = Math.max(context.getDegreeOfParallelism(), 1);
+    AtomicInteger activeThreads = new AtomicInteger(parallelism);
+    try {
+      if (parallelism > 1) {
+        if (!context.getParallelWhitelist().isEmpty()) {
+          parallelTables = new HashSet<>(context.getParallelWhitelist());
+        } else {
+          parallelTables = new HashSet<>(tables);
+        }
+        if (!context.getParallelBlacklist().isEmpty()) {
+          parallelTables = Sets.difference(parallelTables, context.getParallelBlacklist());
+        }
+        parallelQueue = new ConcurrentLinkedQueue<>(parallelTables);
+        sequentialTables = Sets.difference(tables, parallelTables);
+      } else {
+        sequentialTables = new HashSet<>(tables);
+        parallelQueue = new ConcurrentLinkedQueue<>();
+      }
+      for (int i = 0; i < parallelism; i++) {
+        _executorService.submit(() -> {
+          try {
+            while (!parallelQueue.isEmpty()) {
+              String table = parallelQueue.remove();
+              Configuration config = extractRebalanceConfig(context);
+              config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+              config.setProperty(RebalanceConfigConstants.JOB_ID, rebalanceResult.get(table).getJobId());
+              rebalanceTable(table, config, observer);
+            }
+          } catch (NoSuchElementException ignore) {
+            // race condition between queue empty check and last element being dequeued
+          }
+          // Last parallel thread to finish the table rebalance job will pick up the
+          // sequential table rebalance execution
+          if (activeThreads.decrementAndGet() == 0) {

Review Comment:
   clever! 👍 
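   The pattern praised here relies on `decrementAndGet()` returning 0 to exactly one caller, so exactly one worker, the last to finish the parallel phase, goes on to run the sequential tables, with no extra coordinator thread or latch. A minimal sketch of the same idea, assuming the tables have already been split into a parallel and a sequential list. `LastWorkerSketch` and its parameters are hypothetical; it takes a plain `java.util.concurrent.Executor` (which `ExecutorService` extends) so it can also be exercised synchronously, and it decrements in a `finally` block so an exception in one parallel rebalance cannot prevent the sequential phase from starting, a choice the diff excerpt does not show.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper (not part of Pinot): parallel phase first, then the last worker
// to finish runs the sequential tables.
final class LastWorkerSketch {
  private LastWorkerSketch() {
  }

  static void run(List<String> parallelTables, List<String> sequentialTables, int degreeOfParallelism,
      Executor executor, Consumer<String> rebalance, Runnable onComplete) {
    // At least one worker is needed, otherwise nobody would pick up the sequential tables.
    int parallelism = Math.max(degreeOfParallelism, 1);
    Queue<String> parallelQueue = new ConcurrentLinkedQueue<>(parallelTables);
    AtomicInteger activeWorkers = new AtomicInteger(parallelism);
    for (int i = 0; i < parallelism; i++) {
      executor.execute(() -> {
        try {
          String table;
          while ((table = parallelQueue.poll()) != null) {
            rebalance.accept(table);
          }
        } finally {
          // decrementAndGet() yields 0 for exactly one worker: the last one to leave the parallel phase.
          if (activeWorkers.decrementAndGet() == 0) {
            sequentialTables.forEach(rebalance);
            onComplete.run();
          }
        }
      });
    }
  }
}
```

   For example, passing `Runnable::run` as the executor runs every worker inline, so `run(List.of("p1", "p2"), List.of("s1"), 4, Runnable::run, order::add, () -> { })` records `p1, p2, s1` in that order: the first worker drains both parallel tables and the fourth worker, the one that brings the counter to 0, runs `s1`.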


