You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/03/15 18:05:42 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6653: add uploadLLCSegment endpoint in TableResource

Jackie-Jiang commented on a change in pull request #6653:
URL: https://github.com/apache/incubator-pinot/pull/6653#discussion_r594564224



##########
File path: pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
##########
@@ -244,6 +243,62 @@ public Response downloadSegment(
     }
   }
 
+  // Upload a low level consumer segment to segment store and return the segment download url
+  @POST
+  @Path("/segments/{realtimeTableName}/{segmentName}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Upload a low level consumer segment to segment store and return the segment download url", notes = "Upload a low level consumer segment to segment store and return the segment download url")
+  @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class), @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class), @ApiResponse(code = 400, message = "Bad request", response = ErrorInfo.class)})
+  public String uploadLLCSegment(
+          @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName") String realtimeTableName,
+          @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName) throws Exception {
+    LOGGER.info("Received a request to upload low level consumer segment {} for table {}", segmentName, realtimeTableName);
+
+    // Check it's realtime table
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+    if (TableType.OFFLINE == tableType) {
+      throw new WebApplicationException(
+              String.format("Cannot upload low level consumer segment for OFFLINE table: %s", realtimeTableName),
+              Response.Status.BAD_REQUEST);
+    }
+
+    // Check the segment is low level consumer segment
+    if (!LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+      throw new WebApplicationException(
+              String.format("Segment %s is not a low level consumer segment", segmentName),
+              Response.Status.BAD_REQUEST);
+    }
+
+    String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
+    TableDataManager tableDataManager = checkGetTableDataManager(tableNameWithType);
+    SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+    if (segmentDataManager == null) {
+      throw new WebApplicationException(
+              String.format("Table %s segment %s does not exist", realtimeTableName, segmentName),
+              Response.Status.NOT_FOUND);
+    }
+
+    File segmentTarFile = null;
+    try {
+      // Create the tar.gz segment file in the server's segmentTarUploadDir folder with a unique file name.
+      File segmentTarUploadDir =
+              new File(_serverInstance.getInstanceDataManager().getSegmentFileDirectory(), SEGMENT_UPLOAD_DIR);
+      segmentTarUploadDir.mkdir();
+
+      segmentTarFile = new File(segmentTarUploadDir, tableNameWithType + "_" + segmentName + "_" + UUID.randomUUID()
+              + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+      TarGzCompressionUtils.createTarGzFile(new File(tableDataManager.getTableDataDir(), segmentName), segmentTarFile);
+
+      // Use segment uploader to upload the segment tar file to segment store and return the segment download url.
+      SegmentUploader segmentUploader = _serverInstance.getInstanceDataManager().getSegmentUploader();
+      URI segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(segmentName));
+      return segmentDownloadUrl == null ? Strings.EMPTY : segmentDownloadUrl.getPath();

Review comment:
       +1

##########
File path: pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
##########
@@ -50,6 +50,8 @@
   public static final String INSTANCE_SEGMENT_TAR_DIR = "segmentTarDir";
   // Key of segment directory
   public static final String INSTANCE_BOOTSTRAP_SEGMENT_DIR = "bootstrap.segment.dir";
+  // Key of segment store uri
+  public static final String SEGMENT_STORE_URI = "segment.store.uri";

Review comment:
       +1

##########
File path: pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
##########
@@ -24,36 +24,34 @@
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.io.File;
+import java.net.URI;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import javax.inject.Inject;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.Encoded;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.*;

Review comment:
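       (Code style) Please format the code with the `Pinot Style` IDE settings and keep the explicit imports instead of collapsing them into the wildcard `javax.ws.rs.*`: https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#setup-ide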

##########
File path: pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
##########
@@ -24,36 +24,34 @@
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.io.File;
+import java.net.URI;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import javax.inject.Inject;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.Encoded;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.*;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
+
+import joptsimple.internal.Strings;

Review comment:
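       Don't import `joptsimple.internal.Strings`: it is an internal class of an unrelated library (JOpt Simple, a command-line option parser) and is only used here for `Strings.EMPTY`. Use the empty string literal `""` instead.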




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org