You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/13 03:17:03 UTC
[incubator-pinot] branch master updated: Misc fix for controller tests (#4431)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 626c43a Misc fix for controller tests (#4431)
626c43a is described below
commit 626c43a37483af8b32e485ad241962935bd9232f
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jul 12 20:16:53 2019 -0700
Misc fix for controller tests (#4431)
---
.../api/resources/PinotControllerHealthCheck.java | 4 +--
.../resources/PinotTableConfigRestletResource.java | 2 ++
.../api/resources/PinotTableRestletResource.java | 10 +++----
.../helix/core/PinotHelixResourceManager.java | 34 +++++++++-------------
.../helix/core/SegmentDeletionManager.java | 10 ++++---
.../controller/validation/StorageQuotaChecker.java | 32 ++++++++++----------
.../tests/BaseClusterIntegrationTest.java | 1 -
.../tests/OfflineClusterIntegrationTest.java | 13 +++++----
8 files changed, 52 insertions(+), 54 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
index e6c5182..7370085 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
@@ -32,14 +32,14 @@ import org.apache.pinot.controller.ControllerConf;
@Api(tags = Constants.HEALTH_TAG)
-@Path("/pinot-controller/admin")
+@Path("/")
public class PinotControllerHealthCheck {
@Inject
ControllerConf controllerConf;
@GET
- @Path("/")
+ @Path("pinot-controller/admin")
@ApiOperation(value = "Check controller health")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Good")})
@Produces(MediaType.TEXT_PLAIN)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
index 01d5fdf..5717170 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
@@ -110,6 +110,8 @@ public class PinotTableConfigRestletResource {
.type(MediaType.TEXT_PLAIN_TYPE).build();
}
+ // TODO: Fix the bug - when schema is not configured, after deserialization, CombinedConfig will have a non-null
+ // schema with null schema name
if (config.getSchema() != null) {
_resourceManager.addOrUpdateSchema(config.getSchema());
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index daf6f88..fe3491a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -62,7 +62,6 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.util.ReplicationUtils;
import org.slf4j.LoggerFactory;
@@ -252,14 +251,16 @@ public class PinotTableRestletResource {
@ApiOperation(value = "Deletes a table", notes = "Deletes a table")
public SuccessResponse deleteTable(
@ApiParam(value = "Name of the table to delete", required = true) @PathParam("tableName") String tableName,
- @ApiParam(value = "realtime|offline", required = false) @QueryParam("type") String tableTypeStr) {
+ @ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) {
List<String> tablesDeleted = new LinkedList<>();
try {
- if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name())) {
+ if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name()))
+ && !TableNameBuilder.REALTIME.tableHasTypeSuffix(tableName)) {
_pinotHelixResourceManager.deleteOfflineTable(tableName);
tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
}
- if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name())) {
+ if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name()))
+ && !TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableName)) {
_pinotHelixResourceManager.deleteRealtimeTable(tableName);
tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
}
@@ -373,7 +374,6 @@ public class PinotTableRestletResource {
throw new PinotHelixResourceManager.InvalidTableConfigException(errorMsg, e);
}
-
if (verifyReplication) {
int requestReplication;
try {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5eeeddc..0cf2f50 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -117,7 +117,6 @@ import org.slf4j.LoggerFactory;
public class PinotHelixResourceManager {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixResourceManager.class);
- private static final long DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS = 120_000L; // 2 minutes
private static final long DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS = 500L;
private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
public static final String APPEND = "APPEND";
@@ -160,10 +159,10 @@ public class PinotHelixResourceManager {
public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
- CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_"
- + controllerConf.getControllerPort(), controllerConf.getDataDir(),
- controllerConf.getExternalViewOnlineToOfflineTimeout(), controllerConf.tenantIsolationEnabled(),
- controllerConf.getEnableBatchMessageMode(), controllerConf.getHLCTablesAllowed());
+ CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_" + controllerConf
+ .getControllerPort(), controllerConf.getDataDir(), controllerConf.getExternalViewOnlineToOfflineTimeout(),
+ controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
+ controllerConf.getHLCTablesAllowed());
}
/**
@@ -1087,9 +1086,6 @@ public class PinotHelixResourceManager {
// lets add table configs
ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
- _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType),
- new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
-
// Update replica group partition assignment to the property store if applicable
updateReplicaGroupPartitionAssignment(tableConfig);
break;
@@ -1228,9 +1224,8 @@ public class PinotHelixResourceManager {
servers = getInstancesWithTag(realtimeTagConfig.getConsumingServerTag());
}
int numReplicas = ReplicationUtils.getReplication(tableConfig);
- ReplicaGroupPartitionAssignment partitionAssignment =
- partitionAssignmentGenerator.buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig,
- numReplicas, servers);
+ ReplicaGroupPartitionAssignment partitionAssignment = partitionAssignmentGenerator
+ .buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig, numReplicas, servers);
partitionAssignmentGenerator.writeReplicaGroupPartitionAssignment(partitionAssignment);
}
}
@@ -1272,7 +1267,8 @@ public class PinotHelixResourceManager {
// Check if HLC table is allowed.
StreamConfig streamConfig = new StreamConfig(indexingConfig.getStreamConfigs());
if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) {
- throw new InvalidTableConfigException("Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
+ throw new InvalidTableConfigException(
+ "Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
}
}
@@ -1468,8 +1464,8 @@ public class PinotHelixResourceManager {
LOGGER.info("Deleting table {}: Finish", offlineTableName);
}
- public void deleteRealtimeTable(String rawTableName) {
- String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+ public void deleteRealtimeTable(String tableName) {
+ String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
LOGGER.info("Deleting table {}: Start", realtimeTableName);
// Remove the table from brokerResource
@@ -2071,14 +2067,12 @@ public class PinotHelixResourceManager {
: PinotResourceManagerResponse.failure("Timed out. External view not completely updated");
}
- public boolean hasRealtimeTable(String tableName) {
- String actualTableName = tableName + "_REALTIME";
- return getAllTables().contains(actualTableName);
+ public boolean hasOfflineTable(String tableName) {
+ return getAllResources().contains(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
}
- public boolean hasOfflineTable(String tableName) {
- String actualTableName = tableName + "_OFFLINE";
- return getAllTables().contains(actualTableName);
+ public boolean hasRealtimeTable(String tableName) {
+ return getAllResources().contains(TableNameBuilder.REALTIME.tableNameWithType(tableName));
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index b2ab8b3..53bcb61 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -170,8 +170,12 @@ public class SegmentDeletionManager {
}
protected void removeSegmentFromStore(String tableNameWithType, String segmentId) {
- final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+ // Ignore HLC segments as they are not stored in Pinot FS
+ if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
+ return;
+ }
if (_dataDir != null) {
+ String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
URI fileToMoveURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId));
URI deletedSegmentDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, URIUtils.encode(segmentId));
PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme());
@@ -190,9 +194,7 @@ public class SegmentDeletionManager {
deletedSegmentDestURI.toString());
}
} else {
- if (!SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
- LOGGER.warn("Not found local segment file for segment {}" + fileToMoveURI.toString());
- }
+ LOGGER.warn("Failed to find local segment file for segment {}", fileToMoveURI.toString());
}
} catch (IOException e) {
LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index db23301..e895242 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -21,8 +21,6 @@ package org.apache.pinot.controller.validation;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.io.File;
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.config.QuotaConfig;
import org.apache.pinot.common.config.TableConfig;
@@ -83,15 +81,12 @@ public class StorageQuotaChecker {
* @param segmentFile untarred segment. This should not be null.
* segmentFile must exist on disk and must be a directory
* @param segmentName name of the segment being added
- * @param timeoutMsec timeout in milliseconds for reading table sizes from server
+ * @param timeoutMs timeout in milliseconds for reading table sizes from server
*
*/
- public QuotaCheckerResponse isSegmentStorageWithinQuota(@Nonnull File segmentFile, @Nonnull String segmentName,
- @Nonnegative int timeoutMsec)
+ public QuotaCheckerResponse isSegmentStorageWithinQuota(File segmentFile, String segmentName, int timeoutMs)
throws InvalidConfigException {
- Preconditions.checkNotNull(segmentFile);
- Preconditions.checkNotNull(segmentName);
- Preconditions.checkArgument(timeoutMsec > 0, "Timeout value must be > 0, input: %s", timeoutMsec);
+ Preconditions.checkArgument(timeoutMs > 0, "Timeout value must be > 0, input: %s", timeoutMs);
Preconditions.checkArgument(segmentFile.exists(), "Segment file: %s does not exist", segmentFile);
Preconditions.checkArgument(segmentFile.isDirectory(), "Segment file: %s is not a directory", segmentFile);
@@ -105,23 +100,28 @@ public class StorageQuotaChecker {
if (quotaConfig == null || Strings.isNullOrEmpty(quotaConfig.getStorage())) {
// no quota configuration...so ignore for backwards compatibility
- LOGGER.warn("Quota configuration not set for table: {}", tableNameWithType);
- return success("Quota configuration not set for table: " + tableNameWithType);
+ String message =
+ String.format("Storage quota is not configured for table: %s, skipping the check", tableNameWithType);
+ LOGGER.info(message);
+ return success(message);
}
long allowedStorageBytes = numReplicas * quotaConfig.storageSizeBytes();
- if (allowedStorageBytes < 0) {
- LOGGER.warn("Storage quota is not configured for table: {}", tableNameWithType);
- return success("Storage quota is not configured for table: " + tableNameWithType);
+ if (allowedStorageBytes <= 0) {
+ String message = String
+ .format("Invalid storage quota: %s for table: %s, skipping the check", quotaConfig.getStorage(),
+ tableNameWithType);
+ LOGGER.warn(message);
+ return success(message);
}
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_QUOTA, allowedStorageBytes);
long incomingSegmentSizeBytes = FileUtils.sizeOfDirectory(segmentFile);
// read table size
- TableSizeReader.TableSubTypeSizeDetails tableSubtypeSize = null;
+ TableSizeReader.TableSubTypeSizeDetails tableSubtypeSize;
try {
- tableSubtypeSize = _tableSizeReader.getTableSubtypeSize(tableNameWithType, timeoutMsec);
+ tableSubtypeSize = _tableSizeReader.getTableSubtypeSize(tableNameWithType, timeoutMs);
} catch (InvalidConfigException e) {
LOGGER.error("Failed to get table size for table {}", tableNameWithType, e);
throw e;
@@ -157,7 +157,7 @@ public class StorageQuotaChecker {
tableNameWithType, tableSubtypeSize.estimatedSizeInBytes, tableSubtypeSize.reportedSizeInBytes);
// Only emit the real percentage of storage quota usage by lead controller, otherwise emit 0L.
- if (isLeader() && allowedStorageBytes != 0L) {
+ if (isLeader()) {
long existingStorageQuotaUtilization = tableSubtypeSize.estimatedSizeInBytes * 100 / allowedStorageBytes;
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
existingStorageQuotaUtilization);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 2eb17f5..67fabc6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -71,7 +71,6 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
protected final File _avroDir = new File(_tempDir, "avroDir");
- protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir");
protected final File _segmentDir = new File(_tempDir, "segmentDir");
protected final File _tarDir = new File(_tempDir, "tarDir");
protected List<KafkaServerStartable> _kafkaStarters;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 71598b9..75358b1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -19,8 +19,8 @@
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
@@ -41,7 +41,6 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.plan.SelectionPlanNode;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -611,13 +610,16 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
pqlQuery = "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
JsonNode response2 = postQuery(pqlQuery);
- pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
+ pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+ + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
JsonNode response3 = postQuery(pqlQuery);
- pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
+ pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+ + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
JsonNode response4 = postQuery(pqlQuery);
- pqlQuery = "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
+ pqlQuery =
+ "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
JsonNode response5 = postQuery(pqlQuery);
double val1 = response1.get("aggregationResults").get(0).get("value").asDouble();
@@ -653,7 +655,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
double val2 = response2.get("aggregationResults").get(0).get("value").asDouble();
Assert.assertEquals(val1, val2);
}
-
}
@AfterClass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org