You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/24 20:37:26 UTC

[incubator-pinot] branch deletePush updated (d533759 -> 64a732d)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a change to branch deletePush
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard d533759  Adding util methods for controller
     new 64a732d  Adding util methods for controller

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (d533759)
            \
             N -- N -- N   refs/heads/deletePush (64a732d)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pinot/hadoop/job/DefaultControllerRestApi.java  | 10 ----------
 1 file changed, 10 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding util methods for controller

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch deletePush
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 64a732dce4cba2db4217368f9a3f5cde52d0e729
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Mon Jun 24 13:34:20 2019 -0700

    Adding util methods for controller
---
 .../common/utils/FileUploadDownloadClient.java     | 39 ++++++++++++++++++
 .../apache/pinot/hadoop/job/ControllerRestApi.java |  9 +++++
 .../pinot/hadoop/job/DefaultControllerRestApi.java | 47 ++++++++++++++++++++++
 3 files changed, 95 insertions(+)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index de8c76a..4cf3210 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -26,8 +26,10 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLEncoder;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -89,12 +91,14 @@ public class FileUploadDownloadClient implements Closeable {
 
   public static final int DEFAULT_SOCKET_TIMEOUT_MS = 600 * 1000; // 10 minutes
   public static final int GET_REQUEST_SOCKET_TIMEOUT_MS = 5 * 1000; // 5 seconds
+  public static final int DELETE_REQUEST_SOCKET_TIMEOUT_MS = 10 * 1000; // 10 seconds
 
   private static final String HTTP = "http";
   private static final String HTTPS = "https";
   private static final String SCHEMA_PATH = "/schemas";
   private static final String OLD_SEGMENT_PATH = "/segments";
   private static final String SEGMENT_PATH = "/v2/segments";
+  private static final String DELETE_SEGMENT_PATH = "/segments";
   private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
   private static final String TABLES_PATH = "/tables";
   private static final String TYPE_DELIMITER = "?type=";
@@ -127,6 +131,16 @@ public class FileUploadDownloadClient implements Closeable {
     return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName);
   }
 
+  // Segment-delete URI. NOTE(review): URLEncoder does form encoding (space becomes '+'); confirm that suits a path.
+  public static URI getDeleteSegmentHttpUri(String host, int port, String rawTableName, String segmentName,
+      String tableType) throws URISyntaxException, UnsupportedEncodingException {
+    return getURI(HTTP, host, port, DELETE_SEGMENT_PATH + "/" + rawTableName + "/" + URLEncoder.encode(segmentName, "UTF-8") + TYPE_DELIMITER + tableType);
+  }
+
+  public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName, String tableType) throws URISyntaxException {
+    return getURI(HTTP, host, port, OLD_SEGMENT_PATH + "/" + rawTableName + TYPE_DELIMITER + tableType); // "/segments/<table>?type=<tableType>"
+  }
+
   public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName)
       throws URISyntaxException {
     return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName);
@@ -196,12 +210,32 @@ public class FileUploadDownloadClient implements Closeable {
     return requestBuilder.build();
   }
 
+  private static HttpUriRequest getDeleteFileRequest(String method, URI uri, ContentBody contentBody,
+      @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
+    // Wrap contentBody in a browser-compatible multipart entity with one part, named after the body's file name
+    HttpEntity entity = MultipartEntityBuilder.create().setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
+        .addPart(contentBody.getFilename(), contentBody).build();
+
+    // Build the HTTP/1.1 request around that entity; NOTE(review): no caller of this helper appears in this change
+    RequestBuilder requestBuilder =
+        RequestBuilder.create(method).setVersion(HttpVersion.HTTP_1_1).setUri(uri).setEntity(entity);
+    addHeadersAndParameters(requestBuilder, headers, parameters);
+    setTimeout(requestBuilder, socketTimeoutMs);
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest constructGetRequest(URI uri) {
     RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS);
     return requestBuilder.build();
   }
 
+  private static HttpUriRequest constructDeleteRequest(URI uri) {
+    RequestBuilder requestBuilder = RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1); // body-less DELETE
+    setTimeout(requestBuilder, DELETE_REQUEST_SOCKET_TIMEOUT_MS); // 10 s, versus the 10 min default socket timeout
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest getAddSchemaRequest(URI uri, String schemaName, File schemaFile) {
     return getUploadFileRequest(HttpPost.METHOD_NAME, uri, getContentBody(schemaName, schemaFile), null, null,
         DEFAULT_SOCKET_TIMEOUT_MS);
@@ -355,6 +389,11 @@ public class FileUploadDownloadClient implements Closeable {
     return sendRequest(constructGetRequest(uri));
   }
 
+  public SimpleHttpResponse sendDeleteRequest(URI uri)
+      throws IOException, HttpErrorStatusException {
+    return sendRequest(constructDeleteRequest(uri)); // DELETE with the delete-specific timeout and no request body
+  }
+
   /**
    * Add schema.
    *
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
index 319b55b..79f00d5 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
@@ -35,4 +35,13 @@ public interface ControllerRestApi extends Closeable {
   void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
 
   void sendSegmentUris(List<String> segmentUris);
+
+  /**
+   * Deletes extra segments after a push in REFRESH use cases. Also used in APPEND use cases where
+   * a day that has been re-pushed has extra segments.
+   * @param segmentUris segments to delete; NOTE(review): DefaultControllerRestApi uses each entry as a segment name
+   */
+  void deleteExtraSegmentUris(List<String> segmentUris);
+
+  List<String> getAllSegments(String tableType); // segments of the table for the given table type, e.g. "OFFLINE"
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
index 089ea6e..aec06eb 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
@@ -135,8 +135,55 @@ public class DefaultControllerRestApi implements ControllerRestApi {
   }
 
   @Override
+  public void deleteExtraSegmentUris(List<String> segmentUris) {
+    LOGGER.info("Start deleting segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+    for (String segmentUri : segmentUris) { // each entry is passed on as the segment name of the delete URI
+      for (PushLocation pushLocation : _pushLocations) { // every segment is deleted at every push location
+        LOGGER.info("Sending deleting segment URI: {} to location: {}", segmentUri, pushLocation);
+        try {
+          SimpleHttpResponse response = _fileUploadDownloadClient.sendDeleteRequest(
+              FileUploadDownloadClient.getDeleteSegmentHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName,
+              segmentUri, "OFFLINE")); // NOTE(review): table type is hard-coded to OFFLINE
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while deleting segment URI: {} to location: {}", segmentUri, pushLocation, e);
+          throw new RuntimeException(e); // the first failure aborts all remaining deletes
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<String> getAllSegments(String tableType) {
+    LOGGER.info("Getting all segments");
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(
+            FileUploadDownloadClient.getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(), pushLocation.getPort(),
+                _rawTableName, tableType));
+        // Read the array stored under tableType; findValuesAsText(tableType) only matches fields nested inside it
+        List<String> segments = new java.util.ArrayList<>();
+        getSegmentsFromJsonSegmentAPI(response.getResponse(), tableType).forEach(node -> segments.add(node.asText()));
+        return segments;
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while getting all {} segments for table: {} from push location: {}", tableType, _rawTableName,
+            pushLocation, e);
+      }
+    }
+    String errorMessage = String.format("Failed to get a list of all segments from push locations: %s for table: %s",
+        _pushLocations, _rawTableName);
+    LOGGER.error(errorMessage);
+    throw new RuntimeException(errorMessage);
+  }
+
+  @Override
   public void close()
       throws IOException {
     _fileUploadDownloadClient.close();
   }
+
+  private JsonNode getSegmentsFromJsonSegmentAPI(String json, String type)
+      throws Exception {
+    return JsonUtils.stringToJsonNode(json).get(0).get(type); // element 0 of the parsed JSON, then its field named <type>
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org