You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/10 05:32:25 UTC
[incubator-pinot] 01/01: Improve controller tests
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch helix_debug
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 5b9cfaff5aabe4195220191902b5f4cfb5d58e87
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Tue Jul 9 22:32:04 2019 -0700
Improve controller tests
---
.../broker/broker/helix/ClusterChangeMediator.java | 12 +-
.../apache/pinot/controller/ControllerConf.java | 22 +--
.../apache/pinot/controller/ControllerStarter.java | 15 +-
.../api/resources/PinotControllerHealthCheck.java | 4 +-
.../resources/PinotTableConfigRestletResource.java | 2 +
.../api/resources/PinotTableRestletResource.java | 10 +-
.../helix/core/PinotHelixResourceManager.java | 33 ++--
.../helix/core/SegmentDeletionManager.java | 17 +-
.../helix/core/util/HelixSetupUtils.java | 180 ++++++++-------------
.../controller/validation/StorageQuotaChecker.java | 12 +-
.../helix/ControllerPeriodicTaskStarterTest.java | 23 +--
.../pinot/controller/helix/ControllerTest.java | 47 ++----
.../controller/helix/PinotControllerModeTest.java | 10 +-
.../tests/BaseClusterIntegrationTest.java | 3 +-
.../tests/OfflineClusterIntegrationTest.java | 13 +-
15 files changed, 145 insertions(+), 258 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index 64b2a1e..72be04d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -62,7 +62,7 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
private final Thread _clusterChangeHandlingThread;
- private volatile boolean _stopped = false;
+ private boolean _stopped = false;
public ClusterChangeMediator(Map<ChangeType, List<ClusterChangeHandler>> changeHandlersMap,
BrokerMetrics brokerMetrics) {
@@ -145,7 +145,7 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
/**
* Starts the cluster change mediator.
*/
- public void start() {
+ public synchronized void start() {
LOGGER.info("Starting the cluster change handling thread");
_clusterChangeHandlingThread.start();
}
@@ -153,7 +153,7 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
/**
* Stops the cluster change mediator.
*/
- public void stop() {
+ public synchronized void stop() {
LOGGER.info("Stopping the cluster change handling thread");
_stopped = true;
synchronized (_lastChangeTimeMap) {
@@ -197,7 +197,11 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan
*
* @param changeType Type of the change
*/
- private void enqueueChange(ChangeType changeType) {
+ private synchronized void enqueueChange(ChangeType changeType) {
+ // Do not enqueue changes if already stopped
+ if (_stopped) {
+ return;
+ }
if (_clusterChangeHandlingThread.isAlive()) {
LOGGER.info("Enqueue {} change", changeType);
synchronized (_lastChangeTimeMap) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 697fcf3..414d3bc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -21,7 +21,6 @@ package org.apache.pinot.controller;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
@@ -59,9 +58,7 @@ public class ControllerConf extends PropertiesConfiguration {
private static final String CONTROLLER_MODE = "controller.mode";
public enum ControllerMode {
- DUAL,
- PINOT_ONLY,
- HELIX_ONLY
+ DUAL, PINOT_ONLY, HELIX_ONLY
}
public static class ControllerPeriodicTasksConf {
@@ -175,22 +172,17 @@ public class ControllerConf extends PropertiesConfiguration {
* Returns the URI for the given path, appends the local (file) scheme to the URI if no scheme exists.
*/
public static URI getUriFromPath(String path) {
- try {
- URI uri = new URI(path);
- if (uri.getScheme() != null) {
- return uri;
- } else {
- return new URI(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME, path, null);
- }
- } catch (URISyntaxException e) {
- LOGGER.error("Could not construct uri from path {}", path);
- throw new RuntimeException(e);
+ URI uri = URI.create(path);
+ if (uri.getScheme() == null) {
+ uri = URI.create(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME + ":" + path);
}
+ return uri;
}
public static URI constructSegmentLocation(String baseDataDir, String tableName, String segmentName) {
try {
- return getUriFromPath(StringUtil.join(File.separator, baseDataDir, tableName, URLEncoder.encode(segmentName, "UTF-8")));
+ return getUriFromPath(
+ StringUtil.join(File.separator, baseDataDir, tableName, URLEncoder.encode(segmentName, "UTF-8")));
} catch (UnsupportedEncodingException e) {
LOGGER
.error("Could not construct segment location with baseDataDir {}, tableName {}, segmentName {}", baseDataDir,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index c313616..ac7dbff 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.yammer.metrics.core.MetricsRegistry;
@@ -236,12 +237,10 @@ public class ControllerStarter {
}
private void setUpPinotController() {
- // Note: Right now we don't allow pinot-only mode to be used in production yet.
- // Now we only have this mode used in tests.
- // TODO: Remove this logic once all the helix separation PRs are committed.
- if (_controllerMode == ControllerConf.ControllerMode.PINOT_ONLY && !isPinotOnlyModeSupported()) {
- throw new RuntimeException("Pinot only controller currently isn't supported in production yet.");
- }
+ // Note: Right now we don't allow Pinot-only controller as ControllerLeadershipManager is setup in Helix controller
+ // and Pinot controller relies on it
+ // TODO: Remove ControllerLeadershipManager
+ Preconditions.checkState(_controllerLeadershipManager != null);
// Set up Pinot cluster in Helix
HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, _isUpdateStateModel, _enableBatchMessageMode);
@@ -506,10 +505,6 @@ public class ControllerStarter {
}
}
- public boolean isPinotOnlyModeSupported() {
- return false;
- }
-
public MetricsRegistry getMetricsRegistry() {
return _metricsRegistry;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
index e6c5182..7370085 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
@@ -32,14 +32,14 @@ import org.apache.pinot.controller.ControllerConf;
@Api(tags = Constants.HEALTH_TAG)
-@Path("/pinot-controller/admin")
+@Path("/")
public class PinotControllerHealthCheck {
@Inject
ControllerConf controllerConf;
@GET
- @Path("/")
+ @Path("pinot-controller/admin")
@ApiOperation(value = "Check controller health")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Good")})
@Produces(MediaType.TEXT_PLAIN)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
index 01d5fdf..5717170 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
@@ -110,6 +110,8 @@ public class PinotTableConfigRestletResource {
.type(MediaType.TEXT_PLAIN_TYPE).build();
}
+ // TODO: Fix the bug - when schema is not configured, after deserialization, CombinedConfig will have a non-null
+ // schema with null schema name
if (config.getSchema() != null) {
_resourceManager.addOrUpdateSchema(config.getSchema());
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index daf6f88..fe3491a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -62,7 +62,6 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.util.ReplicationUtils;
import org.slf4j.LoggerFactory;
@@ -252,14 +251,16 @@ public class PinotTableRestletResource {
@ApiOperation(value = "Deletes a table", notes = "Deletes a table")
public SuccessResponse deleteTable(
@ApiParam(value = "Name of the table to delete", required = true) @PathParam("tableName") String tableName,
- @ApiParam(value = "realtime|offline", required = false) @QueryParam("type") String tableTypeStr) {
+ @ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) {
List<String> tablesDeleted = new LinkedList<>();
try {
- if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name())) {
+ if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name()))
+ && !TableNameBuilder.REALTIME.tableHasTypeSuffix(tableName)) {
_pinotHelixResourceManager.deleteOfflineTable(tableName);
tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
}
- if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name())) {
+ if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name()))
+ && !TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableName)) {
_pinotHelixResourceManager.deleteRealtimeTable(tableName);
tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
}
@@ -373,7 +374,6 @@ public class PinotTableRestletResource {
throw new PinotHelixResourceManager.InvalidTableConfigException(errorMsg, e);
}
-
if (verifyReplication) {
int requestReplication;
try {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5eeeddc..3c0ca0b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -160,10 +160,10 @@ public class PinotHelixResourceManager {
public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
- CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_"
- + controllerConf.getControllerPort(), controllerConf.getDataDir(),
- controllerConf.getExternalViewOnlineToOfflineTimeout(), controllerConf.tenantIsolationEnabled(),
- controllerConf.getEnableBatchMessageMode(), controllerConf.getHLCTablesAllowed());
+ CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_" + controllerConf
+ .getControllerPort(), controllerConf.getDataDir(), controllerConf.getExternalViewOnlineToOfflineTimeout(),
+ controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
+ controllerConf.getHLCTablesAllowed());
}
/**
@@ -1087,9 +1087,6 @@ public class PinotHelixResourceManager {
// lets add table configs
ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
- _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType),
- new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
-
// Update replica group partition assignment to the property store if applicable
updateReplicaGroupPartitionAssignment(tableConfig);
break;
@@ -1228,9 +1225,8 @@ public class PinotHelixResourceManager {
servers = getInstancesWithTag(realtimeTagConfig.getConsumingServerTag());
}
int numReplicas = ReplicationUtils.getReplication(tableConfig);
- ReplicaGroupPartitionAssignment partitionAssignment =
- partitionAssignmentGenerator.buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig,
- numReplicas, servers);
+ ReplicaGroupPartitionAssignment partitionAssignment = partitionAssignmentGenerator
+ .buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig, numReplicas, servers);
partitionAssignmentGenerator.writeReplicaGroupPartitionAssignment(partitionAssignment);
}
}
@@ -1272,7 +1268,8 @@ public class PinotHelixResourceManager {
// Check if HLC table is allowed.
StreamConfig streamConfig = new StreamConfig(indexingConfig.getStreamConfigs());
if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) {
- throw new InvalidTableConfigException("Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
+ throw new InvalidTableConfigException(
+ "Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
}
}
@@ -1468,8 +1465,8 @@ public class PinotHelixResourceManager {
LOGGER.info("Deleting table {}: Finish", offlineTableName);
}
- public void deleteRealtimeTable(String rawTableName) {
- String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+ public void deleteRealtimeTable(String tableName) {
+ String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
LOGGER.info("Deleting table {}: Start", realtimeTableName);
// Remove the table from brokerResource
@@ -2071,14 +2068,12 @@ public class PinotHelixResourceManager {
: PinotResourceManagerResponse.failure("Timed out. External view not completely updated");
}
- public boolean hasRealtimeTable(String tableName) {
- String actualTableName = tableName + "_REALTIME";
- return getAllTables().contains(actualTableName);
+ public boolean hasOfflineTable(String tableName) {
+ return getAllResources().contains(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
}
- public boolean hasOfflineTable(String tableName) {
- String actualTableName = tableName + "_OFFLINE";
- return getAllTables().contains(actualTableName);
+ public boolean hasRealtimeTable(String tableName) {
+ return getAllResources().contains(TableNameBuilder.REALTIME.tableNameWithType(tableName));
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 46c84c0..6415b06 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -172,16 +172,17 @@ public class SegmentDeletionManager {
}
protected void removeSegmentFromStore(String tableNameWithType, String segmentId) {
- final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+ // Ignore HLC segments as they are not stored in Pinot FS
+ if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
+ return;
+ }
if (_dataDir != null) {
- URI fileToMoveURI;
- PinotFS pinotFS;
- URI dataDirURI = ControllerConf.getUriFromPath(_dataDir);
- fileToMoveURI = ControllerConf.constructSegmentLocation(_dataDir, rawTableName, segmentId);
+ String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+ URI fileToMoveURI = ControllerConf.constructSegmentLocation(_dataDir, rawTableName, segmentId);
URI deletedSegmentDestURI = ControllerConf
.constructSegmentLocation(StringUtil.join(File.separator, _dataDir, DELETED_SEGMENTS), rawTableName,
segmentId);
- pinotFS = PinotFSFactory.create(dataDirURI.getScheme());
+ PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme());
try {
if (pinotFS.exists(fileToMoveURI)) {
@@ -197,9 +198,7 @@ public class SegmentDeletionManager {
deletedSegmentDestURI.toString());
}
} else {
- if (!SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
- LOGGER.warn("Not found local segment file for segment {}" + fileToMoveURI.toString());
- }
+ LOGGER.warn("Not found local segment file for segment {}", fileToMoveURI.toString());
}
} catch (IOException e) {
LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 82c4cab..74682c8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -20,16 +20,10 @@ package org.apache.pinot.controller.helix.core.util;
import com.google.common.base.Preconditions;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
import java.util.concurrent.TimeUnit;
-import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.ZNRecord;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -37,16 +31,15 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
@@ -102,150 +95,103 @@ public class HelixSetupUtils {
*/
public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel,
boolean enableBatchMessageMode) {
- final HelixAdmin admin = new ZKHelixAdmin(zkPath);
- Preconditions.checkState(admin.getClusters().contains(helixClusterName),
- String.format("Helix cluster: %s hasn't been set up", helixClusterName));
-
- // Add segment state model definition if needed
- addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath, isUpdateStateModel);
-
- // Add broker resource if needed
- createBrokerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode);
-
- // Add lead controller resource if needed
- createLeadControllerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode);
-
- // Init property store if needed
- initPropertyStoreIfNeeded(helixClusterName, zkPath);
+ HelixZkClient zkClient = null;
+ try {
+ zkClient = SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zkPath),
+ new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()).setConnectInitTimeout(
+ TimeUnit.SECONDS.toMillis(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC)));
+ zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
+ HelixAdmin helixAdmin = new ZKHelixAdmin(zkClient);
+ HelixDataAccessor helixDataAccessor =
+ new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient));
+
+ Preconditions.checkState(helixAdmin.getClusters().contains(helixClusterName),
+ String.format("Helix cluster: %s hasn't been set up", helixClusterName));
+
+ // Add segment state model definition if needed
+ addSegmentStateModelDefinitionIfNeeded(helixClusterName, helixAdmin, helixDataAccessor, isUpdateStateModel);
+
+ // Add broker resource if needed
+ createBrokerResourceIfNeeded(helixClusterName, helixAdmin, enableBatchMessageMode);
+
+ // Add lead controller resource if needed
+ createLeadControllerResourceIfNeeded(helixClusterName, helixAdmin, enableBatchMessageMode);
+ } finally {
+ if (zkClient != null) {
+ zkClient.close();
+ }
+ }
}
- private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin admin, String zkPath,
- boolean isUpdateStateModel) {
- final String segmentStateModelName =
+ private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
+ HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) {
+ String segmentStateModelName =
PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
- StateModelDefinition stateModelDefinition = admin.getStateModelDef(helixClusterName, segmentStateModelName);
- if (stateModelDefinition == null) {
- LOGGER.info("Adding state model {} (with CONSUMED state) generated using {}", segmentStateModelName,
- PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
- admin.addStateModelDef(helixClusterName, segmentStateModelName,
- PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
- } else if (isUpdateStateModel) {
- final StateModelDefinition curStateModelDef = admin.getStateModelDef(helixClusterName, segmentStateModelName);
- List<String> states = curStateModelDef.getStatesPriorityList();
- if (states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)) {
- LOGGER.info("State model {} already updated to contain CONSUMING state", segmentStateModelName);
+ StateModelDefinition stateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName);
+ if (stateModelDefinition == null || isUpdateStateModel) {
+ if (stateModelDefinition == null) {
+ LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName);
} else {
- LOGGER.info("Updating {} to add states for low level consumers", segmentStateModelName);
- StateModelDefinition newStateModelDef =
- PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
- ZkClient zkClient = new ZkClient(zkPath);
- zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- HelixDataAccessor accessor = new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient));
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), newStateModelDef);
- LOGGER.info("Completed updating state model {}", segmentStateModelName);
- zkClient.close();
+ LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName);
}
+ helixDataAccessor
+ .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
}
}
- private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin admin,
+ private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
boolean enableBatchMessageMode) {
// Add broker resource online offline state model definition if needed
- StateModelDefinition brokerResourceStateModelDefinition = admin.getStateModelDef(helixClusterName,
+ StateModelDefinition brokerResourceStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName,
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL);
if (brokerResourceStateModelDefinition == null) {
LOGGER.info("Adding state model definition named : {} generated using : {}",
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString());
- admin.addStateModelDef(helixClusterName,
+ helixAdmin.addStateModelDef(helixClusterName,
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
}
// Create broker resource if needed.
- IdealState brokerResourceIdealState =
- admin.getResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
- if (brokerResourceIdealState == null) {
+ if (helixAdmin.getResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE) == null) {
LOGGER.info("Adding empty ideal state for Broker!");
- HelixHelper
- .updateResourceConfigsFor(new HashMap<>(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, helixClusterName,
- admin);
- IdealState idealState = PinotTableIdealStateBuilder
- .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, enableBatchMessageMode);
- admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
+ IdealState emptyIdealStateForBrokerResource = PinotTableIdealStateBuilder
+ .buildEmptyIdealStateForBrokerResource(helixAdmin, helixClusterName, enableBatchMessageMode);
+ helixAdmin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
+ emptyIdealStateForBrokerResource);
}
}
- private static void createLeadControllerResourceIfNeeded(String helixClusterName, HelixAdmin admin,
+ private static void createLeadControllerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin,
boolean enableBatchMessageMode) {
- StateModelDefinition masterSlaveStateModelDefinition =
- admin.getStateModelDef(helixClusterName, MasterSlaveSMD.name);
- if (masterSlaveStateModelDefinition == null) {
- LOGGER.info("Adding state model definition named : {} generated using : {}", MasterSlaveSMD.name,
- MasterSlaveSMD.class.toString());
- admin.addStateModelDef(helixClusterName, MasterSlaveSMD.name, MasterSlaveSMD.build());
- }
-
- IdealState leadControllerResourceIdealState =
- admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
- if (leadControllerResourceIdealState == null) {
+ if (helixAdmin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME) == null) {
LOGGER.info("Cluster {} doesn't contain {}. Creating one.", helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
- HelixHelper.updateResourceConfigsFor(new HashMap<>(), LEAD_CONTROLLER_RESOURCE_NAME, helixClusterName, admin);
- // FULL-AUTO Master-Slave state model with CrushED reBalance strategy.
- admin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
- CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, MasterSlaveSMD.name,
- IdealState.RebalanceMode.FULL_AUTO.toString(), CrushEdRebalanceStrategy.class.getName());
+ // FULL-AUTO Master-Slave state model with CrushED rebalance strategy.
+ IdealState leadControllerResourceIdealState = new IdealState(LEAD_CONTROLLER_RESOURCE_NAME);
+ leadControllerResourceIdealState
+ .setNumPartitions(CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+ leadControllerResourceIdealState.setStateModelDefRef(MasterSlaveSMD.name);
+ leadControllerResourceIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+ leadControllerResourceIdealState.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+ leadControllerResourceIdealState.setReplicas("0");
// Set instance group tag for lead controller resource.
- IdealState leadControllerIdealState =
- admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
- leadControllerIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
- leadControllerIdealState.setBatchMessageMode(enableBatchMessageMode);
+ leadControllerResourceIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+ leadControllerResourceIdealState.setBatchMessageMode(enableBatchMessageMode);
// The below config guarantees if active number of replicas is no less than minimum active replica, there will not be partition movements happened.
// Set min active replicas to 0 and rebalance delay to 5 minutes so that if any master goes offline, Helix controller waits at most 5 minutes and then re-calculate the participant assignment.
// This delay is helpful when periodic tasks are running and we don't want them to be re-run too frequently.
// Plus, if virtual id is applied to controller hosts, swapping hosts would be easy as new hosts can use the same virtual id and it takes least effort to change the configs.
- leadControllerIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS);
- leadControllerIdealState.setRebalanceDelay(REBALANCE_DELAY_MS);
- leadControllerIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE);
- admin.setResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, leadControllerIdealState);
-
+ leadControllerResourceIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS);
+ leadControllerResourceIdealState.setRebalanceDelay(REBALANCE_DELAY_MS);
+ leadControllerResourceIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE);
// Explicitly disable this resource when creating this new resource.
// When all the controllers are running the code with the logic to handle this resource, it can be enabled for backward compatibility.
// In the next major release, we can enable this resource by default, so that all the controller logic can be separated.
- admin.enableResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, false);
+ leadControllerResourceIdealState.enable(false);
- LOGGER.info("Re-balance lead controller resource with replicas: {}",
- CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
- // Set it to 1 so that there's only 1 instance (i.e. master) shown in every partitions.
- admin.rebalance(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
- CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
- }
- }
-
- private static void initPropertyStoreIfNeeded(String helixClusterName, String zkPath) {
- String propertyStorePath = PropertyPathBuilder.propertyStore(helixClusterName);
- ZkHelixPropertyStore<ZNRecord> propertyStore =
- new ZkHelixPropertyStore<>(zkPath, new ZNRecordSerializer(), propertyStorePath);
- if (!propertyStore.exists("/CONFIGS", AccessOption.PERSISTENT)) {
- propertyStore.create("/CONFIGS", new ZNRecord(""), AccessOption.PERSISTENT);
- }
- if (!propertyStore.exists("/CONFIGS/CLUSTER", AccessOption.PERSISTENT)) {
- propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""), AccessOption.PERSISTENT);
- }
- if (!propertyStore.exists("/CONFIGS/TABLE", AccessOption.PERSISTENT)) {
- propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""), AccessOption.PERSISTENT);
- }
- if (!propertyStore.exists("/CONFIGS/INSTANCE", AccessOption.PERSISTENT)) {
- propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""), AccessOption.PERSISTENT);
- }
- if (!propertyStore.exists("/SCHEMAS", AccessOption.PERSISTENT)) {
- propertyStore.create("/SCHEMAS", new ZNRecord(""), AccessOption.PERSISTENT);
- }
- if (!propertyStore.exists("/SEGMENTS", AccessOption.PERSISTENT)) {
- propertyStore.create("/SEGMENTS", new ZNRecord(""), AccessOption.PERSISTENT);
+ helixAdmin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, leadControllerResourceIdealState);
}
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index db23301..efc1640 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -105,14 +105,16 @@ public class StorageQuotaChecker {
if (quotaConfig == null || Strings.isNullOrEmpty(quotaConfig.getStorage())) {
// no quota configuration...so ignore for backwards compatibility
- LOGGER.warn("Quota configuration not set for table: {}", tableNameWithType);
- return success("Quota configuration not set for table: " + tableNameWithType);
+ LOGGER.info("Storage quota is not configured for table: {}, skipping the check", tableNameWithType);
+ return success("Storage quota is not configured for table: " + tableNameWithType);
}
long allowedStorageBytes = numReplicas * quotaConfig.storageSizeBytes();
- if (allowedStorageBytes < 0) {
- LOGGER.warn("Storage quota is not configured for table: {}", tableNameWithType);
- return success("Storage quota is not configured for table: " + tableNameWithType);
+ if (allowedStorageBytes <= 0) {
+ LOGGER.warn("Invalid storage quota: {} for table: {}, skipping the check", quotaConfig.getStorage(),
+ tableNameWithType);
+ return success(
+ String.format("Invalid storage quota: %s for table: %s", quotaConfig.getStorage(), tableNameWithType));
}
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_QUOTA, allowedStorageBytes);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
index 8f72a59..59d5003 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
@@ -30,7 +30,6 @@ import org.testng.annotations.Test;
public class ControllerPeriodicTaskStarterTest extends ControllerTest {
- private MockControllerStarter _mockControllerStarter;
@BeforeClass
public void setup() {
@@ -51,27 +50,11 @@ public class ControllerPeriodicTaskStarterTest extends ControllerTest {
}
@Override
- protected void startControllerStarter(ControllerConf config) {
- _mockControllerStarter = new MockControllerStarter(config);
- _mockControllerStarter.start();
- _helixResourceManager = _mockControllerStarter.getHelixResourceManager();
- _helixManager = _mockControllerStarter.getHelixControllerManager();
+ protected ControllerStarter getControllerStarter(ControllerConf config) {
+ return new MockControllerStarter(config);
}
- @Override
- protected void stopControllerStarter() {
- Assert.assertNotNull(_mockControllerStarter);
-
- _mockControllerStarter.stop();
- _mockControllerStarter = null;
- }
-
- @Override
- protected ControllerStarter getControllerStarter() {
- return _mockControllerStarter;
- }
-
- private class MockControllerStarter extends TestOnlyControllerStarter {
+ private class MockControllerStarter extends ControllerStarter {
private static final int NUM_PERIODIC_TASKS = 7;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 9893377..9eadb70 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix;
+import com.google.common.base.Preconditions;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -38,7 +39,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.data.DimensionFieldSpec;
import org.apache.pinot.common.data.FieldSpec;
@@ -66,7 +66,6 @@ public abstract class ControllerTest {
protected ControllerRequestURLBuilder _controllerRequestURLBuilder;
protected String _controllerDataDir;
- protected ZkClient _zkClient;
protected ControllerStarter _controllerStarter;
protected PinotHelixResourceManager _helixResourceManager;
protected HelixManager _helixManager;
@@ -95,58 +94,33 @@ public abstract class ControllerTest {
}
}
- public static ControllerConf getDefaultControllerConfiguration() {
+ public ControllerConf getDefaultControllerConfiguration() {
ControllerConf config = new ControllerConf();
config.setControllerHost(LOCAL_HOST);
config.setControllerPort(Integer.toString(DEFAULT_CONTROLLER_PORT));
config.setDataDir(DEFAULT_DATA_DIR);
config.setZkStr(ZkStarter.DEFAULT_ZK_STR);
+ config.setHelixClusterName(getHelixClusterName());
return config;
}
- public class TestOnlyControllerStarter extends ControllerStarter {
-
- TestOnlyControllerStarter(ControllerConf conf) {
- super(conf);
- }
-
- @Override
- public boolean isPinotOnlyModeSupported() {
- return true;
- }
- }
-
protected void startController() {
startController(getDefaultControllerConfiguration());
}
protected void startController(ControllerConf config) {
- startController(config, true);
- }
-
- protected void startController(ControllerConf config, boolean deleteCluster) {
- Assert.assertNotNull(config);
- Assert.assertNull(_controllerStarter);
+ Preconditions.checkState(_controllerStarter == null);
_controllerPort = Integer.valueOf(config.getControllerPort());
_controllerBaseApiUrl = "http://localhost:" + _controllerPort;
_controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl);
_controllerDataDir = config.getDataDir();
- String helixClusterName = getHelixClusterName();
- config.setHelixClusterName(helixClusterName);
-
- String zkStr = config.getZkStr();
- _zkClient = new ZkClient(zkStr);
- if (_zkClient.exists("/" + helixClusterName) && deleteCluster) {
- _zkClient.deleteRecursive("/" + helixClusterName);
- }
-
startControllerStarter(config);
// HelixResourceManager is null in Helix only mode, while HelixManager is null in Pinot only mode.
- switch (getControllerStarter().getControllerMode()) {
+ switch (_controllerStarter.getControllerMode()) {
case DUAL:
case PINOT_ONLY:
_helixAdmin = _helixResourceManager.getHelixAdmin();
@@ -160,16 +134,19 @@ public abstract class ControllerTest {
}
protected void startControllerStarter(ControllerConf config) {
- _controllerStarter = new TestOnlyControllerStarter(config);
+ _controllerStarter = getControllerStarter(config);
_controllerStarter.start();
_helixResourceManager = _controllerStarter.getHelixResourceManager();
_helixManager = _controllerStarter.getHelixControllerManager();
}
+ protected ControllerStarter getControllerStarter(ControllerConf config) {
+ return new ControllerStarter(config);
+ }
+
protected void stopController() {
stopControllerStarter();
FileUtils.deleteQuietly(new File(_controllerDataDir));
- _zkClient.close();
}
protected void stopControllerStarter() {
@@ -179,10 +156,6 @@ public abstract class ControllerTest {
_controllerStarter = null;
}
- protected ControllerStarter getControllerStarter() {
- return _controllerStarter;
- }
-
protected Schema createDummySchema(String tableName) {
Schema schema = new Schema();
schema.setSchemaName(tableName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index d91e612..bcd9bf8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -78,12 +78,11 @@ public class PinotControllerModeTest extends ControllerTest {
// Starting a second dual-mode controller. Helix cluster has already been set up.
ControllerConf controllerConfig = getDefaultControllerConfiguration();
- controllerConfig.setHelixClusterName(getHelixClusterName());
controllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL);
controllerConfig.setControllerPort(
Integer.toString(Integer.parseInt(this.config.getControllerPort()) + controllerPortOffset++));
- ControllerStarter secondDualModeController = new TestOnlyControllerStarter(controllerConfig);
+ ControllerStarter secondDualModeController = getControllerStarter(controllerConfig);
secondDualModeController.start();
TestUtils
.waitForCondition(aVoid -> secondDualModeController.getHelixResourceManager().getHelixZkManager().isConnected(),
@@ -113,7 +112,6 @@ public class PinotControllerModeTest extends ControllerTest {
// Starting a helix controller.
ControllerConf config2 = getDefaultControllerConfiguration();
- config2.setHelixClusterName(getHelixClusterName());
config2.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY);
config2.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
ControllerStarter helixControllerStarter = new ControllerStarter(config2);
@@ -128,11 +126,10 @@ public class PinotControllerModeTest extends ControllerTest {
// Starting a pinot only controller.
ControllerConf config3 = getDefaultControllerConfiguration();
- config3.setHelixClusterName(getHelixClusterName());
config3.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
config3.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
- ControllerStarter firstPinotOnlyController = new TestOnlyControllerStarter(config3);
+ ControllerStarter firstPinotOnlyController = getControllerStarter(config3);
firstPinotOnlyController.start();
PinotHelixResourceManager firstPinotOnlyPinotHelixResourceManager =
firstPinotOnlyController.getHelixResourceManager();
@@ -143,11 +140,10 @@ public class PinotControllerModeTest extends ControllerTest {
// Start a second Pinot only controller.
ControllerConf config4 = getDefaultControllerConfiguration();
- config4.setHelixClusterName(getHelixClusterName());
config4.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
config4.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
- ControllerStarter secondControllerStarter = new TestOnlyControllerStarter(config4);
+ ControllerStarter secondControllerStarter = getControllerStarter(config4);
secondControllerStarter.start();
// Two controller instances assigned to cluster.
TestUtils
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 914adda..1a3b7e6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -71,7 +71,6 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
protected final File _avroDir = new File(_tempDir, "avroDir");
- protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir");
protected final File _segmentDir = new File(_tempDir, "segmentDir");
protected final File _tarDir = new File(_tempDir, "tarDir");
protected List<KafkaServerStartable> _kafkaStarters;
@@ -184,7 +183,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
}
@Nullable
- protected String getServerTenant() {
+ protected String getServerTenant() {
return TagNameUtils.DEFAULT_TENANT_NAME;
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 9ae4eef..bd8ab1a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -19,8 +19,8 @@
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
@@ -41,7 +41,6 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.plan.SelectionPlanNode;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -611,13 +610,16 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
pqlQuery = "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
JsonNode response2 = postQuery(pqlQuery);
- pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
+ pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+ + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
JsonNode response3 = postQuery(pqlQuery);
- pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
+ pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+ + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
JsonNode response4 = postQuery(pqlQuery);
- pqlQuery = "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
+ pqlQuery =
+ "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
JsonNode response5 = postQuery(pqlQuery);
double val1 = response1.get("aggregationResults").get(0).get("value").asDouble();
@@ -653,7 +655,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
double val2 = response2.get("aggregationResults").get(0).get("value").asDouble();
Assert.assertEquals(val1, val2);
}
-
}
@AfterClass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org