Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/09/03 07:37:31 UTC

[GitHub] [incubator-pinot] fx19880617 opened a new pull request #5967: Adding push job type of segment metadata only mode

fx19880617 opened a new pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967


   ## Description
   Add new Pinot push job types: `SegmentMetadataPush` and `SegmentCreationAndMetadataPush`.
   
   These jobs upload only the Pinot segment metadata, together with the segment download URI,
   so the controller can skip downloading the full segment from that path.
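A minimal sketch of what a metadata-only push needs per segment: the segment name (the tar file name without `.tar.gz`, derived the same way as in `SegmentPushUtils`) and the URI the controller should record for later downloads. `MetadataPushPlanSketch` and `plan` are hypothetical names. The sketch assumes the tar location itself is the download URI; the PR's `generateSegmentTarURI` appears to also apply a URI prefix and suffix, which is omitted here.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MetadataPushPlanSketch {
  // Stand-in for Constants.TAR_GZ_FILE_EXT in the diff.
  static final String TAR_GZ_EXT = ".tar.gz";

  /** Maps segment name -> download URI for every .tar.gz input; other files are skipped. */
  static Map<String, URI> plan(List<String> tarUris) {
    Map<String, URI> segmentToDownloadUri = new LinkedHashMap<>();
    for (String tarUri : tarUris) {
      URI uri = URI.create(tarUri);
      String path = uri.getPath();
      if (path == null || !path.endsWith(TAR_GZ_EXT)) {
        continue; // e.g. _SUCCESS markers or other non-segment files
      }
      // Segment name = file name with the .tar.gz suffix removed.
      String fileName = path.substring(path.lastIndexOf('/') + 1);
      String segmentName = fileName.substring(0, fileName.length() - TAR_GZ_EXT.length());
      // Assumption: the tar location itself is used as the download URI.
      segmentToDownloadUri.put(segmentName, uri);
    }
    return segmentToDownloadUri;
  }

  public static void main(String[] args) {
    System.out.println(plan(List.of(
        "s3://bucket/output/myTable_0.tar.gz",
        "s3://bucket/output/_SUCCESS")));
    // prints {myTable_0=s3://bucket/output/myTable_0.tar.gz}
  }
}
```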
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5967: Adding push job type of segment metadata only mode

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967#discussion_r496376478



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -182,7 +182,8 @@ public Response downloadSegment(
   }
 
   private SuccessResponse uploadSegment(@Nullable String tableName, FormDataMultiPart multiPart,
-      boolean enableParallelPushProtection, HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation) {
+      boolean enableParallelPushProtection, HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation,

Review comment:
       Should we add a new `FileUploadType`: `METADATA` instead of using this extra boolean flag?
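A sketch of the reviewer's suggestion, assuming a hypothetical `UploadType` enum (Pinot's real `FileUploadType` constants may differ): a single enum parameter replaces the extra boolean, so a call site names the mode instead of passing an opaque `true, false`. The per-mode strings are illustrative only, not controller behavior.

```java
import java.util.Objects;

final class UploadTypeSketch {
  // Hypothetical enum; Pinot's real FileUploadType may use different constants.
  enum UploadType { SEGMENT, URI, METADATA }

  // With an extra boolean, a call such as uploadSegment(..., true, false) hides which mode is meant.
  // With an enum, the mode is spelled out at every call site.
  static String describeUpload(String tableName, UploadType uploadType) {
    Objects.requireNonNull(uploadType, "uploadType");
    switch (uploadType) {
      case SEGMENT:
        return tableName + ": full segment tar in the request body";
      case URI:
        return tableName + ": controller fetches the segment from a URI";
      case METADATA:
        return tableName + ": metadata-only tar plus a download URI";
      default:
        throw new IllegalArgumentException("Unsupported upload type: " + uploadType);
    }
  }

  public static void main(String[] args) {
    System.out.println(describeUpload("myTable", UploadType.METADATA));
    // prints myTable: metadata-only tar plus a download URI
  }
}
```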

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriToTarPathMap(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",
+        Arrays.toString(segmentUriToTarPathMap.entrySet().toArray()),
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
+      String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length());
+      File segmentMetadataFile = new File(FileUtils.getTempDirectory(), "segmentMetadataFile-" + System.nanoTime() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+      generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath), segmentMetadataFile);
+      try {
+        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+          URI controllerURI;
+          try {
+            controllerURI = new URI(pinotClusterSpec.getControllerURI());
+          } catch (URISyntaxException e) {
+            throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
+          }
+          LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName);
+          int attempts = 1;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+            attempts = spec.getPushJobSpec().getPushAttempts();
+          }
+          long retryWaitMs = 1000L;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+          }
+          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+            try {
+              List<Header> headers = ImmutableList.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath));
+              // Add table name as a request parameter
+              NameValuePair tableNameValuePair =
+                  new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
+              List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+              SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentMetadataURI(controllerURI),
+                  segmentName, segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
+              LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
+                  controllerURI, response.getStatusCode(), response.getResponse());
+              return true;
+            } catch (HttpErrorStatusException e) {
+              int statusCode = e.getStatusCode();
+              if (statusCode >= 500) {
+                // Temporary exception
+                LOGGER
+                    .warn("Caught temporary exception while pushing table: {} segment: {} to {}, will retry", tableName,
+                        segmentName, controllerURI, e);
+                return false;
+              } else {
+                // Permanent exception
+                LOGGER.error("Caught permanent exception while pushing table: {} segment: {} to {}, won't retry",
+                    tableName, segmentName, controllerURI, e);
+                throw e;
+              }
+            }
+          });
+        }
+      } finally {
+        FileUtils.deleteQuietly(segmentMetadataFile);
+      }
+    }
+  }
+
+  public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, String uriPrefix, String uriSuffix, String[] files) {
+    Map<String, String> segmentUriToTarPathMap = new HashMap<>();
+    for (String file : files) {
+      URI uri = URI.create(file);
+      if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
+        URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, uriPrefix,uriSuffix);
+        segmentUriToTarPathMap.put(updatedURI.toString(), file);
+      }
+    }
+    return segmentUriToTarPathMap;
+  }
+
+  /**
+   * Generate a segment metadata only tar file, which contains only metadata.properties and creation.meta file.
+   * The purpose of this is to create a lean tar to push to Pinot controller for adding segments without downloading
+   * the complete segment and untar the segment tarball.
+   *
+   * 1. Download segment tar file to temp dir;
+   * 2. Extract only metadata.properties and creation.meta files from the segment tar file;
+   * 3. Tar both files into a segment metadata file.
+   *
+   */
+  private static boolean generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileURI, File segmentMetadataTarFile)

Review comment:
       Return the generated file instead of passing in the output file path?
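The JDK has no tar support, so this sketch covers only step 2 of the javadoc above (keep `metadata.properties` and `creation.meta`) on a list of entry names, and follows the review suggestion by returning its result instead of filling a caller-supplied output. The class and method names and the example entry names are hypothetical; matching on the last path component assumes the files may sit in a nested directory inside the tarball.

```java
import java.util.ArrayList;
import java.util.List;

final class MetadataEntryFilterSketch {
  // File names listed in the generateSegmentMetadataFile javadoc.
  static final String METADATA_FILE = "metadata.properties";
  static final String CREATION_META_FILE = "creation.meta";

  /** Returns the entries to copy into the metadata-only tarball (result returned, not passed in). */
  static List<String> selectMetadataEntries(List<String> tarEntryNames) {
    List<String> selected = new ArrayList<>();
    for (String entryName : tarEntryNames) {
      // Match on the last path component, since entries may be nested in a segment directory.
      String baseName = entryName.substring(entryName.lastIndexOf('/') + 1);
      if (baseName.equals(METADATA_FILE) || baseName.equals(CREATION_META_FILE)) {
        selected.add(entryName);
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    List<String> entries = List.of(
        "myTable_0/v3/metadata.properties",
        "myTable_0/v3/columns.psf",
        "myTable_0/v3/creation.meta",
        "myTable_0/v3/index_map");
    System.out.println(selectMetadataEntries(entries));
    // prints [myTable_0/v3/metadata.properties, myTable_0/v3/creation.meta]
  }
}
```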

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -454,7 +465,41 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(uploadSegment(tableName, multiPart, enableParallelPushProtection, headers, request, true));
+      asyncResponse.resume(uploadSegment(tableName, multiPart, enableParallelPushProtection, headers, request, true, false));
+    } catch (Throwable t) {
+      asyncResponse.resume(t);
+    }
+  }
+
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Path("/segmentmetadata")

Review comment:
       Suggest reusing the existing APIs and using headers to differentiate segment uploads from metadata uploads.
   Using `POST /segmentmetadata` to upload segments seems counter-intuitive to me.
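One way to read this suggestion: keep the existing upload endpoint and let a request header choose the mode, defaulting to a normal segment upload when the header is absent. The header name `UPLOAD_TYPE` and the enum below are hypothetical stand-ins. Real HTTP header names are case-insensitive, so a production version should read them through the framework (e.g. JAX-RS `HttpHeaders`) rather than a plain `Map`.

```java
import java.util.Locale;
import java.util.Map;

final class UploadTypeHeaderSketch {
  // Assumed header name, for illustration only.
  static final String UPLOAD_TYPE_HEADER = "UPLOAD_TYPE";

  enum UploadType { SEGMENT, URI, METADATA }

  /** Chooses the upload mode from request headers; no header means a regular segment upload. */
  static UploadType resolveUploadType(Map<String, String> headers) {
    String value = headers.get(UPLOAD_TYPE_HEADER);
    if (value == null || value.isBlank()) {
      return UploadType.SEGMENT; // existing clients keep today's behavior
    }
    // Unknown values throw IllegalArgumentException, which an endpoint could map to HTTP 400.
    return UploadType.valueOf(value.trim().toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(resolveUploadType(Map.of()));                           // SEGMENT
    System.out.println(resolveUploadType(Map.of(UPLOAD_TYPE_HEADER, "metadata"))); // METADATA
  }
}
```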

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
+  private static boolean generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileURI, File segmentMetadataTarFile)
+      throws Exception {
+    long currentTime = System.nanoTime();
+    File tarFile = new File(FileUtils.getTempDirectory(), "segmentTar-" + currentTime + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION).getAbsoluteFile();
+    File segmentMetadataDir = new File(FileUtils.getTempDirectory(), "segmentMetadataDir-" + currentTime).getAbsoluteFile();

Review comment:
       ```suggestion
       File tarFile = new File(FileUtils.getTempDirectory(), "segmentTar-" + currentTime + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
       File segmentMetadataDir = new File(FileUtils.getTempDirectory(), "segmentMetadataDir-" + currentTime);
   ```
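Why the `.getAbsoluteFile()` calls can be dropped: a `File` built from an absolute parent directory is already absolute. The sketch assumes `FileUtils` here is commons-io's, whose `getTempDirectory()` is built from the `java.io.tmpdir` system property, and that this property is an absolute path, as is typical. The class and method names are hypothetical.

```java
import java.io.File;

final class TempFilePathSketch {
  /** True when new File(parent, child) needs no extra getAbsoluteFile() call. */
  static boolean isAlreadyAbsolute(File parent, String child) {
    File file = new File(parent, child);
    return file.isAbsolute() && file.equals(file.getAbsoluteFile());
  }

  public static void main(String[] args) {
    // Assumption: java.io.tmpdir is normally an absolute path.
    File tempDir = new File(System.getProperty("java.io.tmpdir"));
    System.out.println(tempDir.isAbsolute() + " " + isAlreadyAbsolute(tempDir, "segmentTar-1.tar.gz"));
  }
}
```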

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriToTarPathMap(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)

Review comment:
       Rename this to reflect that it sends segment metadata rather than the segments themselves?

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriToTarPathMap(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",

Review comment:
       ```suggestion
       LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}",
   ```

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriToTarPathMap(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",
+        Arrays.toString(segmentUriToTarPathMap.entrySet().toArray()),

Review comment:
       ```suggestion
           segmentUriToTarPathMap,
   ```
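For this logging suggestion, both forms print the same entries and differ only in brackets (`[...]` versus `{...}`). Passing the map directly to a `{}` placeholder also avoids copying the entries into an array on every call, and with SLF4J-style parameterized logging the map's `toString()` is only invoked when the message is actually emitted. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

final class LogFormatSketch {
  /** What the original log argument builds: an array copy of the entries, formatted eagerly. */
  static String eagerFormat(Map<String, String> segmentUriToTarPath) {
    return Arrays.toString(segmentUriToTarPath.entrySet().toArray());
  }

  /** What the suggested argument prints: the map's own toString(). */
  static String mapFormat(Map<String, String> segmentUriToTarPath) {
    return segmentUriToTarPath.toString();
  }

  public static void main(String[] args) {
    Map<String, String> m = new LinkedHashMap<>();
    m.put("s3://bucket/seg_0.tar.gz", "/out/seg_0.tar.gz");
    System.out.println(eagerFormat(m)); // [s3://bucket/seg_0.tar.gz=/out/seg_0.tar.gz]
    System.out.println(mapFormat(m));   // {s3://bucket/seg_0.tar.gz=/out/seg_0.tar.gz}
  }
}
```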

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
+  private static boolean generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileURI, File segmentMetadataTarFile)
+      throws Exception {
+    long currentTime = System.nanoTime();

Review comment:
       Suggest using a UUID instead of `System.nanoTime()` for the temp file names.
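A sketch of the UUID suggestion, with hypothetical class and method names. `System.nanoTime()` values may coincide when several threads or JVMs share one temp directory, whereas a random UUID makes a clash negligible. As in the original code, one generated id can be shared by the related tar file and metadata directory so they stay easy to match up.

```java
import java.io.File;
import java.util.UUID;

final class TempNameSketch {
  /** Builds a temp file path whose name carries a random UUID instead of System.nanoTime(). */
  static File uniqueTempFile(File tempDir, String prefix, String suffix) {
    return new File(tempDir, prefix + UUID.randomUUID() + suffix);
  }

  public static void main(String[] args) {
    File tempDir = new File(System.getProperty("java.io.tmpdir"));
    // One id can be shared by related artifacts, as the original shares currentTime.
    String id = UUID.randomUUID().toString();
    File tarFile = new File(tempDir, "segmentTar-" + id + ".tar.gz");
    File metadataDir = new File(tempDir, "segmentMetadataDir-" + id);
    System.out.println(tarFile + "\n" + metadataDir);
    System.out.println(uniqueTempFile(tempDir, "segmentMetadataFile-", ".tar.gz"));
  }
}
```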


[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #5967: Adding push job type of segment metadata only mode

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967#discussion_r496974735



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -460,6 +465,40 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Path("/segments/metadata")
+  @ApiOperation(value = "Upload a segment with metadata", notes = "Upload a segment using segment metadata")
+  public void uploadSegmentMetadataAsJson(String segmentJsonStr,
+      @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
+      @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
+      @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
+    try {
+      asyncResponse.resume(uploadSegment(tableName, null, enableParallelPushProtection, headers, request, false));
+    } catch (Throwable t) {
+      asyncResponse.resume(t);
+    }
+  }
+
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/segments/metadata")
+  @ApiOperation(value = "Upload a segment with metadata", notes = "Upload a segment using segment metadata")
+  public void uploadSegmentMetadataAsMultiPart(FormDataMultiPart multiPart,
+      @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
+      @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
+      @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
+    try {
+      asyncResponse.resume(uploadSegment(tableName, multiPart, enableParallelPushProtection, headers, request, false));
+    } catch (Throwable t) {
+      asyncResponse.resume(t);
+    }
+  }
+

Review comment:
       Makes sense.


[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #5967: Adding push job type of segment metadata only mode

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967#discussion_r496973089



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +188,127 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segment metadata: {}... to locations: {} for table {}",

Review comment:
       Ah, I think the original purpose of this log was to print at most, say, 5 segments to avoid flooding the log.
   But I feel it should be OK to list all the segments.
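If the log should stay bounded, as the original `{}...` format hints, the list can be capped at a fixed number of entries followed by a count of the rest. A minimal sketch with a hypothetical helper:

```java
import java.util.List;
import java.util.stream.Collectors;

final class CappedLogSketch {
  /** Formats at most maxItems entries, then a count of the ones left out. */
  static String summarize(List<String> items, int maxItems) {
    if (maxItems <= 0) {
      throw new IllegalArgumentException("maxItems must be positive");
    }
    if (items.size() <= maxItems) {
      return items.toString();
    }
    String head = items.stream().limit(maxItems).collect(Collectors.joining(", ", "[", ""));
    return head + ", ... (" + (items.size() - maxItems) + " more)]";
  }

  public static void main(String[] args) {
    List<String> segments = List.of("seg_0", "seg_1", "seg_2", "seg_3", "seg_4", "seg_5", "seg_6");
    System.out.println(summarize(segments, 5));
    // prints [seg_0, seg_1, seg_2, seg_3, seg_4, ... (2 more)]
  }
}
```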


[GitHub] [incubator-pinot] yupeng9 commented on pull request #5967: Adding push job type of segment metadata only mode

Posted by GitBox <gi...@apache.org>.
yupeng9 commented on pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967#issuecomment-686816556


   Curious, why is this mode needed? Could this lead to inconsistency between metadata and segments?


[GitHub] [incubator-pinot] fx19880617 commented on pull request #5967: Adding push job type of segment metadata only mode

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967#issuecomment-686824951


   > Curious, why is this mode needed? Could this lead to inconsistency between metadata and segments?
   
   This mode should be used with caution.
   We observed that for some users it takes 10 hours to push 20 TB of segments, since the controller has to download all segments, load their metadata and add the segments.
   This mode is mostly useful for data bootstrapping, to reduce the total push time.
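For scale, a quick check of the figures above, assuming decimal terabytes (10^12 bytes) and that the 10 hours cover the whole push: that is an average of roughly 556 MB/s end to end. The class and method names are hypothetical.

```java
final class PushThroughputSketch {
  /** Average bytes per second needed to move totalBytes in the given number of hours. */
  static double bytesPerSecond(double totalBytes, double hours) {
    return totalBytes / (hours * 3600.0);
  }

  public static void main(String[] args) {
    double twentyTb = 20e12; // assumes decimal terabytes (10^12 bytes)
    double rate = bytesPerSecond(twentyTb, 10);
    System.out.printf("%.0f MB/s%n", rate / 1e6);
    // prints 556 MB/s
  }
}
```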


[GitHub] [incubator-pinot] fx19880617 merged pull request #5967: Adding push job type of segment metadata only mode

Posted by GitBox <gi...@apache.org>.
fx19880617 merged pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967



[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #5967: Adding push job type of segment metadata only mode

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967#discussion_r496974974



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +188,127 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segment metadata: {}... to locations: {} for table {}",
+        segmentUriToTarPathMap,
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
+      String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length());
+      File segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
+      try {
+        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+          URI controllerURI;
+          try {
+            controllerURI = new URI(pinotClusterSpec.getControllerURI());
+          } catch (URISyntaxException e) {
+            throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
+          }
+          LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName);
+          int attempts = 1;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+            attempts = spec.getPushJobSpec().getPushAttempts();
+          }
+          long retryWaitMs = 1000L;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+          }
+          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+            try {
+              List<Header> headers = ImmutableList.of(
+                  new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath),
+                  new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.URI.toString()));

Review comment:
       good catch






[GitHub] [incubator-pinot] yupeng9 commented on pull request #5967: Adding push job type of segment metadata only mode

Posted by GitBox <gi...@apache.org>.
yupeng9 commented on pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967#issuecomment-687256508


   > > Curious, why is this mode needed? Could this lead to inconsistency between metadata and segments?
   > 
   > This mode should be used with caution.
   > We observed that for some users it takes 10 hours to push 20 TB of segments, because the controller has to download every segment, load its metadata, and add it.
   > This is mostly useful for data bootstrapping to reduce total push time.
   
   I see. This can be a useful tool for issue mitigation. It would be good to describe the scenarios it targets in the PR description.
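To put the quoted numbers in perspective: 20 TB in 10 hours is an average end-to-end rate of roughly 556 MB/s, covering downloading, metadata loading, and adding segments together. The snippet below only does that arithmetic, assuming decimal units (1 TB = 1,000,000 MB); it is not code from the PR.

```java
import java.util.Locale;

public class PushThroughput {
    // Average end-to-end rate in MB/s for pushing `terabytes` in `hours`,
    // assuming decimal units (1 TB = 1,000,000 MB).
    static double megabytesPerSecond(double terabytes, double hours) {
        double megabytes = terabytes * 1_000_000.0;
        double seconds = hours * 3600.0;
        return megabytes / seconds;
    }

    public static void main(String[] args) {
        // 20 TB in 10 hours, the figures quoted in the thread
        double rate = megabytesPerSecond(20, 10);
        System.out.println(String.format(Locale.ROOT, "%.1f MB/s", rate)); // prints "555.6 MB/s"
    }
}
```

This ignores parallelism and per-segment overhead; it only shows the average rate the controller sustained over the whole push.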




[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #5967: Adding push job type of segment metadata only mode

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967#discussion_r496906820



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -182,7 +182,8 @@ public Response downloadSegment(
   }
 
   private SuccessResponse uploadSegment(@Nullable String tableName, FormDataMultiPart multiPart,
-      boolean enableParallelPushProtection, HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation) {
+      boolean enableParallelPushProtection, HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation,

Review comment:
       added.






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5967: Adding push job type of segment metadata only mode

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967#discussion_r496966149



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -460,6 +465,40 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Path("/segments/metadata")
+  @ApiOperation(value = "Upload a segment with metadata", notes = "Upload a segment using segment metadata")
+  public void uploadSegmentMetadataAsJson(String segmentJsonStr,
+      @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
+      @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
+      @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
+    try {
+      asyncResponse.resume(uploadSegment(tableName, null, enableParallelPushProtection, headers, request, false));
+    } catch (Throwable t) {
+      asyncResponse.resume(t);
+    }
+  }
+
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/segments/metadata")
+  @ApiOperation(value = "Upload a segment with metadata", notes = "Upload a segment using segment metadata")
+  public void uploadSegmentMetadataAsMultiPart(FormDataMultiPart multiPart,
+      @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
+      @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
+      @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
+    try {
+      asyncResponse.resume(uploadSegment(tableName, multiPart, enableParallelPushProtection, headers, request, false));
+    } catch (Throwable t) {
+      asyncResponse.resume(t);
+    }
+  }
+

Review comment:
       I don't think we need to add these two new APIs; we can keep using the existing segment upload API and indicate a metadata upload in the header. I feel it is better to use the same path for all segment uploads.
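A rough sketch of the reviewer's suggestion: keep a single upload entry point and branch on an upload-type header instead of adding new `/segments/metadata` paths. The enum, the literal `UPLOAD_TYPE` header name, and the default of `SEGMENT` when the header is absent are assumptions for illustration, not the controller's actual code.

```java
import java.util.Map;

public class UploadDispatchSketch {
    // Hypothetical mirror of the upload types discussed in this PR.
    enum UploadType { SEGMENT, URI, METADATA }

    // Assumed header name; the real constant lives in FileUploadDownloadClient.CustomHeaders.
    static final String UPLOAD_TYPE_HEADER = "UPLOAD_TYPE";

    // Resolve the upload type from request headers, defaulting to SEGMENT when absent.
    static UploadType resolveUploadType(Map<String, String> headers) {
        String value = headers.get(UPLOAD_TYPE_HEADER);
        return value == null ? UploadType.SEGMENT : UploadType.valueOf(value);
    }

    // One entry point that branches on the header instead of exposing separate paths.
    static String uploadSegment(Map<String, String> headers) {
        switch (resolveUploadType(headers)) {
            case URI:
                return "download segment from DOWNLOAD_URI, then add it";
            case METADATA:
                return "read segment metadata from request, record DOWNLOAD_URI without downloading";
            case SEGMENT:
            default:
                return "read segment tar from request body";
        }
    }

    public static void main(String[] args) {
        System.out.println(uploadSegment(Map.of(UPLOAD_TYPE_HEADER, "METADATA")));
    }
}
```

Keeping one path means existing clients that never send the header keep their current behavior, while metadata-only pushes opt in explicitly.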

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +188,127 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)

Review comment:
       Add some javadoc?

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +188,127 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segment metadata: {}... to locations: {} for table {}",
+        segmentUriToTarPathMap,
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
+      String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length());
+      File segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
+      try {
+        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+          URI controllerURI;
+          try {
+            controllerURI = new URI(pinotClusterSpec.getControllerURI());
+          } catch (URISyntaxException e) {
+            throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
+          }
+          LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName);
+          int attempts = 1;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+            attempts = spec.getPushJobSpec().getPushAttempts();
+          }
+          long retryWaitMs = 1000L;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+          }
+          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+            try {
+              List<Header> headers = ImmutableList.of(
+                  new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath),
+                  new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.URI.toString()));

Review comment:
       (Critical) Shouldn't this be `METADATA`?
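For reference, the fix the reviewer points at amounts to sending the metadata upload type alongside the download URI. This is a self-contained sketch with plain strings standing in for `FileUploadDownloadClient.CustomHeaders` and `FileUploadType`; the literal header names and the hypothetical enum are assumptions based on this thread, and the URI in `main` is a placeholder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MetadataPushHeadersSketch {
    // Assumed header names standing in for FileUploadDownloadClient.CustomHeaders constants.
    static final String DOWNLOAD_URI = "DOWNLOAD_URI";
    static final String UPLOAD_TYPE = "UPLOAD_TYPE";

    // Hypothetical mirror of FileUploadDownloadClient.FileUploadType.
    enum FileUploadType { SEGMENT, URI, METADATA }

    // Build headers for a metadata-only push: the controller is told where the segment
    // lives and that the request carries metadata, so it need not fetch the segment.
    static Map<String, String> metadataPushHeaders(String segmentUriPath) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put(DOWNLOAD_URI, segmentUriPath);
        headers.put(UPLOAD_TYPE, FileUploadType.METADATA.toString()); // METADATA, not URI
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(metadataPushHeaders("s3://example-bucket/segment.tar.gz"));
    }
}
```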

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +188,127 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segment metadata: {}... to locations: {} for table {}",

Review comment:
       (nit) Why is there a `...` here? We are not omitting any segments.



