You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/13 03:17:03 UTC

[incubator-pinot] branch master updated: Misc fix for controller tests (#4431)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 626c43a  Misc fix for controller tests (#4431)
626c43a is described below

commit 626c43a37483af8b32e485ad241962935bd9232f
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jul 12 20:16:53 2019 -0700

    Misc fix for controller tests (#4431)
---
 .../api/resources/PinotControllerHealthCheck.java  |  4 +--
 .../resources/PinotTableConfigRestletResource.java |  2 ++
 .../api/resources/PinotTableRestletResource.java   | 10 +++----
 .../helix/core/PinotHelixResourceManager.java      | 34 +++++++++-------------
 .../helix/core/SegmentDeletionManager.java         | 10 ++++---
 .../controller/validation/StorageQuotaChecker.java | 32 ++++++++++----------
 .../tests/BaseClusterIntegrationTest.java          |  1 -
 .../tests/OfflineClusterIntegrationTest.java       | 13 +++++----
 8 files changed, 52 insertions(+), 54 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
index e6c5182..7370085 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
@@ -32,14 +32,14 @@ import org.apache.pinot.controller.ControllerConf;
 
 
 @Api(tags = Constants.HEALTH_TAG)
-@Path("/pinot-controller/admin")
+@Path("/")
 public class PinotControllerHealthCheck {
 
   @Inject
   ControllerConf controllerConf;
 
   @GET
-  @Path("/")
+  @Path("pinot-controller/admin")
   @ApiOperation(value = "Check controller health")
   @ApiResponses(value = {@ApiResponse(code = 200, message = "Good")})
   @Produces(MediaType.TEXT_PLAIN)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
index 01d5fdf..5717170 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
@@ -110,6 +110,8 @@ public class PinotTableConfigRestletResource {
             .type(MediaType.TEXT_PLAIN_TYPE).build();
       }
 
+      // TODO: Fix the bug - when schema is not configured, after deserialization, CombinedConfig will have a non-null
+      //       schema with null schema name
       if (config.getSchema() != null) {
         _resourceManager.addOrUpdateSchema(config.getSchema());
       }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index daf6f88..fe3491a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -62,7 +62,6 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
 import org.apache.pinot.core.util.ReplicationUtils;
 import org.slf4j.LoggerFactory;
 
@@ -252,14 +251,16 @@ public class PinotTableRestletResource {
   @ApiOperation(value = "Deletes a table", notes = "Deletes a table")
   public SuccessResponse deleteTable(
       @ApiParam(value = "Name of the table to delete", required = true) @PathParam("tableName") String tableName,
-      @ApiParam(value = "realtime|offline", required = false) @QueryParam("type") String tableTypeStr) {
+      @ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) {
     List<String> tablesDeleted = new LinkedList<>();
     try {
-      if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name())) {
+      if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name()))
+          && !TableNameBuilder.REALTIME.tableHasTypeSuffix(tableName)) {
         _pinotHelixResourceManager.deleteOfflineTable(tableName);
         tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
       }
-      if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name())) {
+      if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name()))
+          && !TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableName)) {
         _pinotHelixResourceManager.deleteRealtimeTable(tableName);
         tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
       }
@@ -373,7 +374,6 @@ public class PinotTableRestletResource {
       throw new PinotHelixResourceManager.InvalidTableConfigException(errorMsg, e);
     }
 
-
     if (verifyReplication) {
       int requestReplication;
       try {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5eeeddc..0cf2f50 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -117,7 +117,6 @@ import org.slf4j.LoggerFactory;
 
 public class PinotHelixResourceManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixResourceManager.class);
-  private static final long DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS = 120_000L; // 2 minutes
   private static final long DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS = 500L;
   private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
   public static final String APPEND = "APPEND";
@@ -160,10 +159,10 @@ public class PinotHelixResourceManager {
 
   public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
-        CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_"
-            + controllerConf.getControllerPort(), controllerConf.getDataDir(),
-        controllerConf.getExternalViewOnlineToOfflineTimeout(), controllerConf.tenantIsolationEnabled(),
-        controllerConf.getEnableBatchMessageMode(), controllerConf.getHLCTablesAllowed());
+        CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_" + controllerConf
+            .getControllerPort(), controllerConf.getDataDir(), controllerConf.getExternalViewOnlineToOfflineTimeout(),
+        controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
+        controllerConf.getHLCTablesAllowed());
   }
 
   /**
@@ -1087,9 +1086,6 @@ public class PinotHelixResourceManager {
         // lets add table configs
         ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
 
-        _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType),
-            new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
-
         // Update replica group partition assignment to the property store if applicable
         updateReplicaGroupPartitionAssignment(tableConfig);
         break;
@@ -1228,9 +1224,8 @@ public class PinotHelixResourceManager {
           servers = getInstancesWithTag(realtimeTagConfig.getConsumingServerTag());
         }
         int numReplicas = ReplicationUtils.getReplication(tableConfig);
-        ReplicaGroupPartitionAssignment partitionAssignment =
-            partitionAssignmentGenerator.buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig,
-                numReplicas, servers);
+        ReplicaGroupPartitionAssignment partitionAssignment = partitionAssignmentGenerator
+            .buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig, numReplicas, servers);
         partitionAssignmentGenerator.writeReplicaGroupPartitionAssignment(partitionAssignment);
       }
     }
@@ -1272,7 +1267,8 @@ public class PinotHelixResourceManager {
     // Check if HLC table is allowed.
     StreamConfig streamConfig = new StreamConfig(indexingConfig.getStreamConfigs());
     if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) {
-      throw new InvalidTableConfigException("Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
+      throw new InvalidTableConfigException(
+          "Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
     }
   }
 
@@ -1468,8 +1464,8 @@ public class PinotHelixResourceManager {
     LOGGER.info("Deleting table {}: Finish", offlineTableName);
   }
 
-  public void deleteRealtimeTable(String rawTableName) {
-    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+  public void deleteRealtimeTable(String tableName) {
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     LOGGER.info("Deleting table {}: Start", realtimeTableName);
 
     // Remove the table from brokerResource
@@ -2071,14 +2067,12 @@ public class PinotHelixResourceManager {
         : PinotResourceManagerResponse.failure("Timed out. External view not completely updated");
   }
 
-  public boolean hasRealtimeTable(String tableName) {
-    String actualTableName = tableName + "_REALTIME";
-    return getAllTables().contains(actualTableName);
+  public boolean hasOfflineTable(String tableName) {
+    return getAllResources().contains(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
   }
 
-  public boolean hasOfflineTable(String tableName) {
-    String actualTableName = tableName + "_OFFLINE";
-    return getAllTables().contains(actualTableName);
+  public boolean hasRealtimeTable(String tableName) {
+    return getAllResources().contains(TableNameBuilder.REALTIME.tableNameWithType(tableName));
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index b2ab8b3..53bcb61 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -170,8 +170,12 @@ public class SegmentDeletionManager {
   }
 
   protected void removeSegmentFromStore(String tableNameWithType, String segmentId) {
-    final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    // Ignore HLC segments as they are not stored in Pinot FS
+    if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
+      return;
+    }
     if (_dataDir != null) {
+      String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
       URI fileToMoveURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId));
       URI deletedSegmentDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, URIUtils.encode(segmentId));
       PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme());
@@ -190,9 +194,7 @@ public class SegmentDeletionManager {
                 deletedSegmentDestURI.toString());
           }
         } else {
-          if (!SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
-            LOGGER.warn("Not found local segment file for segment {}" + fileToMoveURI.toString());
-          }
+          LOGGER.warn("Failed to find local segment file for segment {}", fileToMoveURI.toString());
         }
       } catch (IOException e) {
         LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index db23301..e895242 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -21,8 +21,6 @@ package org.apache.pinot.controller.validation;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import java.io.File;
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.config.QuotaConfig;
 import org.apache.pinot.common.config.TableConfig;
@@ -83,15 +81,12 @@ public class StorageQuotaChecker {
    * @param segmentFile untarred segment. This should not be null.
    *                    segmentFile must exist on disk and must be a directory
    * @param segmentName name of the segment being added
-   * @param timeoutMsec timeout in milliseconds for reading table sizes from server
+   * @param timeoutMs timeout in milliseconds for reading table sizes from server
    *
    */
-  public QuotaCheckerResponse isSegmentStorageWithinQuota(@Nonnull File segmentFile, @Nonnull String segmentName,
-      @Nonnegative int timeoutMsec)
+  public QuotaCheckerResponse isSegmentStorageWithinQuota(File segmentFile, String segmentName, int timeoutMs)
       throws InvalidConfigException {
-    Preconditions.checkNotNull(segmentFile);
-    Preconditions.checkNotNull(segmentName);
-    Preconditions.checkArgument(timeoutMsec > 0, "Timeout value must be > 0, input: %s", timeoutMsec);
+    Preconditions.checkArgument(timeoutMs > 0, "Timeout value must be > 0, input: %s", timeoutMs);
     Preconditions.checkArgument(segmentFile.exists(), "Segment file: %s does not exist", segmentFile);
     Preconditions.checkArgument(segmentFile.isDirectory(), "Segment file: %s is not a directory", segmentFile);
 
@@ -105,23 +100,28 @@ public class StorageQuotaChecker {
 
     if (quotaConfig == null || Strings.isNullOrEmpty(quotaConfig.getStorage())) {
       // no quota configuration...so ignore for backwards compatibility
-      LOGGER.warn("Quota configuration not set for table: {}", tableNameWithType);
-      return success("Quota configuration not set for table: " + tableNameWithType);
+      String message =
+          String.format("Storage quota is not configured for table: %s, skipping the check", tableNameWithType);
+      LOGGER.info(message);
+      return success(message);
     }
 
     long allowedStorageBytes = numReplicas * quotaConfig.storageSizeBytes();
-    if (allowedStorageBytes < 0) {
-      LOGGER.warn("Storage quota is not configured for table: {}", tableNameWithType);
-      return success("Storage quota is not configured for table: " + tableNameWithType);
+    if (allowedStorageBytes <= 0) {
+      String message = String
+          .format("Invalid storage quota: %s for table: %s, skipping the check", quotaConfig.getStorage(),
+              tableNameWithType);
+      LOGGER.warn(message);
+      return success(message);
     }
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_QUOTA, allowedStorageBytes);
 
     long incomingSegmentSizeBytes = FileUtils.sizeOfDirectory(segmentFile);
 
     // read table size
-    TableSizeReader.TableSubTypeSizeDetails tableSubtypeSize = null;
+    TableSizeReader.TableSubTypeSizeDetails tableSubtypeSize;
     try {
-      tableSubtypeSize = _tableSizeReader.getTableSubtypeSize(tableNameWithType, timeoutMsec);
+      tableSubtypeSize = _tableSizeReader.getTableSubtypeSize(tableNameWithType, timeoutMs);
     } catch (InvalidConfigException e) {
       LOGGER.error("Failed to get table size for table {}", tableNameWithType, e);
       throw e;
@@ -157,7 +157,7 @@ public class StorageQuotaChecker {
         tableNameWithType, tableSubtypeSize.estimatedSizeInBytes, tableSubtypeSize.reportedSizeInBytes);
 
     // Only emit the real percentage of storage quota usage by lead controller, otherwise emit 0L.
-    if (isLeader() && allowedStorageBytes != 0L) {
+    if (isLeader()) {
       long existingStorageQuotaUtilization = tableSubtypeSize.estimatedSizeInBytes * 100 / allowedStorageBytes;
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
           existingStorageQuotaUtilization);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 2eb17f5..67fabc6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -71,7 +71,6 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
   protected final File _avroDir = new File(_tempDir, "avroDir");
-  protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir");
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
   protected final File _tarDir = new File(_tempDir, "tarDir");
   protected List<KafkaServerStartable> _kafkaStarters;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 71598b9..75358b1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -19,8 +19,8 @@
 package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
@@ -41,7 +41,6 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.plan.SelectionPlanNode;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -611,13 +610,16 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     pqlQuery = "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
     JsonNode response2 = postQuery(pqlQuery);
 
-    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
+    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+        + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
     JsonNode response3 = postQuery(pqlQuery);
 
-    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
+    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+        + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
     JsonNode response4 = postQuery(pqlQuery);
 
-    pqlQuery = "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
+    pqlQuery =
+        "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
     JsonNode response5 = postQuery(pqlQuery);
 
     double val1 = response1.get("aggregationResults").get(0).get("value").asDouble();
@@ -653,7 +655,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       double val2 = response2.get("aggregationResults").get(0).get("value").asDouble();
       Assert.assertEquals(val1, val2);
     }
-
   }
 
   @AfterClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org