Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/26 18:08:40 UTC
[incubator-pinot] branch master updated: Adding util methods for controller that will be used in offline push (#4360)
This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 866e29a Adding util methods for controller that will be used in offline push (#4360)
866e29a is described below
commit 866e29a0cd0e23ceaee19841167601e8b3b982cc
Author: Jennifer Dai <je...@users.noreply.github.com>
AuthorDate: Wed Jun 26 11:08:35 2019 -0700
Adding util methods for controller that will be used in offline push (#4360)
See issue #4353 for final design
---
.../common/utils/FileUploadDownloadClient.java | 41 ++++++++++++++++++
.../apache/pinot/hadoop/job/ControllerRestApi.java | 4 ++
.../pinot/hadoop/job/DefaultControllerRestApi.java | 50 ++++++++++++++++++++++
.../DeleteAPIHybridClusterIntegrationTest.java | 23 ++++++++++
4 files changed, 118 insertions(+)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index de8c76a..d3d02cd 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -26,13 +26,16 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URLEncoder;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
@@ -89,6 +92,7 @@ public class FileUploadDownloadClient implements Closeable {
public static final int DEFAULT_SOCKET_TIMEOUT_MS = 600 * 1000; // 10 minutes
public static final int GET_REQUEST_SOCKET_TIMEOUT_MS = 5 * 1000; // 5 seconds
+ public static final int DELETE_REQUEST_SOCKET_TIMEOUT_MS = 10 * 1000; // 10 seconds
private static final String HTTP = "http";
private static final String HTTPS = "https";
@@ -127,6 +131,18 @@ public class FileUploadDownloadClient implements Closeable {
return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName);
}
+ public static URI getDeleteSegmentHttpUri(String host, int port, String rawTableName, String segmentName,
+ String tableType)
+ throws URISyntaxException, UnsupportedEncodingException {
+ return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"),
+ OLD_SEGMENT_PATH, rawTableName + "/" + URLEncoder.encode(segmentName, "UTF-8") + TYPE_DELIMITER + tableType));
+ }
+
+ public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName, String tableType) throws URISyntaxException {
+ return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"),
+ OLD_SEGMENT_PATH, rawTableName + TYPE_DELIMITER + tableType));
+ }
+
public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName)
throws URISyntaxException {
return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName);
@@ -196,12 +212,32 @@ public class FileUploadDownloadClient implements Closeable {
return requestBuilder.build();
}
+ private static HttpUriRequest getDeleteFileRequest(String method, URI uri, ContentBody contentBody,
+ @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
+ // Build the Http entity
+ HttpEntity entity = MultipartEntityBuilder.create().setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
+ .addPart(contentBody.getFilename(), contentBody).build();
+
+ // Build the request
+ RequestBuilder requestBuilder =
+ RequestBuilder.create(method).setVersion(HttpVersion.HTTP_1_1).setUri(uri).setEntity(entity);
+ addHeadersAndParameters(requestBuilder, headers, parameters);
+ setTimeout(requestBuilder, socketTimeoutMs);
+ return requestBuilder.build();
+ }
+
private static HttpUriRequest constructGetRequest(URI uri) {
RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS);
return requestBuilder.build();
}
+ private static HttpUriRequest constructDeleteRequest(URI uri) {
+ RequestBuilder requestBuilder = RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1);
+ setTimeout(requestBuilder, DELETE_REQUEST_SOCKET_TIMEOUT_MS);
+ return requestBuilder.build();
+ }
+
private static HttpUriRequest getAddSchemaRequest(URI uri, String schemaName, File schemaFile) {
return getUploadFileRequest(HttpPost.METHOD_NAME, uri, getContentBody(schemaName, schemaFile), null, null,
DEFAULT_SOCKET_TIMEOUT_MS);
@@ -355,6 +391,11 @@ public class FileUploadDownloadClient implements Closeable {
return sendRequest(constructGetRequest(uri));
}
+ public SimpleHttpResponse sendDeleteRequest(URI uri)
+ throws IOException, HttpErrorStatusException {
+ return sendRequest(constructDeleteRequest(uri));
+ }
+
/**
* Add schema.
*
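The client-side additions above introduce a DELETE-specific socket timeout, two URI builders for the controller's segment endpoints, and a sendDeleteRequest method. Below is a minimal sketch of how they might be called together, assuming the client's default no-argument constructor and a hypothetical controller at localhost:9000 serving an OFFLINE table named myTable:

    import java.net.URI;
    import org.apache.pinot.common.utils.FileUploadDownloadClient;

    public class DeleteSegmentSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical controller host/port, table name and segment name.
        try (FileUploadDownloadClient client = new FileUploadDownloadClient()) {
          URI deleteUri = FileUploadDownloadClient.getDeleteSegmentHttpUri(
              "localhost", 9000, "myTable", "myTable_OFFLINE_0", "OFFLINE");
          // Issues an HTTP DELETE using the new DELETE_REQUEST_SOCKET_TIMEOUT_MS (10 seconds).
          System.out.println(client.sendDeleteRequest(deleteUri).getStatusCode());
        }
      }
    }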
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
index 319b55b..ddcdcaa 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
@@ -35,4 +35,8 @@ public interface ControllerRestApi extends Closeable {
void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
void sendSegmentUris(List<String> segmentUris);
+
+ void deleteSegmentUris(List<String> segmentUris);
+
+ List<String> getAllSegments(String tableType);
}
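The interface now pairs segment listing with segment deletion. A rough end-to-end sketch of how an offline push job might drive the two new methods through DefaultControllerRestApi (controller host/port and table name are placeholders; the constructor usage mirrors the integration test further below):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.pinot.hadoop.job.ControllerRestApi;
    import org.apache.pinot.hadoop.job.DefaultControllerRestApi;
    import org.apache.pinot.hadoop.utils.PushLocation;

    public class SegmentCleanupSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical controller location and raw table name.
        List<PushLocation> pushLocations = new ArrayList<>();
        pushLocations.add(new PushLocation("localhost", 9000));
        try (ControllerRestApi controllerRestApi = new DefaultControllerRestApi(pushLocations, "myTable")) {
          // List every OFFLINE segment the table currently has...
          List<String> offlineSegments = controllerRestApi.getAllSegments("OFFLINE");
          // ...and delete the ones the job no longer wants (here all of them, purely for illustration).
          controllerRestApi.deleteSegmentUris(offlineSegments);
        }
      }
    }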
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
index 089ea6e..92e0dee 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
@@ -19,9 +19,11 @@
package org.apache.pinot.hadoop.job;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -135,8 +137,56 @@ public class DefaultControllerRestApi implements ControllerRestApi {
}
@Override
+ public void deleteSegmentUris(List<String> segmentUris) {
+ LOGGER.info("Start deleting segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+ for (String segmentUri : segmentUris) {
+ for (PushLocation pushLocation : _pushLocations) {
+ LOGGER.info("Sending deleting segment URI: {} to location: {}", segmentUri, pushLocation);
+ try {
+ SimpleHttpResponse response = _fileUploadDownloadClient.sendDeleteRequest(
+ FileUploadDownloadClient.getDeleteSegmentHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName,
+ segmentUri, "OFFLINE"));
+ LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while deleting segment URI: {} to location: {}", segmentUri, pushLocation, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public List<String> getAllSegments(String tableType) {
+ LOGGER.info("Getting all segments of table {}", _rawTableName);
+ ObjectMapper objectMapper = new ObjectMapper();
+ for (PushLocation pushLocation : _pushLocations) {
+ try {
+ SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(
+ FileUploadDownloadClient.getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(), pushLocation.getPort(),
+ _rawTableName, tableType));
+ JsonNode segmentList = getSegmentsFromJsonSegmentAPI(response.getResponse(), tableType);
+ return objectMapper.convertValue(segmentList, ArrayList.class);
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while getting all {} segments for table: {} from push location: {}", tableType, _rawTableName,
+ pushLocation, e);
+ }
+ }
+ String errorMessage =
+ String.format("Failed to get a list of all segments from push locations: %s for table: %s", _pushLocations,
+ _rawTableName);
+ LOGGER.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+
+ }
+
+ @Override
public void close()
throws IOException {
_fileUploadDownloadClient.close();
}
+
+ private JsonNode getSegmentsFromJsonSegmentAPI(String json, String tableType)
+ throws Exception {
+ return JsonUtils.stringToJsonNode(json).get(0).get(tableType);
+ }
}
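The private getSegmentsFromJsonSegmentAPI helper assumes the controller's segments endpoint returns a JSON array whose first element maps the table type to the list of segment names. A small illustration of that parse, with made-up segment names:

    import java.util.ArrayList;
    import java.util.List;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.pinot.common.utils.JsonUtils;

    public class SegmentListParseSketch {
      public static void main(String[] args) throws Exception {
        // Illustrative payload only; real segment names come from the controller response.
        String json = "[{\"OFFLINE\": [\"myTable_OFFLINE_0\", \"myTable_OFFLINE_1\"]}]";
        JsonNode segments = JsonUtils.stringToJsonNode(json).get(0).get("OFFLINE");
        List<String> names = new ObjectMapper().convertValue(segments, ArrayList.class);
        System.out.println(names); // [myTable_OFFLINE_0, myTable_OFFLINE_1]
      }
    }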
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java
index 173561e..7ee7077 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java
@@ -19,13 +19,18 @@
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import junit.framework.Assert;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.hadoop.job.ControllerRestApi;
+import org.apache.pinot.hadoop.job.DefaultControllerRestApi;
+import org.apache.pinot.hadoop.utils.PushLocation;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -237,9 +242,27 @@ public class DeleteAPIHybridClusterIntegrationTest extends HybridClusterIntegrat
forSegmentListAPIWithTableType(TABLE_NAME, CommonConstants.Helix.TableType.OFFLINE.toString()));
JsonNode offlineSegmentsListReturn =
getSegmentsFromJsonSegmentAPI(postDeleteSegmentList, CommonConstants.Helix.TableType.OFFLINE.toString());
+
+ // Get all segments
+ PushLocation pushLocation = new PushLocation("localhost", 18998);
+ List<PushLocation> pushLocations = new ArrayList<>();
+ pushLocations.add(pushLocation);
+ ControllerRestApi controllerRestApi = new DefaultControllerRestApi(pushLocations, "mytable");
+ List<String> allSegments = controllerRestApi.getAllSegments("OFFLINE");
+ Assert.assertEquals(allSegments.size(), offlineSegmentsListReturn.size());
+
removeValue(offlineSegmentsList, removedSegment);
Assert.assertEquals(offlineSegmentsListReturn, offlineSegmentsList);
+ // Test Delete one more segment
+ String segmentUri = allSegments.get(0);
+ List<String> deleteSegmentUris = new ArrayList<>();
+ deleteSegmentUris.add(segmentUri);
+ controllerRestApi.deleteSegmentUris(deleteSegmentUris);
+ allSegments.remove(0);
+ List<String> postDelete = controllerRestApi.getAllSegments("OFFLINE");
+ Assert.assertEquals(postDelete.size(), allSegments.size());
+
// Testing Delete All API here
sendGetRequest(_controllerRequestURLBuilder.
forDeleteAllSegmentsWithTypeWithGetAPI(TABLE_NAME, CommonConstants.Helix.TableType.OFFLINE.toString()));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org