You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/18 17:54:24 UTC
[incubator-pinot] 01/01: Deleting extra segments after push
This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch deletePush
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 618e475005abe42165e3260241193d87118a39a5
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Tue Jun 18 10:54:07 2019 -0700
Deleting extra segments after push
---
.../common/utils/FileUploadDownloadClient.java | 39 +++++++++++++++
.../apache/pinot/hadoop/job/ControllerRestApi.java | 9 ++++
.../pinot/hadoop/job/DefaultControllerRestApi.java | 57 ++++++++++++++++++++++
.../apache/pinot/hadoop/job/SegmentTarPushJob.java | 7 +++
4 files changed, 112 insertions(+)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index de8c76a..4cf3210 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -26,8 +26,10 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URLEncoder;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -89,12 +91,14 @@ public class FileUploadDownloadClient implements Closeable {
public static final int DEFAULT_SOCKET_TIMEOUT_MS = 600 * 1000; // 10 minutes
public static final int GET_REQUEST_SOCKET_TIMEOUT_MS = 5 * 1000; // 5 seconds
+  // Socket timeout applied by constructDeleteRequest() to every segment DELETE request.
+ public static final int DELETE_REQUEST_SOCKET_TIMEOUT_MS = 10 * 1000; // 10 seconds
private static final String HTTP = "http";
private static final String HTTPS = "https";
private static final String SCHEMA_PATH = "/schemas";
private static final String OLD_SEGMENT_PATH = "/segments";
private static final String SEGMENT_PATH = "/v2/segments";
+  // Path prefix for deleting a single segment ("/segments/{table}/{segment}").
+  // NOTE(review): same literal as OLD_SEGMENT_PATH; consider reusing that constant instead of duplicating it.
+ private static final String DELETE_SEGMENT_PATH = "/segments";
private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
private static final String TABLES_PATH = "/tables";
private static final String TYPE_DELIMITER = "?type=";
@@ -127,6 +131,16 @@ public class FileUploadDownloadClient implements Closeable {
return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName);
}
+  /**
+   * Builds the controller URI used to delete one segment of a table.
+   *
+   * @param host controller host
+   * @param port controller port
+   * @param rawTableName name of the table the segment belongs to
+   * @param segmentName segment to delete; URL-encoded as UTF-8 before being appended to the path
+   * @param tableType value of the {@code type} query parameter, e.g. "OFFLINE"
+   * @return the delete URI
+   * @throws URISyntaxException if the assembled URI is invalid
+   * @throws UnsupportedEncodingException if UTF-8 is not supported by the JVM
+   */
+  public static URI getDeleteSegmentHttpUri(String host, int port, String rawTableName, String segmentName,
+      String tableType)
+      throws URISyntaxException, UnsupportedEncodingException {
+    // NOTE(review): URLEncoder applies form encoding (space -> '+'); confirm getURI() does not re-encode the
+    // resulting '%' escapes or the '?' that starts the query string.
+    String encodedSegmentName = URLEncoder.encode(segmentName, "UTF-8");
+    return getURI(HTTP, host, port,
+        DELETE_SEGMENT_PATH + "/" + rawTableName + "/" + encodedSegmentName + TYPE_DELIMITER + tableType);
+  }
+
+  /**
+   * Builds the controller URI that lists the segments of a table for one table type.
+   *
+   * @param host controller host
+   * @param port controller port
+   * @param rawTableName name of the table whose segments are listed
+   * @param tableType value of the {@code type} query parameter, e.g. "OFFLINE"
+   * @return the segment-listing URI
+   * @throws URISyntaxException if the assembled URI is invalid
+   */
+  public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName,
+      String tableType)
+      throws URISyntaxException {
+    return getURI(HTTP, host, port, OLD_SEGMENT_PATH + "/" + rawTableName + TYPE_DELIMITER + tableType);
+  }
+
public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName)
throws URISyntaxException {
return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName);
@@ -196,12 +210,32 @@ public class FileUploadDownloadClient implements Closeable {
return requestBuilder.build();
}
private static HttpUriRequest constructGetRequest(URI uri) {
RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS);
return requestBuilder.build();
}
+
+  /**
+   * Builds an HTTP/1.1 DELETE request for {@code uri}, with {@link #DELETE_REQUEST_SOCKET_TIMEOUT_MS} as the socket
+   * timeout. The request carries no body.
+   */
+  private static HttpUriRequest constructDeleteRequest(URI uri) {
+    RequestBuilder requestBuilder = RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1);
+    setTimeout(requestBuilder, DELETE_REQUEST_SOCKET_TIMEOUT_MS);
+    return requestBuilder.build();
+  }
+
private static HttpUriRequest getAddSchemaRequest(URI uri, String schemaName, File schemaFile) {
return getUploadFileRequest(HttpPost.METHOD_NAME, uri, getContentBody(schemaName, schemaFile), null, null,
DEFAULT_SOCKET_TIMEOUT_MS);
@@ -355,6 +389,11 @@ public class FileUploadDownloadClient implements Closeable {
return sendRequest(constructGetRequest(uri));
}
+  /**
+   * Issues a DELETE request against {@code uri}.
+   *
+   * @param uri resource to delete
+   * @return the response produced by {@code sendRequest}
+   * @throws IOException propagated from {@code sendRequest}
+   * @throws HttpErrorStatusException propagated from {@code sendRequest}
+   */
+  public SimpleHttpResponse sendDeleteRequest(URI uri)
+      throws IOException, HttpErrorStatusException {
+    HttpUriRequest deleteRequest = constructDeleteRequest(uri);
+    return sendRequest(deleteRequest);
+  }
+
/**
* Add schema.
*
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
index 319b55b..79f00d5 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
@@ -35,4 +35,13 @@ public interface ControllerRestApi extends Closeable {
void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
void sendSegmentUris(List<String> segmentUris);
+
+  /**
+   * Deletes the given segments from the table after a push: segments left over after a REFRESH push, or, in APPEND
+   * use cases, stale segments of a day that has been re-pushed.
+   *
+   * @param segmentUris names of the segments to delete. Despite the parameter name, DefaultControllerRestApi passes
+   *                    each entry to the controller as a segment name, not as a URI.
+   */
+ void deleteExtraSegmentUris(List<String> segmentUris);
+
+  /**
+   * Lists the names of all segments of the table for the given table type.
+   *
+   * NOTE(review): SegmentTarPushJob calls removeAll() on the result, so implementations should return a mutable list.
+   *
+   * @param tableType table type to list, e.g. "OFFLINE"
+   * @return segment names reported by the controller
+   */
+ List<String> getAllSegments(String tableType);
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
index 089ea6e..e60dbfe 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -135,8 +136,64 @@ public class DefaultControllerRestApi implements ControllerRestApi {
}
+  /**
+   * Sends one DELETE per (segment, push location) pair, segment by segment. The first failure is logged and rethrown
+   * as a RuntimeException, so later deletions are not attempted.
+   * NOTE(review): the table type is fixed to "OFFLINE" here, while getAllSegments takes it as a parameter.
+   */
@Override
+  public void deleteExtraSegmentUris(List<String> segmentUris) {
+    LOGGER.info("Start deleting segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+    for (String segmentName : segmentUris) {
+      for (PushLocation location : _pushLocations) {
+        deleteSegmentAtLocation(segmentName, location);
+      }
+    }
+  }
+
+  /** Deletes {@code segmentName} at a single {@code location}; any failure is logged and rethrown unchecked. */
+  private void deleteSegmentAtLocation(String segmentName, PushLocation location) {
+    LOGGER.info("Sending deleting segment URI: {} to location: {}", segmentName, location);
+    try {
+      SimpleHttpResponse deleteResponse = _fileUploadDownloadClient.sendDeleteRequest(FileUploadDownloadClient
+          .getDeleteSegmentHttpUri(location.getHost(), location.getPort(), _rawTableName, segmentName, "OFFLINE"));
+      LOGGER.info("Response {}: {}", deleteResponse.getStatusCode(), deleteResponse.getResponse());
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while deleting segment URI: {} to location: {}", segmentName, location, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Fetches the table's segment names for {@code tableType}. Push locations are queried in order, and the answer of
+   * the first one that responds without error is returned.
+   *
+   * @param tableType table type to list, e.g. "OFFLINE"
+   * @return a new, mutable list of segment names (SegmentTarPushJob calls removeAll() on it)
+   * @throws RuntimeException if no push location returns a usable response
+   */
+  @Override
+  public List<String> getAllSegments(String tableType) {
+    LOGGER.info("Getting all segments");
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(
+            FileUploadDownloadClient.getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(),
+                pushLocation.getPort(), _rawTableName, tableType));
+        JsonNode segmentList = getSegmentsFromJsonSegmentAPI(response.getResponse(), tableType);
+        // The helper already returns the node stored under tableType. Calling findValuesAsText(tableType) on it
+        // would look for *nested* fields named tableType and find none, so collect the element values instead.
+        // NOTE(review): assumes that node is an array of segment-name strings; confirm against the controller API.
+        List<String> segmentNames = new ArrayList<>();
+        if (segmentList != null) {
+          for (JsonNode segmentName : segmentList) {
+            segmentNames.add(segmentName.asText());
+          }
+        }
+        return segmentNames;
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while getting all {} segments for table: {} from push location: {}", tableType,
+            _rawTableName, pushLocation, e);
+      }
+    }
+    String errorMessage =
+        String.format("Failed to get a list of all segments from push locations: %s for table: %s", _pushLocations,
+            _rawTableName);
+    LOGGER.error(errorMessage);
+    throw new RuntimeException(errorMessage);
+  }
+
+  /** Closes {@code _fileUploadDownloadClient}. */
+ @Override
public void close()
throws IOException {
_fileUploadDownloadClient.close();
}
+
+  /**
+   * Extracts the node stored under {@code type} in the first element of the controller's segment-list response.
+   *
+   * <p>Uses {@code path()} instead of {@code get()}. An empty response array or a missing table-type key then
+   * yields a missing node, which iterates as empty, instead of throwing a NullPointerException.
+   *
+   * @param json raw response body of the segment-list request
+   * @param type table type key, e.g. "OFFLINE"
+   * @return the node under {@code type}, or a missing node if absent
+   * @throws Exception if {@code json} cannot be parsed
+   */
+  private JsonNode getSegmentsFromJsonSegmentAPI(String json, String type)
+      throws Exception {
+    return JsonUtils.stringToJsonNode(json).path(0).path(type);
+  }
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
index 7c71fd8..e8d650e 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
@@ -31,6 +31,8 @@ public class SegmentTarPushJob extends BaseSegmentJob {
private final Path _segmentPattern;
private final List<PushLocation> _pushLocations;
+  // Table type whose segments are listed before and after the push to find segments to delete.
+ private static final String TABLE_TYPE = "OFFLINE";
+
public SegmentTarPushJob(Properties properties) {
super(properties);
_segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
@@ -48,7 +50,12 @@ public class SegmentTarPushJob extends BaseSegmentJob {
throws Exception {
FileSystem fileSystem = FileSystem.get(_conf);
try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      // Snapshot the table's segment names before pushing so leftovers can be identified afterwards.
+ List<String> segmentsBeforePush = controllerRestApi.getAllSegments(TABLE_TYPE);
controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
+ List<String> segmentsAfterPush = controllerRestApi.getAllSegments(TABLE_TYPE);
+
+      // Keep only segments that existed before the push but are missing afterwards, then delete them.
+      // NOTE(review): a push presumably only adds or overwrites segments. If so, segmentsAfterPush contains every
+      // name in segmentsBeforePush, this difference is normally empty, and nothing is deleted. Deleting "segments
+      // not in this push" would need the pushed segment names (from the tar files) and must be limited to REFRESH
+      // pushes or to the re-pushed time range; otherwise APPEND history would be removed. Confirm the intent.
+      // NOTE(review): List.removeAll(List) is O(n * m); wrap segmentsAfterPush in a HashSet for large tables.
+ segmentsBeforePush.removeAll(segmentsAfterPush);
+ controllerRestApi.deleteExtraSegmentUris(segmentsBeforePush);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org