You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2019/02/26 15:27:01 UTC

[GitHub] chenboat commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.

chenboat commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r260338299
 
 

 ##########
 File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##########
 @@ -276,6 +280,79 @@ public String segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
     return response;
   }
 
+  /**
+   * Handles the commit-end call of a segment commit when the request carries segment metadata files
+   * as a multipart form.
+   *
+   * <p>The handler validates the request, loads a {@code SegmentMetadataImpl} from the uploaded files via
+   * {@code extractSegmentMetadata}, packs all parameters into a {@code SegmentCompletionProtocol.Request.Params}
+   * and passes them to {@code SegmentCompletionManager.segmentCommitEnd} with both the success flag and the
+   * split-commit flag set to {@code true}.
+   *
+   * <p>Exceptions thrown by {@code extractSegmentMetadata} are not caught here and propagate to the caller.
+   *
+   * @param instanceId id of the calling instance; must not be null
+   * @param segmentName name of the segment being committed; must not be null
+   * @param segmentLocation location of the segment; must not be null
+   * @param offset offset reported by the caller; the value -1 is rejected as invalid
+   * @param memoryUsedBytes memory usage reported by the caller (not validated, see TODO below)
+   * @param buildTimeMillis build time reported by the caller
+   * @param waitTimeMillis wait time reported by the caller
+   * @param numRows row count reported by the caller
+   * @param segmentSizeBytes segment size reported by the caller
+   * @param metadataFiles multipart form holding the segment metadata files; must not be null
+   * @return JSON string of the completion response, or of {@code SegmentCompletionProtocol.RESP_FAILED}
+   *         when validation fails
+   */
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId,
+                                             @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+                                             @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String segmentLocation,
+                                             @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+                                             @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long memoryUsedBytes,
+                                             @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long buildTimeMillis,
+                                             @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis,
+                                             @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+                                             @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes,
+                                             FormDataMultiPart metadataFiles) {
+    // Reject the call up front when any required input is missing.
+    // NOTE(review): the offset check treats -1 as "not supplied", but no @DefaultValue is visible on the
+    // parameter in this signature; a primitive long query parameter that is absent would presumably arrive
+    // as 0 rather than -1 -- confirm the sentinel is actually reachable.
+    if (instanceId == null || segmentName == null || offset == -1 || segmentLocation == null || metadataFiles == null) {
+      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, segmentLocation={}", offset, segmentName,
+              instanceId, segmentLocation);
+      // TODO: memoryUsedInBytes = 0 if not present in params. Add validation when we start using it
+      return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+    }
+
+    // Load the segment metadata from the uploaded files; failures surface as a RuntimeException.
+    SegmentMetadataImpl segmentMetadata = extractSegmentMetadata(metadataFiles, segmentName);
+
+    SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
+    requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+            .withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+            .withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+            .withMemoryUsedBytes(memoryUsedBytes).withSegmentMetadata(segmentMetadata);
+    LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+
+    // This endpoint always reports a successful, split-commit style commit end.
+    final boolean isSuccess = true;
+    final boolean isSplitCommit = true;
+
+    SegmentCompletionProtocol.Response response =
+            SegmentCompletionManager.getInstance().segmentCommitEnd(requestParams, isSuccess, isSplitCommit);
+    final String responseStr = response.toJsonString();
+    LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+    return responseStr;
+  }
+
+  /**
+   * Loads a {@code SegmentMetadataImpl} from the metadata files carried in the multipart form.
+   *
+   * <p>A temporary directory named {@code segmentNameStr + METADATA_TEMP_DIR_SUFFIX} is created under the
+   * controller's local temp dir, the metadata.properties and creation.meta fields are extracted into it via
+   * {@code extractMetadataFromInputForm}, and the metadata object is constructed from that directory. The
+   * directory is deleted before returning, whether or not loading succeeded.
+   *
+   * @param metadataFiles multipart form holding the metadata file fields
+   * @param segmentNameStr segment name, used to build the temp directory name and the error message
+   * @return the segment metadata loaded from the extracted files
+   * @throws RuntimeException wrapping any exception raised while creating the directory, extracting the
+   *         files or loading the metadata
+   */
+  private SegmentMetadataImpl extractSegmentMetadata(FormDataMultiPart metadataFiles, String segmentNameStr) {
+    String tempMetadataDirStr = StringUtil.join("/", _controllerConf.getLocalTempDir(), segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
+    File tempMetadataDir = new File(tempMetadataDirStr);
+
+    try {
+      // NOTE(review): File.mkdirs() returns false when the directory already exists, so a leftover or
+      // concurrently used directory for the same segment fails this check -- and the finally block below
+      // then deletes that directory anyway. Confirm this is the intended behavior.
+      Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr);
+      // Extract metadata.properties from the metadataFiles.
+      extractMetadataFromInputForm(metadataFiles, tempMetadataDirStr, V1Constants.MetadataKeys.METADATA_FILE_NAME);
+      // Extract creation.meta from the metadataFiles.
+      extractMetadataFromInputForm(metadataFiles, tempMetadataDirStr, V1Constants.SEGMENT_CREATION_META);
+      // Load segment metadata
+      return new SegmentMetadataImpl(tempMetadataDir);
+    } catch (Exception e) {
+      // Wrap everything (including the checkState failure above) so callers see one unchecked type.
+      throw new RuntimeException("Exception extracting and reading segment metadata for " + segmentNameStr, e);
+    } finally {
+      // Best-effort cleanup of the temp directory on every exit path.
+      FileUtils.deleteQuietly(tempMetadataDir);
+    }
+  }
+
+  private void extractMetadataFromInputForm(FormDataMultiPart metadataFiles, String tempMetadataDirStr,
+                                            String metaFileName) throws IOException {
+    FormDataBodyPart metadataFilesField = metadataFiles.getField(metaFileName);
+    Preconditions.checkNotNull(metadataFilesField, "The metadata input field %s does not exist.",
+            metaFileName);
+    InputStream metadataPropertiesInputStream = metadataFilesField.getValueAs(InputStream.class);
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org