You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/18 17:54:24 UTC

[incubator-pinot] 01/01: Deleting extra segments after push

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch deletePush
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 618e475005abe42165e3260241193d87118a39a5
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Tue Jun 18 10:54:07 2019 -0700

    Deleting extra segments after push
---
 .../common/utils/FileUploadDownloadClient.java     | 39 +++++++++++++++
 .../apache/pinot/hadoop/job/ControllerRestApi.java |  9 ++++
 .../pinot/hadoop/job/DefaultControllerRestApi.java | 57 ++++++++++++++++++++++
 .../apache/pinot/hadoop/job/SegmentTarPushJob.java |  7 +++
 4 files changed, 112 insertions(+)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index de8c76a..4cf3210 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -26,8 +26,10 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLEncoder;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -89,12 +91,14 @@ public class FileUploadDownloadClient implements Closeable {
 
   public static final int DEFAULT_SOCKET_TIMEOUT_MS = 600 * 1000; // 10 minutes
   public static final int GET_REQUEST_SOCKET_TIMEOUT_MS = 5 * 1000; // 5 seconds
+  public static final int DELETE_REQUEST_SOCKET_TIMEOUT_MS = 10 * 1000; // 10 seconds
 
   private static final String HTTP = "http";
   private static final String HTTPS = "https";
   private static final String SCHEMA_PATH = "/schemas";
   private static final String OLD_SEGMENT_PATH = "/segments";
   private static final String SEGMENT_PATH = "/v2/segments";
+  private static final String DELETE_SEGMENT_PATH = "/segments";
   private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
   private static final String TABLES_PATH = "/tables";
   private static final String TYPE_DELIMITER = "?type=";
@@ -127,6 +131,16 @@ public class FileUploadDownloadClient implements Closeable {
     return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName);
   }
 
+  /** Builds the controller URI for deleting one segment of a table; the segment name is URL-encoded as UTF-8. */
+  public static URI getDeleteSegmentHttpUri(String host, int port, String rawTableName, String segmentName,
+      String tableType) throws URISyntaxException, UnsupportedEncodingException {
+    return getURI(HTTP, host, port, DELETE_SEGMENT_PATH + "/" + rawTableName + "/" + URLEncoder.encode(segmentName, "UTF-8") + TYPE_DELIMITER + tableType);
+  }
+
+  public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName, String tableType) throws URISyntaxException {
+    return getURI(HTTP, host, port, OLD_SEGMENT_PATH + "/" + rawTableName + "?type=" + tableType);
+  }
+
   public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName)
       throws URISyntaxException {
     return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName);
@@ -196,12 +210,32 @@ public class FileUploadDownloadClient implements Closeable {
     return requestBuilder.build();
   }
 
+  /** Builds a multipart request for the given HTTP method. NOTE(review): nothing added in this commit calls it. */
+  private static HttpUriRequest getDeleteFileRequest(String method, URI uri, ContentBody contentBody,
+      @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
+    // The content body becomes the only part of the multipart entity, named after its file name
+    MultipartEntityBuilder multipartBuilder = MultipartEntityBuilder.create();
+    multipartBuilder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE).addPart(contentBody.getFilename(), contentBody);
+    HttpEntity multipartEntity = multipartBuilder.build();
+    RequestBuilder builder = RequestBuilder.create(method).setVersion(HttpVersion.HTTP_1_1).setUri(uri);
+    builder.setEntity(multipartEntity);
+    addHeadersAndParameters(builder, headers, parameters);
+    setTimeout(builder, socketTimeoutMs);
+    return builder.build();
+  }
+
   private static HttpUriRequest constructGetRequest(URI uri) {
     RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS);
     return requestBuilder.build();
   }
 
+  private static HttpUriRequest constructDeleteRequest(URI uri) {
+    RequestBuilder requestBuilder = RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1);
+    setTimeout(requestBuilder, DELETE_REQUEST_SOCKET_TIMEOUT_MS);
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest getAddSchemaRequest(URI uri, String schemaName, File schemaFile) {
     return getUploadFileRequest(HttpPost.METHOD_NAME, uri, getContentBody(schemaName, schemaFile), null, null,
         DEFAULT_SOCKET_TIMEOUT_MS);
@@ -355,6 +389,11 @@ public class FileUploadDownloadClient implements Closeable {
     return sendRequest(constructGetRequest(uri));
   }
 
+  public SimpleHttpResponse sendDeleteRequest(URI uri)
+      throws IOException, HttpErrorStatusException {
+    return sendRequest(constructDeleteRequest(uri));
+  }
+
   /**
    * Add schema.
    *
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
index 319b55b..79f00d5 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
@@ -35,4 +35,13 @@ public interface ControllerRestApi extends Closeable {
   void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
 
   void sendSegmentUris(List<String> segmentUris);
+
+  /**
+   * Delete extra segments after push during REFRESH use cases. Also used in APPEND use cases where
+   * a day that has been re-pushed has extra segments.
+   * @param segmentUris names of the segments to delete
+   */
+  void deleteExtraSegmentUris(List<String> segmentUris);
+
+  List<String> getAllSegments(String tableType);
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
index 089ea6e..e60dbfe 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -135,8 +136,64 @@ public class DefaultControllerRestApi implements ControllerRestApi {
   }
 
   @Override
+  public void deleteExtraSegmentUris(List<String> segmentUris) {
+    LOGGER.info("Start deleting segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+    for (String segmentUri : segmentUris) {
+      for (PushLocation pushLocation : _pushLocations) {
+        LOGGER.info("Sending deleting segment URI: {} to location: {}", segmentUri, pushLocation);
+        try {
+          SimpleHttpResponse response = _fileUploadDownloadClient.sendDeleteRequest(
+              FileUploadDownloadClient.getDeleteSegmentHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName,
+              segmentUri, "OFFLINE"));
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while deleting segment URI: {} to location: {}", segmentUri, pushLocation, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /** Returns the segment names found under tableType in the first successful controller response. */
+  @Override
+  public List<String> getAllSegments(String tableType) {
+    LOGGER.info("Getting all segments");
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+            .getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName, tableType));
+        List<String> segmentNames = new ArrayList<>(); // findValuesAsText() matches object fields only, not array items
+        for (JsonNode segmentNode : getSegmentsFromJsonSegmentAPI(response.getResponse(), tableType)) {
+          segmentNames.add(segmentNode.asText());
+        }
+        return segmentNames;
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while getting all {} segments for table: {} from push location: {}", tableType, _rawTableName,
+            pushLocation, e);
+      }
+    }
+    String errorMessage = String.format("Failed to get a list of all segments from push locations: %s for table: %s", _pushLocations, _rawTableName);
+    LOGGER.error(errorMessage);
+    throw new RuntimeException(errorMessage);
+  }
+
+  @Override
   public void close()
       throws IOException {
     _fileUploadDownloadClient.close();
   }
+
+  private JsonNode getSegmentsFromJsonSegmentAPI(String json, String type)
+      throws Exception {
+    return JsonUtils.stringToJsonNode(json).get(0).get(type);
+  }
+
+  /** Debug entry point listing OFFLINE segments of a hard-coded table. NOTE(review): consider removing before merge. */
+  public static void main(String[] args) throws IOException {
+    List<PushLocation> pushLocations = new ArrayList<>();
+    pushLocations.add(new PushLocation("lva1-app011", 11984));
+    try (DefaultControllerRestApi defaultControllerRestApi = new DefaultControllerRestApi(pushLocations, "myTable")) {
+      defaultControllerRestApi.getAllSegments("OFFLINE");
+    }
+  }
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
index 7c71fd8..e8d650e 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
@@ -31,6 +31,8 @@ public class SegmentTarPushJob extends BaseSegmentJob {
   private final Path _segmentPattern;
   private final List<PushLocation> _pushLocations;
 
+  private static final String TABLE_TYPE = "OFFLINE";
+
   public SegmentTarPushJob(Properties properties) {
     super(properties);
     _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
@@ -48,7 +50,12 @@ public class SegmentTarPushJob extends BaseSegmentJob {
       throws Exception {
     FileSystem fileSystem = FileSystem.get(_conf);
     try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      List<String> segmentsBeforePush = controllerRestApi.getAllSegments(TABLE_TYPE);
       controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
+      List<String> segmentsAfterPush = controllerRestApi.getAllSegments(TABLE_TYPE);
+
+      segmentsBeforePush.removeAll(segmentsAfterPush);
+      controllerRestApi.deleteExtraSegmentUris(segmentsBeforePush);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org