You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/12 01:49:50 UTC

[incubator-pinot] 01/01: Misc fix for controller tests

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch misc_fix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 9c4d9fa43adda13a96577e48dc37f44165253c34
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu Jul 11 18:46:09 2019 -0700

    Misc fix for controller tests
---
 .../api/resources/PinotControllerHealthCheck.java  |  4 +--
 .../resources/PinotTableConfigRestletResource.java |  2 ++
 .../api/resources/PinotTableRestletResource.java   | 10 +++----
 .../helix/core/PinotHelixResourceManager.java      | 34 +++++++++-------------
 .../helix/core/SegmentDeletionManager.java         | 17 +++++------
 .../controller/validation/StorageQuotaChecker.java | 12 ++++----
 .../tests/BaseClusterIntegrationTest.java          |  1 -
 .../tests/OfflineClusterIntegrationTest.java       | 13 +++++----
 8 files changed, 45 insertions(+), 48 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
index e6c5182..7370085 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
@@ -32,14 +32,14 @@ import org.apache.pinot.controller.ControllerConf;
 
 
 @Api(tags = Constants.HEALTH_TAG)
-@Path("/pinot-controller/admin")
+@Path("/")
 public class PinotControllerHealthCheck {
 
   @Inject
   ControllerConf controllerConf;
 
   @GET
-  @Path("/")
+  @Path("pinot-controller/admin")
   @ApiOperation(value = "Check controller health")
   @ApiResponses(value = {@ApiResponse(code = 200, message = "Good")})
   @Produces(MediaType.TEXT_PLAIN)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
index 01d5fdf..5717170 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
@@ -110,6 +110,8 @@ public class PinotTableConfigRestletResource {
             .type(MediaType.TEXT_PLAIN_TYPE).build();
       }
 
+      // TODO: Fix the bug - when schema is not configured, after deserialization, CombinedConfig will have a non-null
+      //       schema with null schema name
       if (config.getSchema() != null) {
         _resourceManager.addOrUpdateSchema(config.getSchema());
       }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index daf6f88..fe3491a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -62,7 +62,6 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
 import org.apache.pinot.core.util.ReplicationUtils;
 import org.slf4j.LoggerFactory;
 
@@ -252,14 +251,16 @@ public class PinotTableRestletResource {
   @ApiOperation(value = "Deletes a table", notes = "Deletes a table")
   public SuccessResponse deleteTable(
       @ApiParam(value = "Name of the table to delete", required = true) @PathParam("tableName") String tableName,
-      @ApiParam(value = "realtime|offline", required = false) @QueryParam("type") String tableTypeStr) {
+      @ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) {
     List<String> tablesDeleted = new LinkedList<>();
     try {
-      if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name())) {
+      if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name()))
+          && !TableNameBuilder.REALTIME.tableHasTypeSuffix(tableName)) {
         _pinotHelixResourceManager.deleteOfflineTable(tableName);
         tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
       }
-      if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name())) {
+      if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name()))
+          && !TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableName)) {
         _pinotHelixResourceManager.deleteRealtimeTable(tableName);
         tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
       }
@@ -373,7 +374,6 @@ public class PinotTableRestletResource {
       throw new PinotHelixResourceManager.InvalidTableConfigException(errorMsg, e);
     }
 
-
     if (verifyReplication) {
       int requestReplication;
       try {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5eeeddc..0cf2f50 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -117,7 +117,6 @@ import org.slf4j.LoggerFactory;
 
 public class PinotHelixResourceManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixResourceManager.class);
-  private static final long DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS = 120_000L; // 2 minutes
   private static final long DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS = 500L;
   private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
   public static final String APPEND = "APPEND";
@@ -160,10 +159,10 @@ public class PinotHelixResourceManager {
 
   public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
-        CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_"
-            + controllerConf.getControllerPort(), controllerConf.getDataDir(),
-        controllerConf.getExternalViewOnlineToOfflineTimeout(), controllerConf.tenantIsolationEnabled(),
-        controllerConf.getEnableBatchMessageMode(), controllerConf.getHLCTablesAllowed());
+        CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_" + controllerConf
+            .getControllerPort(), controllerConf.getDataDir(), controllerConf.getExternalViewOnlineToOfflineTimeout(),
+        controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
+        controllerConf.getHLCTablesAllowed());
   }
 
   /**
@@ -1087,9 +1086,6 @@ public class PinotHelixResourceManager {
         // lets add table configs
         ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
 
-        _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType),
-            new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
-
         // Update replica group partition assignment to the property store if applicable
         updateReplicaGroupPartitionAssignment(tableConfig);
         break;
@@ -1228,9 +1224,8 @@ public class PinotHelixResourceManager {
           servers = getInstancesWithTag(realtimeTagConfig.getConsumingServerTag());
         }
         int numReplicas = ReplicationUtils.getReplication(tableConfig);
-        ReplicaGroupPartitionAssignment partitionAssignment =
-            partitionAssignmentGenerator.buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig,
-                numReplicas, servers);
+        ReplicaGroupPartitionAssignment partitionAssignment = partitionAssignmentGenerator
+            .buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig, numReplicas, servers);
         partitionAssignmentGenerator.writeReplicaGroupPartitionAssignment(partitionAssignment);
       }
     }
@@ -1272,7 +1267,8 @@ public class PinotHelixResourceManager {
     // Check if HLC table is allowed.
     StreamConfig streamConfig = new StreamConfig(indexingConfig.getStreamConfigs());
     if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) {
-      throw new InvalidTableConfigException("Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
+      throw new InvalidTableConfigException(
+          "Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
     }
   }
 
@@ -1468,8 +1464,8 @@ public class PinotHelixResourceManager {
     LOGGER.info("Deleting table {}: Finish", offlineTableName);
   }
 
-  public void deleteRealtimeTable(String rawTableName) {
-    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+  public void deleteRealtimeTable(String tableName) {
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     LOGGER.info("Deleting table {}: Start", realtimeTableName);
 
     // Remove the table from brokerResource
@@ -2071,14 +2067,12 @@ public class PinotHelixResourceManager {
         : PinotResourceManagerResponse.failure("Timed out. External view not completely updated");
   }
 
-  public boolean hasRealtimeTable(String tableName) {
-    String actualTableName = tableName + "_REALTIME";
-    return getAllTables().contains(actualTableName);
+  public boolean hasOfflineTable(String tableName) {
+    return getAllResources().contains(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
   }
 
-  public boolean hasOfflineTable(String tableName) {
-    String actualTableName = tableName + "_OFFLINE";
-    return getAllTables().contains(actualTableName);
+  public boolean hasRealtimeTable(String tableName) {
+    return getAllResources().contains(TableNameBuilder.REALTIME.tableNameWithType(tableName));
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 46c84c0..8c6187a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -172,16 +172,17 @@ public class SegmentDeletionManager {
   }
 
   protected void removeSegmentFromStore(String tableNameWithType, String segmentId) {
-    final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    // Ignore HLC segments as they are not stored in Pinot FS
+    if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
+      return;
+    }
     if (_dataDir != null) {
-      URI fileToMoveURI;
-      PinotFS pinotFS;
-      URI dataDirURI = ControllerConf.getUriFromPath(_dataDir);
-      fileToMoveURI = ControllerConf.constructSegmentLocation(_dataDir, rawTableName, segmentId);
+      String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+      URI fileToMoveURI = ControllerConf.constructSegmentLocation(_dataDir, rawTableName, segmentId);
       URI deletedSegmentDestURI = ControllerConf
           .constructSegmentLocation(StringUtil.join(File.separator, _dataDir, DELETED_SEGMENTS), rawTableName,
               segmentId);
-      pinotFS = PinotFSFactory.create(dataDirURI.getScheme());
+      PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme());
 
       try {
         if (pinotFS.exists(fileToMoveURI)) {
@@ -197,9 +198,7 @@ public class SegmentDeletionManager {
                 deletedSegmentDestURI.toString());
           }
         } else {
-          if (!SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
-            LOGGER.warn("Not found local segment file for segment {}" + fileToMoveURI.toString());
-          }
+          LOGGER.warn("Failed to find local segment file for segment {}", fileToMoveURI.toString());
         }
       } catch (IOException e) {
         LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index db23301..efc1640 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -105,14 +105,16 @@ public class StorageQuotaChecker {
 
     if (quotaConfig == null || Strings.isNullOrEmpty(quotaConfig.getStorage())) {
       // no quota configuration...so ignore for backwards compatibility
-      LOGGER.warn("Quota configuration not set for table: {}", tableNameWithType);
-      return success("Quota configuration not set for table: " + tableNameWithType);
+      LOGGER.info("Storage quota is not configured for table: {}, skipping the check", tableNameWithType);
+      return success("Storage quota is not configured for table: " + tableNameWithType);
     }
 
     long allowedStorageBytes = numReplicas * quotaConfig.storageSizeBytes();
-    if (allowedStorageBytes < 0) {
-      LOGGER.warn("Storage quota is not configured for table: {}", tableNameWithType);
-      return success("Storage quota is not configured for table: " + tableNameWithType);
+    if (allowedStorageBytes <= 0) {
+      LOGGER.warn("Invalid storage quota: {} for table: {}, skipping the check", quotaConfig.getStorage(),
+          tableNameWithType);
+      return success(
+          String.format("Invalid storage quota: %s for table: %s", quotaConfig.getStorage(), tableNameWithType));
     }
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_QUOTA, allowedStorageBytes);
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 2eb17f5..67fabc6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -71,7 +71,6 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
   protected final File _avroDir = new File(_tempDir, "avroDir");
-  protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir");
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
   protected final File _tarDir = new File(_tempDir, "tarDir");
   protected List<KafkaServerStartable> _kafkaStarters;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 71598b9..75358b1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -19,8 +19,8 @@
 package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
@@ -41,7 +41,6 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.plan.SelectionPlanNode;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -611,13 +610,16 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     pqlQuery = "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
     JsonNode response2 = postQuery(pqlQuery);
 
-    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
+    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+        + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
     JsonNode response3 = postQuery(pqlQuery);
 
-    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
+    pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch
+        + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
     JsonNode response4 = postQuery(pqlQuery);
 
-    pqlQuery = "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
+    pqlQuery =
+        "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
     JsonNode response5 = postQuery(pqlQuery);
 
     double val1 = response1.get("aggregationResults").get(0).get("value").asDouble();
@@ -653,7 +655,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       double val2 = response2.get("aggregationResults").get(0).get("value").asDouble();
       Assert.assertEquals(val1, val2);
     }
-
   }
 
   @AfterClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org