You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/09/29 04:39:29 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5967: Adding push job type of segment metadata only mode

Jackie-Jiang commented on a change in pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967#discussion_r496376478



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -182,7 +182,8 @@ public Response downloadSegment(
   }
 
   private SuccessResponse uploadSegment(@Nullable String tableName, FormDataMultiPart multiPart,
-      boolean enableParallelPushProtection, HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation) {
+      boolean enableParallelPushProtection, HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation,

Review comment:
       Should we add a new `FileUploadType`: `METADATA` instead of using this extra boolean flag?

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriToTarPathMap(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",
+        Arrays.toString(segmentUriToTarPathMap.entrySet().toArray()),
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
+      String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length());
+      File segmentMetadataFile = new File(FileUtils.getTempDirectory(), "segmentMetadataFile-" + System.nanoTime() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+      generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath), segmentMetadataFile);
+      try {
+        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+          URI controllerURI;
+          try {
+            controllerURI = new URI(pinotClusterSpec.getControllerURI());
+          } catch (URISyntaxException e) {
+            throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
+          }
+          LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName);
+          int attempts = 1;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+            attempts = spec.getPushJobSpec().getPushAttempts();
+          }
+          long retryWaitMs = 1000L;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+          }
+          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+            try {
+              List<Header> headers = ImmutableList.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath));
+              // Add table name as a request parameter
+              NameValuePair tableNameValuePair =
+                  new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
+              List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+              SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentMetadataURI(controllerURI),
+                  segmentName, segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
+              LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
+                  controllerURI, response.getStatusCode(), response.getResponse());
+              return true;
+            } catch (HttpErrorStatusException e) {
+              int statusCode = e.getStatusCode();
+              if (statusCode >= 500) {
+                // Temporary exception
+                LOGGER
+                    .warn("Caught temporary exception while pushing table: {} segment: {} to {}, will retry", tableName,
+                        segmentName, controllerURI, e);
+                return false;
+              } else {
+                // Permanent exception
+                LOGGER.error("Caught permanent exception while pushing table: {} segment: {} to {}, won't retry",
+                    tableName, segmentName, controllerURI, e);
+                throw e;
+              }
+            }
+          });
+        }
+      } finally {
+        FileUtils.deleteQuietly(segmentMetadataFile);
+      }
+    }
+  }
+
+  public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, String uriPrefix, String uriSuffix, String[] files) {
+    Map<String, String> segmentUriToTarPathMap = new HashMap<>();
+    for (String file : files) {
+      URI uri = URI.create(file);
+      if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
+        URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, uriPrefix,uriSuffix);
+        segmentUriToTarPathMap.put(updatedURI.toString(), file);
+      }
+    }
+    return segmentUriToTarPathMap;
+  }
+
+  /**
+   * Generate a segment metadata only tar file, which contains only metadata.properties and creation.meta file.
+   * The purpose of this is to create a lean tar to push to Pinot controller for adding segments without downloading
+   * the complete segment and untar the segment tarball.
+   *
+   * 1. Download segment tar file to temp dir;
+   * 2. Extract only metadata.properties and creation.meta files from the segment tar file;
+   * 3. Tar both files into a segment metadata file.
+   *
+   */
+  private static boolean generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileURI, File segmentMetadataTarFile)

Review comment:
       Return the generated file path instead of passing in the file path?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -454,7 +465,41 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(uploadSegment(tableName, multiPart, enableParallelPushProtection, headers, request, true));
+      asyncResponse.resume(uploadSegment(tableName, multiPart, enableParallelPushProtection, headers, request, true, false));
+    } catch (Throwable t) {
+      asyncResponse.resume(t);
+    }
+  }
+
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Path("/segmentmetadata")

Review comment:
       Suggest reusing the existing APIs and using headers to differentiate between segment and metadata uploads.
   Using `POST /segmentmetadata` to upload segments seems counter-intuitive to me.

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriToTarPathMap(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",
+        Arrays.toString(segmentUriToTarPathMap.entrySet().toArray()),
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
+      String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length());
+      File segmentMetadataFile = new File(FileUtils.getTempDirectory(), "segmentMetadataFile-" + System.nanoTime() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+      generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath), segmentMetadataFile);
+      try {
+        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+          URI controllerURI;
+          try {
+            controllerURI = new URI(pinotClusterSpec.getControllerURI());
+          } catch (URISyntaxException e) {
+            throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
+          }
+          LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName);
+          int attempts = 1;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+            attempts = spec.getPushJobSpec().getPushAttempts();
+          }
+          long retryWaitMs = 1000L;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+          }
+          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+            try {
+              List<Header> headers = ImmutableList.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath));
+              // Add table name as a request parameter
+              NameValuePair tableNameValuePair =
+                  new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
+              List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+              SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentMetadataURI(controllerURI),
+                  segmentName, segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
+              LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
+                  controllerURI, response.getStatusCode(), response.getResponse());
+              return true;
+            } catch (HttpErrorStatusException e) {
+              int statusCode = e.getStatusCode();
+              if (statusCode >= 500) {
+                // Temporary exception
+                LOGGER
+                    .warn("Caught temporary exception while pushing table: {} segment: {} to {}, will retry", tableName,
+                        segmentName, controllerURI, e);
+                return false;
+              } else {
+                // Permanent exception
+                LOGGER.error("Caught permanent exception while pushing table: {} segment: {} to {}, won't retry",
+                    tableName, segmentName, controllerURI, e);
+                throw e;
+              }
+            }
+          });
+        }
+      } finally {
+        FileUtils.deleteQuietly(segmentMetadataFile);
+      }
+    }
+  }
+
+  public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, String uriPrefix, String uriSuffix, String[] files) {
+    Map<String, String> segmentUriToTarPathMap = new HashMap<>();
+    for (String file : files) {
+      URI uri = URI.create(file);
+      if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
+        URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, uriPrefix,uriSuffix);
+        segmentUriToTarPathMap.put(updatedURI.toString(), file);
+      }
+    }
+    return segmentUriToTarPathMap;
+  }
+
+  /**
+   * Generate a segment metadata only tar file, which contains only metadata.properties and creation.meta file.
+   * The purpose of this is to create a lean tar to push to Pinot controller for adding segments without downloading
+   * the complete segment and untar the segment tarball.
+   *
+   * 1. Download segment tar file to temp dir;
+   * 2. Extract only metadata.properties and creation.meta files from the segment tar file;
+   * 3. Tar both files into a segment metadata file.
+   *
+   */
+  private static boolean generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileURI, File segmentMetadataTarFile)
+      throws Exception {
+    long currentTime = System.nanoTime();
+    File tarFile = new File(FileUtils.getTempDirectory(), "segmentTar-" + currentTime + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION).getAbsoluteFile();
+    File segmentMetadataDir = new File(FileUtils.getTempDirectory(), "segmentMetadataDir-" + currentTime).getAbsoluteFile();

Review comment:
       ```suggestion
       File tarFile = new File(FileUtils.getTempDirectory(), "segmentTar-" + currentTime + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
       File segmentMetadataDir = new File(FileUtils.getTempDirectory(), "segmentMetadataDir-" + currentTime);
   ```

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriToTarPathMap(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)

Review comment:
       Rename this to reflect that it is sending the segment metadata instead of the segments?

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriToTarPathMap(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",

Review comment:
       ```suggestion
       LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}",
   ```

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriToTarPathMap(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",
+        Arrays.toString(segmentUriToTarPathMap.entrySet().toArray()),

Review comment:
       ```suggestion
           segmentUriToTarPathMap,
   ```

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +187,125 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriToTarPathMap(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",
+        Arrays.toString(segmentUriToTarPathMap.entrySet().toArray()),
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
+      String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length());
+      File segmentMetadataFile = new File(FileUtils.getTempDirectory(), "segmentMetadataFile-" + System.nanoTime() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+      generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath), segmentMetadataFile);
+      try {
+        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+          URI controllerURI;
+          try {
+            controllerURI = new URI(pinotClusterSpec.getControllerURI());
+          } catch (URISyntaxException e) {
+            throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
+          }
+          LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName);
+          int attempts = 1;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+            attempts = spec.getPushJobSpec().getPushAttempts();
+          }
+          long retryWaitMs = 1000L;
+          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+          }
+          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+            try {
+              List<Header> headers = ImmutableList.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath));
+              // Add table name as a request parameter
+              NameValuePair tableNameValuePair =
+                  new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
+              List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+              SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentMetadataURI(controllerURI),
+                  segmentName, segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
+              LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
+                  controllerURI, response.getStatusCode(), response.getResponse());
+              return true;
+            } catch (HttpErrorStatusException e) {
+              int statusCode = e.getStatusCode();
+              if (statusCode >= 500) {
+                // Temporary exception
+                LOGGER
+                    .warn("Caught temporary exception while pushing table: {} segment: {} to {}, will retry", tableName,
+                        segmentName, controllerURI, e);
+                return false;
+              } else {
+                // Permanent exception
+                LOGGER.error("Caught permanent exception while pushing table: {} segment: {} to {}, won't retry",
+                    tableName, segmentName, controllerURI, e);
+                throw e;
+              }
+            }
+          });
+        }
+      } finally {
+        FileUtils.deleteQuietly(segmentMetadataFile);
+      }
+    }
+  }
+
+  public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, String uriPrefix, String uriSuffix, String[] files) {
+    Map<String, String> segmentUriToTarPathMap = new HashMap<>();
+    for (String file : files) {
+      URI uri = URI.create(file);
+      if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
+        URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, uriPrefix,uriSuffix);
+        segmentUriToTarPathMap.put(updatedURI.toString(), file);
+      }
+    }
+    return segmentUriToTarPathMap;
+  }
+
+  /**
+   * Generate a segment metadata only tar file, which contains only metadata.properties and creation.meta file.
+   * The purpose of this is to create a lean tar to push to Pinot controller for adding segments without downloading
+   * the complete segment and untar the segment tarball.
+   *
+   * 1. Download segment tar file to temp dir;
+   * 2. Extract only metadata.properties and creation.meta files from the segment tar file;
+   * 3. Tar both files into a segment metadata file.
+   *
+   */
+  private static boolean generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileURI, File segmentMetadataTarFile)
+      throws Exception {
+    long currentTime = System.nanoTime();

Review comment:
       Suggest using a UUID here instead of `System.nanoTime()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org