You are viewing a plain-text version of this content; the link to the canonical version was lost in the plain-text conversion.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/07/23 21:05:03 UTC

[incubator-pinot] branch deleteExtras created (now 3eeb7fd)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a change to branch deleteExtras
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


      at 3eeb7fd  Delete extra segments that are pushed

This branch includes the following new commits:

     new 3eeb7fd  Delete extra segments that are pushed

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Delete extra segments that are pushed

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch deleteExtras
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3eeb7fdb8516f2895afc8205ade9ccaa6044b410
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Tue Jul 23 13:59:19 2019 -0700

    Delete extra segments that are pushed
---
 .../pinot/hadoop/job/JobConfigConstants.java       |  2 ++
 .../apache/pinot/hadoop/job/SegmentTarPushJob.java | 34 +++++++++++++++++++++-
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 19b350c..46d3d5b 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -57,4 +57,6 @@ public class JobConfigConstants {
   public static final String ENABLE_PARTITIONING = "enable.partitioning";
   public static final String ENABLE_SORTING = "enable.sorting";
   public static final String ENABLE_RESIZING = "enable.resizing";
+
+  public static final String DELETE_EXTRA_REFRESHED_SEGMENTS = "delete.extra.refreshed.segments";
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
index 7c71fd8..1b31107 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
@@ -19,8 +19,12 @@
 package org.apache.pinot.hadoop.job;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,6 +34,7 @@ import org.apache.pinot.hadoop.utils.PushLocation;
 public class SegmentTarPushJob extends BaseSegmentJob {
   private final Path _segmentPattern;
   private final List<PushLocation> _pushLocations;
+  private final boolean _deleteExtraSegments;
 
   public SegmentTarPushJob(Properties properties) {
     super(properties);
@@ -37,6 +42,7 @@ public class SegmentTarPushJob extends BaseSegmentJob {
     String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
     int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
     _pushLocations = PushLocation.getPushLocations(hosts, port);
+    _deleteExtraSegments = Boolean.parseBoolean(properties.getProperty(JobConfigConstants.DELETE_EXTRA_REFRESHED_SEGMENTS, "false"));
   }
 
   @Override
@@ -48,7 +54,33 @@ public class SegmentTarPushJob extends BaseSegmentJob {
       throws Exception {
     FileSystem fileSystem = FileSystem.get(_conf);
     try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
-      controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
+      // TODO: Deal with invalid prefixes in the future
+      if (_deleteExtraSegments) {
+        List<String> allSegments = controllerRestApi.getAllSegments("OFFLINE");
+        Set<String> uniqueSegmentPrefixes = new HashSet<>();
+
+        // Get all relevant segment prefixes that we are planning on pushing
+        List<Path> segmentsToPushPaths = getDataFilePaths(_segmentPattern);
+        List<String> segmentsToPushNames = segmentsToPushPaths.stream().map(s -> s.getName()).collect(Collectors.toList());
+        for (String segmentName : segmentsToPushNames) {
+          String segmentNamePrefix = segmentName.substring(segmentName.length() - 1);
+          uniqueSegmentPrefixes.add(segmentNamePrefix);
+        }
+
+        List<String> relevantSegments = new ArrayList<>();
+        // Get relevant segments already pushed that we are planning on refreshing
+        for (String segmentName : allSegments) {
+          if (uniqueSegmentPrefixes.contains(segmentName.substring(segmentName.length() - 1))) {
+            relevantSegments.add(segmentName);
+          }
+        }
+
+        relevantSegments.removeAll(segmentsToPushNames);
+        controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
+        controllerRestApi.deleteSegmentUris(relevantSegments);
+      } else {
+        controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org