You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/02/26 19:25:01 UTC

[incubator-pinot] branch master updated: Check assigned instances for uploading new segment (#3856)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9269f5e  Check assigned instances for uploading new segment (#3856)
9269f5e is described below

commit 9269f5e02cfd868bd9448c6c4df9107e5776e60c
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Tue Feb 26 11:24:56 2019 -0800

    Check assigned instances for uploading new segment (#3856)
    
    * Improve segment validator to check assigned instances for uploading new segment
    * Add Precondition on assignedInstances before updating zk
---
 .../PinotSegmentUploadRestletResource.java         | 13 +++--
 .../controller/api/upload/SegmentValidator.java    | 19 +++++++-
 .../api/upload/SegmentValidatorResponse.java       | 55 ++++++++++++++++++++++
 .../pinot/controller/api/upload/ZKOperator.java    | 12 ++---
 .../helix/core/PinotHelixResourceManager.java      | 39 +++++++++------
 .../tests/PinotURIUploadIntegrationTest.java       | 47 ++++++++++++++----
 6 files changed, 151 insertions(+), 34 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 1460139..0bbb4b0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -57,6 +57,7 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -72,6 +73,7 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.access.AccessControl;
 import org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.upload.SegmentValidator;
+import org.apache.pinot.controller.api.upload.SegmentValidatorResponse;
 import org.apache.pinot.controller.api.upload.ZKOperator;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -315,12 +317,13 @@ public class PinotSegmentUploadRestletResource {
               clientAddress);
 
       // Validate segment
-      new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
-          _controllerMetrics).validateSegment(segmentMetadata, tempSegmentDir);
+      SegmentValidatorResponse segmentValidatorResponse =
+          new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
+              _controllerMetrics).validateSegment(segmentMetadata, tempSegmentDir);
 
       // Zk operations
       completeZkOperations(enableParallelPushProtection, headers, tempEncryptedFile, provider, segmentMetadata,
-          segmentName, zkDownloadUri, moveSegmentToFinalLocation);
+          segmentName, zkDownloadUri, moveSegmentToFinalLocation, segmentValidatorResponse);
 
       return new SuccessResponse(
           "Successfully uploaded segment: " + segmentMetadata.getName() + " of table: " + segmentMetadata
@@ -380,7 +383,7 @@ public class PinotSegmentUploadRestletResource {
 
   private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File tempDecryptedFile,
       FileUploadPathProvider provider, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
-      boolean moveSegmentToFinalLocation)
+      boolean moveSegmentToFinalLocation, SegmentValidatorResponse segmentValidatorResponse)
       throws Exception {
     String finalSegmentPath = StringUtil
         .join("/", provider.getBaseDataDirURI().toString(), segmentMetadata.getTableName(),
@@ -388,7 +391,7 @@ public class PinotSegmentUploadRestletResource {
     URI finalSegmentLocationURI = new URI(finalSegmentPath);
     ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
     zkOperator.completeSegmentOperations(segmentMetadata, finalSegmentLocationURI, tempDecryptedFile,
-        enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation);
+        enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, segmentValidatorResponse);
   }
 
   private void decryptFile(String crypterClassHeader, File tempEncryptedFile, File tempDecryptedFile) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
index e4f9d42..a7dc32e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
@@ -20,10 +20,12 @@ package org.apache.pinot.controller.api.upload;
 
 import java.io.File;
 import java.util.Date;
+import java.util.List;
 import java.util.concurrent.Executor;
 import javax.annotation.Nonnull;
 import javax.ws.rs.core.Response;
 import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.exception.InvalidConfigException;
@@ -62,7 +64,7 @@ public class SegmentValidator {
     _controllerMetrics = controllerMetrics;
   }
 
-  public void validateSegment(SegmentMetadata segmentMetadata, File tempSegmentDir) {
+  public SegmentValidatorResponse validateSegment(SegmentMetadata segmentMetadata, File tempSegmentDir) {
     String rawTableName = segmentMetadata.getTableName();
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     String segmentName = segmentMetadata.getName();
@@ -74,6 +76,19 @@ public class SegmentValidator {
           Response.Status.NOT_FOUND);
     }
 
+    // Verifies whether there's a server assigned to this segment when uploading a new segment.
+    List<String> assignedInstances = null;
+    ZNRecord segmentMetadataZnRecord =
+        _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName);
+    // Checks whether it's a new segment or an existing one.
+    if (segmentMetadataZnRecord == null) {
+      assignedInstances = _pinotHelixResourceManager.getAssignedInstancesForSegment(segmentMetadata);
+      if (assignedInstances.isEmpty()) {
+        throw new ControllerApplicationException(LOGGER, "No assigned Instances for Segment: " + segmentName
+            + ". Please check whether the table config is misconfigured.", Response.Status.INTERNAL_SERVER_ERROR);
+      }
+    }
+
     StorageQuotaChecker.QuotaCheckerResponse quotaResponse;
     try {
       quotaResponse = checkStorageQuota(tempSegmentDir, segmentMetadata, offlineTableConfig);
@@ -95,6 +110,8 @@ public class SegmentValidator {
           "Invalid segment start/end time for segment: " + segmentName + " of table: " + offlineTableName,
           Response.Status.NOT_ACCEPTABLE);
     }
+
+    return new SegmentValidatorResponse(offlineTableConfig, segmentMetadataZnRecord, assignedInstances, quotaResponse);
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidatorResponse.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidatorResponse.java
new file mode 100644
index 0000000..b153291
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidatorResponse.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.upload;
+
+import java.util.List;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.controller.validation.StorageQuotaChecker;
+
+
+public class SegmentValidatorResponse {
+  private final TableConfig _offlineTableConfig;
+  private final ZNRecord _segmentMetadataZnRecord;
+  private final List<String> _assignedInstances;
+  private final StorageQuotaChecker.QuotaCheckerResponse _quotaCheckerResponse;
+
+  public SegmentValidatorResponse(TableConfig offlineTableConfig, ZNRecord segmentMetadataZnRecord, List<String> assignedInstances, StorageQuotaChecker.QuotaCheckerResponse quotaCheckerResponse) {
+    _offlineTableConfig = offlineTableConfig;
+    _segmentMetadataZnRecord = segmentMetadataZnRecord;
+    _assignedInstances = assignedInstances;
+    _quotaCheckerResponse = quotaCheckerResponse;
+  }
+
+  public TableConfig getOfflineTableConfig() {
+    return _offlineTableConfig;
+  }
+
+  public ZNRecord getSegmentMetadataZnRecord() {
+    return _segmentMetadataZnRecord;
+  }
+
+  public List<String> getAssignedInstances() {
+    return _assignedInstances;
+  }
+
+  public StorageQuotaChecker.QuotaCheckerResponse getQuotaCheckerResponse() {
+    return _quotaCheckerResponse;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 5ef32f1..1bf5e8e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.api.upload;
 
 import java.io.File;
 import java.net.URI;
+import java.util.List;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import org.apache.helix.ZNRecord;
@@ -58,20 +59,19 @@ public class ZKOperator {
 
   public void completeSegmentOperations(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
-      boolean moveSegmentToFinalLocation)
+      boolean moveSegmentToFinalLocation, SegmentValidatorResponse segmentValidatorResponse)
       throws Exception {
     String rawTableName = segmentMetadata.getTableName();
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     String segmentName = segmentMetadata.getName();
 
-    ZNRecord znRecord = _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName);
-
     // Brand new segment, not refresh, directly add the segment
+    ZNRecord znRecord = segmentValidatorResponse.getSegmentMetadataZnRecord();
     if (znRecord == null) {
       LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName);
       String crypter = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter,
-          rawTableName, segmentName, moveSegmentToFinalLocation);
+          rawTableName, segmentName, moveSegmentToFinalLocation, segmentValidatorResponse.getAssignedInstances());
       return;
     }
 
@@ -203,7 +203,7 @@ public class ZKOperator {
 
   private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, String zkDownloadURI, String crypter, String rawTableName, String segmentName,
-      boolean moveSegmentToFinalLocation) {
+      boolean moveSegmentToFinalLocation, List<String> assignedInstances) {
     // For v1 segment uploads, we will not move the segment
     if (moveSegmentToFinalLocation) {
       try {
@@ -219,7 +219,7 @@ public class ZKOperator {
       LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, rawTableName,
           zkDownloadURI);
     }
-    _pinotHelixResourceManager.addNewSegment(segmentMetadata, zkDownloadURI, crypter);
+    _pinotHelixResourceManager.addNewSegment(segmentMetadata, zkDownloadURI, crypter, assignedInstances);
   }
 
   private void moveSegmentToPermanentDirectory(File currentSegmentLocation, URI finalSegmentLocationURI)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 4e59582..ef119aa 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1461,10 +1461,13 @@ public class PinotHelixResourceManager {
   }
 
   public void addNewSegment(@Nonnull SegmentMetadata segmentMetadata, @Nonnull String downloadUrl) {
-    addNewSegment(segmentMetadata, downloadUrl, null);
+    List<String> assignedInstances = getAssignedInstancesForSegment(segmentMetadata);
+    addNewSegment(segmentMetadata, downloadUrl, null, assignedInstances);
   }
 
-  public void addNewSegment(@Nonnull SegmentMetadata segmentMetadata, @Nonnull String downloadUrl, String crypter) {
+  public void addNewSegment(@Nonnull SegmentMetadata segmentMetadata, @Nonnull String downloadUrl, String crypter,
+      @Nonnull List<String> assignedInstances) {
+    Preconditions.checkNotNull(assignedInstances, "Assigned Instances should not be null!");
     String segmentName = segmentMetadata.getName();
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(segmentMetadata.getTableName());
 
@@ -1481,7 +1484,7 @@ public class PinotHelixResourceManager {
     }
     LOGGER.info("Added segment: {} of table: {} to property store", segmentName, offlineTableName);
 
-    addNewOfflineSegment(segmentMetadata);
+    addNewOfflineSegment(segmentMetadata, assignedInstances);
     LOGGER.info("Added segment: {} of table: {} to ideal state", segmentName, offlineTableName);
   }
 
@@ -1687,6 +1690,24 @@ public class PinotHelixResourceManager {
   }
 
   /**
+   * Gets assigned instances for uploading new segment.
+   * @param segmentMetadata segment metadata
+   * @return a list of assigned instances.
+   */
+  public List<String> getAssignedInstancesForSegment(SegmentMetadata segmentMetadata) {
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(segmentMetadata.getTableName());
+    TableConfig offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, offlineTableName);
+    Preconditions.checkNotNull(offlineTableConfig);
+    int numReplicas = Integer.parseInt(offlineTableConfig.getValidationConfig().getReplication());
+    String serverTenant = TagNameUtils.getOfflineTagForTenant(offlineTableConfig.getTenantConfig().getServer());
+    SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory
+        .getSegmentAssignmentStrategy(offlineTableConfig.getValidationConfig().getSegmentAssignmentStrategy());
+    return segmentAssignmentStrategy
+        .getAssignedInstances(_helixZkManager, _helixAdmin, _propertyStore, _helixClusterName, segmentMetadata,
+            numReplicas, serverTenant);
+  }
+
+  /**
    * Helper method to add the passed in offline segment to the helix cluster.
    * - Gets the segment name and the table name from the passed in segment meta-data.
    * - Identifies the instance set onto which the segment needs to be added, based on
@@ -1697,21 +1718,11 @@ public class PinotHelixResourceManager {
    * @param segmentMetadata Meta-data for the segment, used to access segmentName and tableName.
    */
   // NOTE: method should be thread-safe
-  private void addNewOfflineSegment(SegmentMetadata segmentMetadata) {
+  private void addNewOfflineSegment(SegmentMetadata segmentMetadata, List<String> assignedInstances) {
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(segmentMetadata.getTableName());
     String segmentName = segmentMetadata.getName();
 
     // Assign new segment to instances
-    TableConfig offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, offlineTableName);
-    Preconditions.checkNotNull(offlineTableConfig);
-    int numReplicas = Integer.parseInt(offlineTableConfig.getValidationConfig().getReplication());
-    String serverTenant = TagNameUtils.getOfflineTagForTenant(offlineTableConfig.getTenantConfig().getServer());
-    SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory
-        .getSegmentAssignmentStrategy(offlineTableConfig.getValidationConfig().getSegmentAssignmentStrategy());
-    List<String> assignedInstances = segmentAssignmentStrategy
-        .getAssignedInstances(_helixZkManager, _helixAdmin, _propertyStore, _helixClusterName, segmentMetadata,
-            numReplicas, serverTenant);
-
     HelixHelper.addSegmentToIdealState(_helixZkManager, offlineTableName, segmentName, assignedInstances);
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
index fb86af2..bfdb354 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
@@ -48,6 +48,7 @@ import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.util.EntityUtils;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -67,7 +68,7 @@ import org.testng.annotations.Test;
 /**
  * Tests the URI upload path through a local file uri.
  */
-public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
+public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTestSet {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotURIUploadIntegrationTest.class);
   private String _tableName;
   private File _metadataDir = new File(_segmentDir, "tmpMeta");
@@ -110,8 +111,7 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
     }
   }
 
-  protected void generateAndUploadRandomSegment(String segmentName, int rowCount)
-      throws Exception {
+  private File generateRandomSegment(String segmentName, int rowCount) throws Exception {
     ThreadLocalRandom random = ThreadLocalRandom.current();
     Schema schema = new Schema.Parser()
         .parse(new File(TestUtils.getFileFromResourceUrl(getClass().getClassLoader().getResource("dummy.avsc"))));
@@ -139,10 +139,8 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
     executor.shutdown();
     executor.awaitTermination(1L, TimeUnit.MINUTES);
 
-    uploadSegmentsDirectly(segmentTarDir);
-
     FileUtils.forceDelete(avroFile);
-    FileUtils.forceDelete(segmentTarDir);
+    return new File(_tarDir, segmentName);
   }
 
   @DataProvider(name = "configProvider")
@@ -156,8 +154,10 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
       throws Exception {
     final String segment6 = "segmentToBeRefreshed_6";
     final int nRows1 = 69;
-    generateAndUploadRandomSegment(segment6, nRows1);
+    File segmentTarDir = generateRandomSegment(segment6, nRows1);
+    uploadSegmentsDirectly(segmentTarDir);
     verifyNRows(0, nRows1);
+    FileUtils.forceDelete(segmentTarDir);
   }
 
   // Verify that the number of rows is either the initial value or the final value but not something else.
@@ -186,6 +186,37 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
     Assert.fail("Failed to get from " + currentNrows + " to " + finalNrows);
   }
 
+  @Test(dataProvider = "configProvider")
+  public void testSegmentValidator(String tableName, SegmentVersion version)
+      throws Exception {
+    completeTableConfiguration();
+    String serverInstanceId = "Server_localhost_" + CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
+
+    // Disable server instance.
+    _helixAdmin.enableInstance(getHelixClusterName(), serverInstanceId, false);
+
+    final String segment6 = "segmentToBeRefreshed_6";
+    final int nRows1 = 69;
+    File segmentTarDir = generateRandomSegment(segment6, nRows1);
+    try {
+      uploadSegmentsDirectly(segmentTarDir);
+      Assert.fail("Uploading segments should fail.");
+    } catch (Exception e) {
+      // Expected: the upload is supposed to fail while the server instance is disabled.
+    }
+
+    // Re-enable the server instance.
+    _helixAdmin.enableInstance(getHelixClusterName(), serverInstanceId, true);
+
+    try {
+      uploadSegmentsDirectly(segmentTarDir);
+    } catch (Exception e) {
+      Assert.fail("Uploading segments should succeed.");
+    }
+
+    FileUtils.forceDelete(segmentTarDir);
+  }
+
   @AfterClass
   public void tearDown() {
     stopServer();
@@ -202,7 +233,7 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
    *
    * @param segmentDir Segment directory
    */
-  protected void uploadSegmentsDirectly(@Nonnull File segmentDir)
+  private void uploadSegmentsDirectly(@Nonnull File segmentDir)
       throws Exception {
     String[] segmentNames = segmentDir.list();
     Assert.assertNotNull(segmentNames);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org