You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/02/26 19:25:01 UTC
[incubator-pinot] branch master updated: Check assigned instances for uploading new segment (#3856)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9269f5e Check assigned instances for uploading new segment (#3856)
9269f5e is described below
commit 9269f5e02cfd868bd9448c6c4df9107e5776e60c
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Tue Feb 26 11:24:56 2019 -0800
Check assigned instances for uploading new segment (#3856)
* Improve segment validator to check assigned instances for uploading new segment
* Add Precondition on assignedInstances before updating zk
---
.../PinotSegmentUploadRestletResource.java | 13 +++--
.../controller/api/upload/SegmentValidator.java | 19 +++++++-
.../api/upload/SegmentValidatorResponse.java | 55 ++++++++++++++++++++++
.../pinot/controller/api/upload/ZKOperator.java | 12 ++---
.../helix/core/PinotHelixResourceManager.java | 39 +++++++++------
.../tests/PinotURIUploadIntegrationTest.java | 47 ++++++++++++++----
6 files changed, 151 insertions(+), 34 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 1460139..0bbb4b0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -57,6 +57,7 @@ import javax.ws.rs.core.Response;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -72,6 +73,7 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessControl;
import org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.upload.SegmentValidator;
+import org.apache.pinot.controller.api.upload.SegmentValidatorResponse;
import org.apache.pinot.controller.api.upload.ZKOperator;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -315,12 +317,13 @@ public class PinotSegmentUploadRestletResource {
clientAddress);
// Validate segment
- new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
- _controllerMetrics).validateSegment(segmentMetadata, tempSegmentDir);
+ SegmentValidatorResponse segmentValidatorResponse =
+ new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
+ _controllerMetrics).validateSegment(segmentMetadata, tempSegmentDir);
// Zk operations
completeZkOperations(enableParallelPushProtection, headers, tempEncryptedFile, provider, segmentMetadata,
- segmentName, zkDownloadUri, moveSegmentToFinalLocation);
+ segmentName, zkDownloadUri, moveSegmentToFinalLocation, segmentValidatorResponse);
return new SuccessResponse(
"Successfully uploaded segment: " + segmentMetadata.getName() + " of table: " + segmentMetadata
@@ -380,7 +383,7 @@ public class PinotSegmentUploadRestletResource {
private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File tempDecryptedFile,
FileUploadPathProvider provider, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
- boolean moveSegmentToFinalLocation)
+ boolean moveSegmentToFinalLocation, SegmentValidatorResponse segmentValidatorResponse)
throws Exception {
String finalSegmentPath = StringUtil
.join("/", provider.getBaseDataDirURI().toString(), segmentMetadata.getTableName(),
@@ -388,7 +391,7 @@ public class PinotSegmentUploadRestletResource {
URI finalSegmentLocationURI = new URI(finalSegmentPath);
ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
zkOperator.completeSegmentOperations(segmentMetadata, finalSegmentLocationURI, tempDecryptedFile,
- enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation);
+ enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, segmentValidatorResponse);
}
private void decryptFile(String crypterClassHeader, File tempEncryptedFile, File tempDecryptedFile) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
index e4f9d42..a7dc32e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
@@ -20,10 +20,12 @@ package org.apache.pinot.controller.api.upload;
import java.io.File;
import java.util.Date;
+import java.util.List;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.ws.rs.core.Response;
import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.helix.ZNRecord;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.exception.InvalidConfigException;
@@ -62,7 +64,7 @@ public class SegmentValidator {
_controllerMetrics = controllerMetrics;
}
- public void validateSegment(SegmentMetadata segmentMetadata, File tempSegmentDir) {
+ public SegmentValidatorResponse validateSegment(SegmentMetadata segmentMetadata, File tempSegmentDir) {
String rawTableName = segmentMetadata.getTableName();
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
String segmentName = segmentMetadata.getName();
@@ -74,6 +76,19 @@ public class SegmentValidator {
Response.Status.NOT_FOUND);
}
+ // Verifies whether there's server assigned to this segment when uploading a new segment.
+ List<String> assignedInstances = null;
+ ZNRecord segmentMetadataZnRecord =
+ _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName);
+ // Checks whether it's a new segment or an existing one.
+ if (segmentMetadataZnRecord == null) {
+ assignedInstances = _pinotHelixResourceManager.getAssignedInstancesForSegment(segmentMetadata);
+ if (assignedInstances.isEmpty()) {
+ throw new ControllerApplicationException(LOGGER, "No assigned Instances for Segment: " + segmentName
+ + ". Please check whether the table config is misconfigured.", Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
StorageQuotaChecker.QuotaCheckerResponse quotaResponse;
try {
quotaResponse = checkStorageQuota(tempSegmentDir, segmentMetadata, offlineTableConfig);
@@ -95,6 +110,8 @@ public class SegmentValidator {
"Invalid segment start/end time for segment: " + segmentName + " of table: " + offlineTableName,
Response.Status.NOT_ACCEPTABLE);
}
+
+ return new SegmentValidatorResponse(offlineTableConfig, segmentMetadataZnRecord, assignedInstances, quotaResponse);
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidatorResponse.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidatorResponse.java
new file mode 100644
index 0000000..b153291
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidatorResponse.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.upload;
+
+import java.util.List;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.controller.validation.StorageQuotaChecker;
+
+
+public class SegmentValidatorResponse {
+ private final TableConfig _offlineTableConfig;
+ private final ZNRecord _segmentMetadataZnRecord;
+ private final List<String> _assignedInstances;
+ private final StorageQuotaChecker.QuotaCheckerResponse _quotaCheckerResponse;
+
+ public SegmentValidatorResponse(TableConfig offlineTableConfig, ZNRecord segmentMetadataZnRecord, List<String> assignedInstances, StorageQuotaChecker.QuotaCheckerResponse quotaCheckerResponse) {
+ _offlineTableConfig = offlineTableConfig;
+ _segmentMetadataZnRecord = segmentMetadataZnRecord;
+ _assignedInstances = assignedInstances;
+ _quotaCheckerResponse = quotaCheckerResponse;
+ }
+
+ public TableConfig getOfflineTableConfig() {
+ return _offlineTableConfig;
+ }
+
+ public ZNRecord getSegmentMetadataZnRecord() {
+ return _segmentMetadataZnRecord;
+ }
+
+ public List<String> getAssignedInstances() {
+ return _assignedInstances;
+ }
+
+ public StorageQuotaChecker.QuotaCheckerResponse getQuotaCheckerResponse() {
+ return _quotaCheckerResponse;
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 5ef32f1..1bf5e8e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.api.upload;
import java.io.File;
import java.net.URI;
+import java.util.List;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.helix.ZNRecord;
@@ -58,20 +59,19 @@ public class ZKOperator {
public void completeSegmentOperations(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
- boolean moveSegmentToFinalLocation)
+ boolean moveSegmentToFinalLocation, SegmentValidatorResponse segmentValidatorResponse)
throws Exception {
String rawTableName = segmentMetadata.getTableName();
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
String segmentName = segmentMetadata.getName();
- ZNRecord znRecord = _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName);
-
// Brand new segment, not refresh, directly add the segment
+ ZNRecord znRecord = segmentValidatorResponse.getSegmentMetadataZnRecord();
if (znRecord == null) {
LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName);
String crypter = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER);
processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter,
- rawTableName, segmentName, moveSegmentToFinalLocation);
+ rawTableName, segmentName, moveSegmentToFinalLocation, segmentValidatorResponse.getAssignedInstances());
return;
}
@@ -203,7 +203,7 @@ public class ZKOperator {
private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
File currentSegmentLocation, String zkDownloadURI, String crypter, String rawTableName, String segmentName,
- boolean moveSegmentToFinalLocation) {
+ boolean moveSegmentToFinalLocation, List<String> assignedInstances) {
// For v1 segment uploads, we will not move the segment
if (moveSegmentToFinalLocation) {
try {
@@ -219,7 +219,7 @@ public class ZKOperator {
LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, rawTableName,
zkDownloadURI);
}
- _pinotHelixResourceManager.addNewSegment(segmentMetadata, zkDownloadURI, crypter);
+ _pinotHelixResourceManager.addNewSegment(segmentMetadata, zkDownloadURI, crypter, assignedInstances);
}
private void moveSegmentToPermanentDirectory(File currentSegmentLocation, URI finalSegmentLocationURI)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 4e59582..ef119aa 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1461,10 +1461,13 @@ public class PinotHelixResourceManager {
}
public void addNewSegment(@Nonnull SegmentMetadata segmentMetadata, @Nonnull String downloadUrl) {
- addNewSegment(segmentMetadata, downloadUrl, null);
+ List<String> assignedInstances = getAssignedInstancesForSegment(segmentMetadata);
+ addNewSegment(segmentMetadata, downloadUrl, null, assignedInstances);
}
- public void addNewSegment(@Nonnull SegmentMetadata segmentMetadata, @Nonnull String downloadUrl, String crypter) {
+ public void addNewSegment(@Nonnull SegmentMetadata segmentMetadata, @Nonnull String downloadUrl, String crypter,
+ @Nonnull List<String> assignedInstances) {
+ Preconditions.checkNotNull(assignedInstances, "Assigned Instances should not be null!");
String segmentName = segmentMetadata.getName();
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(segmentMetadata.getTableName());
@@ -1481,7 +1484,7 @@ public class PinotHelixResourceManager {
}
LOGGER.info("Added segment: {} of table: {} to property store", segmentName, offlineTableName);
- addNewOfflineSegment(segmentMetadata);
+ addNewOfflineSegment(segmentMetadata, assignedInstances);
LOGGER.info("Added segment: {} of table: {} to ideal state", segmentName, offlineTableName);
}
@@ -1687,6 +1690,24 @@ public class PinotHelixResourceManager {
}
/**
+ * Gets assigned instances for uploading new segment.
+ * @param segmentMetadata segment metadata
+ * @return a list of assigned instances.
+ */
+ public List<String> getAssignedInstancesForSegment(SegmentMetadata segmentMetadata) {
+ String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(segmentMetadata.getTableName());
+ TableConfig offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, offlineTableName);
+ Preconditions.checkNotNull(offlineTableConfig);
+ int numReplicas = Integer.parseInt(offlineTableConfig.getValidationConfig().getReplication());
+ String serverTenant = TagNameUtils.getOfflineTagForTenant(offlineTableConfig.getTenantConfig().getServer());
+ SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory
+ .getSegmentAssignmentStrategy(offlineTableConfig.getValidationConfig().getSegmentAssignmentStrategy());
+ return segmentAssignmentStrategy
+ .getAssignedInstances(_helixZkManager, _helixAdmin, _propertyStore, _helixClusterName, segmentMetadata,
+ numReplicas, serverTenant);
+ }
+
+ /**
* Helper method to add the passed in offline segment to the helix cluster.
* - Gets the segment name and the table name from the passed in segment meta-data.
* - Identifies the instance set onto which the segment needs to be added, based on
@@ -1697,21 +1718,11 @@ public class PinotHelixResourceManager {
* @param segmentMetadata Meta-data for the segment, used to access segmentName and tableName.
*/
// NOTE: method should be thread-safe
- private void addNewOfflineSegment(SegmentMetadata segmentMetadata) {
+ private void addNewOfflineSegment(SegmentMetadata segmentMetadata, List<String> assignedInstances) {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(segmentMetadata.getTableName());
String segmentName = segmentMetadata.getName();
// Assign new segment to instances
- TableConfig offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, offlineTableName);
- Preconditions.checkNotNull(offlineTableConfig);
- int numReplicas = Integer.parseInt(offlineTableConfig.getValidationConfig().getReplication());
- String serverTenant = TagNameUtils.getOfflineTagForTenant(offlineTableConfig.getTenantConfig().getServer());
- SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory
- .getSegmentAssignmentStrategy(offlineTableConfig.getValidationConfig().getSegmentAssignmentStrategy());
- List<String> assignedInstances = segmentAssignmentStrategy
- .getAssignedInstances(_helixZkManager, _helixAdmin, _propertyStore, _helixClusterName, segmentMetadata,
- numReplicas, serverTenant);
-
HelixHelper.addSegmentToIdealState(_helixZkManager, offlineTableName, segmentName, assignedInstances);
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
index fb86af2..bfdb354 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
@@ -48,6 +48,7 @@ import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -67,7 +68,7 @@ import org.testng.annotations.Test;
/**
* Tests the URI upload path through a local file uri.
*/
-public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
+public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTestSet {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotURIUploadIntegrationTest.class);
private String _tableName;
private File _metadataDir = new File(_segmentDir, "tmpMeta");
@@ -110,8 +111,7 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
}
}
- protected void generateAndUploadRandomSegment(String segmentName, int rowCount)
- throws Exception {
+ private File generateRandomSegment(String segmentName, int rowCount) throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
Schema schema = new Schema.Parser()
.parse(new File(TestUtils.getFileFromResourceUrl(getClass().getClassLoader().getResource("dummy.avsc"))));
@@ -139,10 +139,8 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
executor.shutdown();
executor.awaitTermination(1L, TimeUnit.MINUTES);
- uploadSegmentsDirectly(segmentTarDir);
-
FileUtils.forceDelete(avroFile);
- FileUtils.forceDelete(segmentTarDir);
+ return new File(_tarDir, segmentName);
}
@DataProvider(name = "configProvider")
@@ -156,8 +154,10 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
throws Exception {
final String segment6 = "segmentToBeRefreshed_6";
final int nRows1 = 69;
- generateAndUploadRandomSegment(segment6, nRows1);
+ File segmentTarDir = generateRandomSegment(segment6, nRows1);
+ uploadSegmentsDirectly(segmentTarDir);
verifyNRows(0, nRows1);
+ FileUtils.forceDelete(segmentTarDir);
}
// Verify that the number of rows is either the initial value or the final value but not something else.
@@ -186,6 +186,37 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
Assert.fail("Failed to get from " + currentNrows + " to " + finalNrows);
}
+ @Test(dataProvider = "configProvider")
+ public void testSegmentValidator(String tableName, SegmentVersion version)
+ throws Exception {
+ completeTableConfiguration();
+ String serverInstanceId = "Server_localhost_" + CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
+
+ // Disable server instance.
+ _helixAdmin.enableInstance(getHelixClusterName(), serverInstanceId, false);
+
+ final String segment6 = "segmentToBeRefreshed_6";
+ final int nRows1 = 69;
+ File segmentTarDir = generateRandomSegment(segment6, nRows1);
+ try {
+ uploadSegmentsDirectly(segmentTarDir);
+ Assert.fail("Uploading segments should fail.");
+ } catch (Exception e) {
+ //
+ }
+
+ // Re-enable the server instance.
+ _helixAdmin.enableInstance(getHelixClusterName(), serverInstanceId, true);
+
+ try {
+ uploadSegmentsDirectly(segmentTarDir);
+ } catch (Exception e) {
+ Assert.fail("Uploading segments should succeed.");
+ }
+
+ FileUtils.forceDelete(segmentTarDir);
+ }
+
@AfterClass
public void tearDown() {
stopServer();
@@ -202,7 +233,7 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTest {
*
* @param segmentDir Segment directory
*/
- protected void uploadSegmentsDirectly(@Nonnull File segmentDir)
+ private void uploadSegmentsDirectly(@Nonnull File segmentDir)
throws Exception {
String[] segmentNames = segmentDir.list();
Assert.assertNotNull(segmentNames);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org