You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/26 18:08:40 UTC

[incubator-pinot] branch master updated: Adding util methods for controller that will be used in offline push (#4360)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 866e29a  Adding util methods for controller that will be used in offline push (#4360)
866e29a is described below

commit 866e29a0cd0e23ceaee19841167601e8b3b982cc
Author: Jennifer Dai <je...@users.noreply.github.com>
AuthorDate: Wed Jun 26 11:08:35 2019 -0700

    Adding util methods for controller that will be used in offline push (#4360)
    
    See issue #4353 for final design
---
 .../common/utils/FileUploadDownloadClient.java     | 41 ++++++++++++++++++
 .../apache/pinot/hadoop/job/ControllerRestApi.java |  4 ++
 .../pinot/hadoop/job/DefaultControllerRestApi.java | 50 ++++++++++++++++++++++
 .../DeleteAPIHybridClusterIntegrationTest.java     | 23 ++++++++++
 4 files changed, 118 insertions(+)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index de8c76a..d3d02cd 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -26,13 +26,16 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLEncoder;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
@@ -89,6 +92,7 @@ public class FileUploadDownloadClient implements Closeable {
 
   public static final int DEFAULT_SOCKET_TIMEOUT_MS = 600 * 1000; // 10 minutes
   public static final int GET_REQUEST_SOCKET_TIMEOUT_MS = 5 * 1000; // 5 seconds
+  public static final int DELETE_REQUEST_SOCKET_TIMEOUT_MS = 10 * 1000; // 10 seconds
 
   private static final String HTTP = "http";
   private static final String HTTPS = "https";
@@ -127,6 +131,18 @@ public class FileUploadDownloadClient implements Closeable {
     return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName);
   }
 
+  public static URI getDeleteSegmentHttpUri(String host, int port, String rawTableName, String segmentName,
+      String tableType) throws URISyntaxException, UnsupportedEncodingException {
+    // URLEncoder form-encodes a space as '+' (a literal '+' becomes "%2B"); a URI path needs "%20" instead
+    String segmentPath = URLEncoder.encode(segmentName, "UTF-8").replace("+", "%20") + TYPE_DELIMITER + tableType;
+    return new URI(StringUtil.join("/", HTTP + "://" + host + ":" + port, OLD_SEGMENT_PATH, rawTableName, segmentPath));
+  }
+
+  public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName, String tableType) throws URISyntaxException {
+    return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"),
+        OLD_SEGMENT_PATH, rawTableName + TYPE_DELIMITER + tableType)); // delete-URI layout minus the segment name
+  }
+
   public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName)
       throws URISyntaxException {
     return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName);
@@ -196,12 +212,32 @@ public class FileUploadDownloadClient implements Closeable {
     return requestBuilder.build();
   }
 
+  // Builds a request for the given HTTP method and uri whose body is a multipart entity wrapping contentBody.
+  private static HttpUriRequest getDeleteFileRequest(String method, URI uri, ContentBody contentBody,
+      @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
+    // NOTE(review): no caller of this method is visible in this change - confirm it is still needed
+    MultipartEntityBuilder multipart = MultipartEntityBuilder.create().setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
+    HttpEntity body = multipart.addPart(contentBody.getFilename(), contentBody).build();
+    RequestBuilder request = RequestBuilder.create(method).setVersion(HttpVersion.HTTP_1_1).setUri(uri);
+    request.setEntity(body);
+    // Optional headers and parameters go on first, then the caller-supplied socket timeout
+    addHeadersAndParameters(request, headers, parameters);
+    setTimeout(request, socketTimeoutMs);
+    return request.build();
+  }
+
   private static HttpUriRequest constructGetRequest(URI uri) {
     RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS);
     return requestBuilder.build();
   }
 
+  private static HttpUriRequest constructDeleteRequest(URI uri) {
+    RequestBuilder requestBuilder = RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1);
+    setTimeout(requestBuilder, DELETE_REQUEST_SOCKET_TIMEOUT_MS); // DELETE-specific timeout; no entity is attached
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest getAddSchemaRequest(URI uri, String schemaName, File schemaFile) {
     return getUploadFileRequest(HttpPost.METHOD_NAME, uri, getContentBody(schemaName, schemaFile), null, null,
         DEFAULT_SOCKET_TIMEOUT_MS);
@@ -355,6 +391,11 @@ public class FileUploadDownloadClient implements Closeable {
     return sendRequest(constructGetRequest(uri));
   }
 
+  public SimpleHttpResponse sendDeleteRequest(URI uri)
+      throws IOException, HttpErrorStatusException {
+    return sendRequest(constructDeleteRequest(uri)); // body-less DELETE using DELETE_REQUEST_SOCKET_TIMEOUT_MS
+  }
+
   /**
    * Add schema.
    *
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
index 319b55b..ddcdcaa 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
@@ -35,4 +35,8 @@ public interface ControllerRestApi extends Closeable {
   void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
 
   void sendSegmentUris(List<String> segmentUris);
+
+  void deleteSegmentUris(List<String> segmentUris); // NOTE(review): entries are used as segment names - confirm
+
+  List<String> getAllSegments(String tableType); // tableType is e.g. "OFFLINE", as the integration test passes
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
index 089ea6e..92e0dee 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
@@ -19,9 +19,11 @@
 package org.apache.pinot.hadoop.job;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -135,8 +137,56 @@ public class DefaultControllerRestApi implements ControllerRestApi {
   }
 
   @Override
+  public void deleteSegmentUris(List<String> segmentUris) {
+    LOGGER.info("Start deleting segment URIs: {} to locations: {}", segmentUris, _pushLocations);
+    // Every entry is deleted from every push location; it is passed as the OFFLINE segment name in the URI
+    for (String segmentName : segmentUris) {
+      for (PushLocation location : _pushLocations) {
+        LOGGER.info("Sending deleting segment URI: {} to location: {}", segmentName, location);
+        try {
+          SimpleHttpResponse response = _fileUploadDownloadClient.sendDeleteRequest(FileUploadDownloadClient
+              .getDeleteSegmentHttpUri(location.getHost(), location.getPort(), _rawTableName, segmentName, "OFFLINE"));
+          LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while deleting segment URI: {} to location: {}", segmentName, location, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<String> getAllSegments(String tableType) {
+    LOGGER.info("Getting all segments of table {}", _rawTableName);
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient
+            .getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName,
+                tableType));
+        JsonNode segmentList = getSegmentsFromJsonSegmentAPI(response.getResponse(), tableType);
+        // A null node (no entry for tableType) throws below and is handled as a failure of this push location
+        List<String> segments = new ArrayList<>(segmentList.size());
+        segmentList.forEach(segment -> segments.add(segment.asText()));
+        return segments;
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while getting all {} segments for table: {} from push location: {}", tableType,
+            _rawTableName, pushLocation, e);
+      }
+    }
+    String errorMessage = String.format("Failed to get a list of all segments from push locations: %s for table: %s",
+        _pushLocations, _rawTableName);
+    LOGGER.error(errorMessage);
+    throw new RuntimeException(errorMessage);
+  }
+
+  @Override
   public void close()
       throws IOException {
     _fileUploadDownloadClient.close();
   }
+
+  private JsonNode getSegmentsFromJsonSegmentAPI(String json, String tableType)
+      throws Exception {
+    return JsonUtils.stringToJsonNode(json).get(0).get(tableType); // null if element 0 has no tableType field
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java
index 173561e..7ee7077 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java
@@ -19,13 +19,18 @@
 package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import junit.framework.Assert;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.hadoop.job.ControllerRestApi;
+import org.apache.pinot.hadoop.job.DefaultControllerRestApi;
+import org.apache.pinot.hadoop.utils.PushLocation;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -237,9 +242,27 @@ public class DeleteAPIHybridClusterIntegrationTest extends HybridClusterIntegrat
         forSegmentListAPIWithTableType(TABLE_NAME, CommonConstants.Helix.TableType.OFFLINE.toString()));
     JsonNode offlineSegmentsListReturn =
         getSegmentsFromJsonSegmentAPI(postDeleteSegmentList, CommonConstants.Helix.TableType.OFFLINE.toString());
+
+    // Get all segments
+    PushLocation pushLocation = new PushLocation("localhost", 18998);
+    List<PushLocation> pushLocations = new ArrayList<>();
+    pushLocations.add(pushLocation);
+    ControllerRestApi controllerRestApi = new DefaultControllerRestApi(pushLocations, "mytable");
+    List<String> allSegments = controllerRestApi.getAllSegments("OFFLINE");
+    Assert.assertEquals(allSegments.size(), offlineSegmentsListReturn.size());
+
     removeValue(offlineSegmentsList, removedSegment);
     Assert.assertEquals(offlineSegmentsListReturn, offlineSegmentsList);
 
+    // Test Delete one more segment
+    String segmentUri = allSegments.get(0);
+    List<String> deleteSegmentUris = new ArrayList<>();
+    deleteSegmentUris.add(segmentUri);
+    controllerRestApi.deleteSegmentUris(deleteSegmentUris);
+    allSegments.remove(0);
+    List<String> postDelete = controllerRestApi.getAllSegments("OFFLINE");
+    Assert.assertEquals(postDelete.size(), allSegments.size());
+
     // Testing Delete All API here
     sendGetRequest(_controllerRequestURLBuilder.
         forDeleteAllSegmentsWithTypeWithGetAPI(TABLE_NAME, CommonConstants.Helix.TableType.OFFLINE.toString()));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org