Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/18 17:54:23 UTC

[incubator-pinot] branch deletePush created (now 618e475)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a change to branch deletePush
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 618e475  Deleting extra segments after push

This branch includes the following new commits:

     new 618e475  Deleting extra segments after push

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[incubator-pinot] 01/01: Deleting extra segments after push

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch deletePush
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 618e475005abe42165e3260241193d87118a39a5
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Tue Jun 18 10:54:07 2019 -0700

    Deleting extra segments after push
---
 .../common/utils/FileUploadDownloadClient.java     | 39 +++++++++++++++
 .../apache/pinot/hadoop/job/ControllerRestApi.java |  9 ++++
 .../pinot/hadoop/job/DefaultControllerRestApi.java | 57 ++++++++++++++++++++++
 .../apache/pinot/hadoop/job/SegmentTarPushJob.java |  7 +++
 4 files changed, 112 insertions(+)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index de8c76a..4cf3210 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -26,8 +26,10 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLEncoder;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -89,12 +91,14 @@ public class FileUploadDownloadClient implements Closeable {
 
   public static final int DEFAULT_SOCKET_TIMEOUT_MS = 600 * 1000; // 10 minutes
   public static final int GET_REQUEST_SOCKET_TIMEOUT_MS = 5 * 1000; // 5 seconds
+  public static final int DELETE_REQUEST_SOCKET_TIMEOUT_MS = 10 * 1000; // 10 seconds
 
   private static final String HTTP = "http";
   private static final String HTTPS = "https";
   private static final String SCHEMA_PATH = "/schemas";
   private static final String OLD_SEGMENT_PATH = "/segments";
   private static final String SEGMENT_PATH = "/v2/segments";
+  private static final String DELETE_SEGMENT_PATH = "/segments";
   private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
   private static final String TABLES_PATH = "/tables";
   private static final String TYPE_DELIMITER = "?type=";
@@ -127,6 +131,16 @@ public class FileUploadDownloadClient implements Closeable {
     return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName);
   }
 
+  public static URI getDeleteSegmentHttpUri(String host, int port, String rawTableName, String segmentName,
+      String tableType)
+      throws URISyntaxException, UnsupportedEncodingException {
+    return getURI(HTTP, host, port, DELETE_SEGMENT_PATH + "/" + rawTableName + "/" + URLEncoder.encode(segmentName, "UTF-8") + "?type=" + tableType);
+  }
+
+  public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName, String tableType) throws URISyntaxException {
+    return getURI(HTTP, host, port, OLD_SEGMENT_PATH + "/" + rawTableName + "?type=" + tableType);
+  }
+
   public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName)
       throws URISyntaxException {
     return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName);
@@ -196,12 +210,32 @@ public class FileUploadDownloadClient implements Closeable {
     return requestBuilder.build();
   }
 
+  private static HttpUriRequest getDeleteFileRequest(String method, URI uri, ContentBody contentBody,
+      @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
+    // Build the Http entity
+    HttpEntity entity = MultipartEntityBuilder.create().setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
+        .addPart(contentBody.getFilename(), contentBody).build();
+
+    // Build the request
+    RequestBuilder requestBuilder =
+        RequestBuilder.create(method).setVersion(HttpVersion.HTTP_1_1).setUri(uri).setEntity(entity);
+    addHeadersAndParameters(requestBuilder, headers, parameters);
+    setTimeout(requestBuilder, socketTimeoutMs);
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest constructGetRequest(URI uri) {
     RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS);
     return requestBuilder.build();
   }
 
+  private static HttpUriRequest constructDeleteRequest(URI uri) {
+    RequestBuilder requestBuilder = RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1);
+    setTimeout(requestBuilder, DELETE_REQUEST_SOCKET_TIMEOUT_MS);
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest getAddSchemaRequest(URI uri, String schemaName, File schemaFile) {
     return getUploadFileRequest(HttpPost.METHOD_NAME, uri, getContentBody(schemaName, schemaFile), null, null,
         DEFAULT_SOCKET_TIMEOUT_MS);
@@ -355,6 +389,11 @@ public class FileUploadDownloadClient implements Closeable {
     return sendRequest(constructGetRequest(uri));
   }
 
+  public SimpleHttpResponse sendDeleteRequest(URI uri)
+      throws IOException, HttpErrorStatusException {
+    return sendRequest(constructDeleteRequest(uri));
+  }
+
   /**
    * Add schema.
    *
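A minimal sketch, not part of the patch, of how the new list and delete helpers might be
exercised directly; it assumes the client's no-argument constructor, and the host, port,
table, and segment name below are placeholders:

    import java.net.URI;

    import org.apache.pinot.common.utils.FileUploadDownloadClient;

    public class SegmentDeleteClientExample {
      public static void main(String[] args) throws Exception {
        // FileUploadDownloadClient implements Closeable, so try-with-resources releases the HTTP client.
        try (FileUploadDownloadClient client = new FileUploadDownloadClient()) {
          // List all OFFLINE segments of the (placeholder) table "myTable" on a (placeholder) controller.
          URI listUri = FileUploadDownloadClient
              .getRetrieveAllSegmentWithTableTypeHttpUri("localhost", 9000, "myTable", "OFFLINE");
          System.out.println(client.sendGetRequest(listUri).getResponse());

          // Delete a single segment by name; the helper URL-encodes the segment name before building the URI.
          URI deleteUri = FileUploadDownloadClient
              .getDeleteSegmentHttpUri("localhost", 9000, "myTable", "myTable_2019-06-17_2", "OFFLINE");
          System.out.println(client.sendDeleteRequest(deleteUri).getStatusCode());
        }
      }
    }
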
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
index 319b55b..79f00d5 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
@@ -35,4 +35,13 @@ public interface ControllerRestApi extends Closeable {
   void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
 
   void sendSegmentUris(List<String> segmentUris);
+
+  /**
+   * Delete extra segments after push during REFRESH use cases. Also used in APPEND use cases where
+   * a day that has been re-pushed has extra segments.
+   * @param segmentUris
+   */
+  void deleteExtraSegmentUris(List<String> segmentUris);
+
+  List<String> getAllSegments(String tableType);
 }
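To make the new contract concrete, a small illustration (outside the patch) of the APPEND
case described in the javadoc above; the table and segment names are made up:

    import java.util.Arrays;

    import org.apache.pinot.hadoop.job.ControllerRestApi;

    public class DeleteExtraSegmentsExample {
      // Sketch only: restApi is any ControllerRestApi implementation, e.g. DefaultControllerRestApi.
      static void cleanUpRepushedDay(ControllerRestApi restApi) {
        // Day 2019-06-17 was originally pushed as segments _0, _1 and _2, but the re-push produced
        // only _0 and _1, so _2 is "extra" and has to be deleted explicitly.
        restApi.deleteExtraSegmentUris(Arrays.asList("myTable_2019-06-17_2"));
      }
    }
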
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
index 089ea6e..e60dbfe 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -135,8 +136,64 @@ public class DefaultControllerRestApi implements ControllerRestApi {
   }
 
   @Override
+  public void deleteExtraSegmentUris(List<String> segmentUris) {
+    LOGGER.info("Start deleting segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+    for (String segmentUri : segmentUris) {
+      for (PushLocation pushLocation : _pushLocations) {
+        LOGGER.info("Sending deleting segment URI: {} to location: {}", segmentUri, pushLocation);
+        try {
+          SimpleHttpResponse response = _fileUploadDownloadClient.sendDeleteRequest(
+              FileUploadDownloadClient.getDeleteSegmentHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName,
+              segmentUri, "OFFLINE"));
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while deleting segment URI: {} to location: {}", segmentUri, pushLocation, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<String> getAllSegments(String tableType) {
+    LOGGER.info("Getting all segments");
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(
+            FileUploadDownloadClient.getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(), pushLocation.getPort(),
+                _rawTableName, tableType));
+        JsonNode segmentList = getSegmentsFromJsonSegmentAPI(response.getResponse(), tableType);
+        return segmentList.findValuesAsText(tableType);
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while getting all {} segments for table: {} from push location: {}", tableType, _rawTableName,
+            pushLocation, e);
+      }
+    }
+    String errorMessage =
+        String.format("Failed to get a list of all segments from push locations: %s for table: %s", _pushLocations,
+            _rawTableName);
+    LOGGER.error(errorMessage);
+    throw new RuntimeException(errorMessage);
+
+  }
+
+  @Override
   public void close()
       throws IOException {
     _fileUploadDownloadClient.close();
   }
+
+  private JsonNode getSegmentsFromJsonSegmentAPI(String json, String type)
+      throws Exception {
+    return JsonUtils.stringToJsonNode(json).get(0).get(type);
+  }
+
+  public static void main (String[] args) {
+    List<PushLocation> pushLocations = new ArrayList<>();
+    PushLocation pushLocation = new PushLocation("lva1-app011", 11984);
+    pushLocations.add(pushLocation);
+
+    DefaultControllerRestApi defaultControllerRestApi = new DefaultControllerRestApi(pushLocations, "myTable");
+    defaultControllerRestApi.getAllSegments("OFFLINE");
+  }
 }
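getSegmentsFromJsonSegmentAPI assumes the segment-listing endpoint returns a JSON array whose
first element maps the table type to the segment names; below is a stand-alone parsing sketch,
separate from the patch, using a made-up payload of that assumed shape:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SegmentListParsingExample {
      public static void main(String[] args) throws Exception {
        // Assumed shape of the controller's segment-listing response for ?type=OFFLINE
        // (example values only): an array whose first element maps the table type to segment names.
        String json = "[{\"OFFLINE\": [\"myTable_2019-06-17_0\", \"myTable_2019-06-17_1\"]}]";

        // Same navigation as getSegmentsFromJsonSegmentAPI(json, "OFFLINE"), but with a plain Jackson
        // ObjectMapper instead of Pinot's JsonUtils.
        JsonNode segments = new ObjectMapper().readTree(json).get(0).get("OFFLINE");
        for (JsonNode segment : segments) {
          System.out.println(segment.asText());
        }
      }
    }
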
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
index 7c71fd8..e8d650e 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
@@ -31,6 +31,8 @@ public class SegmentTarPushJob extends BaseSegmentJob {
   private final Path _segmentPattern;
   private final List<PushLocation> _pushLocations;
 
+  private static final String TABLE_TYPE = "OFFLINE";
+
   public SegmentTarPushJob(Properties properties) {
     super(properties);
     _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
@@ -48,7 +50,12 @@ public class SegmentTarPushJob extends BaseSegmentJob {
       throws Exception {
     FileSystem fileSystem = FileSystem.get(_conf);
     try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      List<String> segmentsBeforePush = controllerRestApi.getAllSegments(TABLE_TYPE);
       controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
+      List<String> segmentsAfterPush = controllerRestApi.getAllSegments(TABLE_TYPE);
+
+      segmentsBeforePush.removeAll(segmentsAfterPush);
+      controllerRestApi.deleteExtraSegmentUris(segmentsBeforePush);
     }
   }
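
For reference, a self-contained sketch of the set difference the hunk above relies on:
List.removeAll keeps only the names that were listed before the push but are missing from the
listing afterwards, and those are the ones handed to deleteExtraSegmentUris. All names below
are made up:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SegmentDiffExample {
      public static void main(String[] args) {
        // Hypothetical controller listings taken before and after a push.
        List<String> segmentsBeforePush = new ArrayList<>(Arrays.asList(
            "myTable_old_0", "myTable_old_1", "myTable_keep_0"));
        List<String> segmentsAfterPush = Arrays.asList(
            "myTable_keep_0", "myTable_new_0", "myTable_new_1");

        // Same operation as in the hunk above: drop every name still listed after the push, so only
        // the names that disappeared from the listing remain.
        segmentsBeforePush.removeAll(segmentsAfterPush);
        System.out.println(segmentsBeforePush);  // [myTable_old_0, myTable_old_1]
      }
    }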
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org