You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/07/23 21:11:59 UTC

[incubator-pinot] branch deleteExtras updated (000255a -> 0660d00)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a change to branch deleteExtras
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard 000255a  Delete extra segments that are pushed
     new 0660d00  Delete extra segments that are pushed

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (000255a)
            \
             N -- N -- N   refs/heads/deleteExtras (0660d00)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java    | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Delete extra segments that are pushed

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch deleteExtras
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 0660d007158414bd559f039754ed6a3babd3e9a4
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Tue Jul 23 13:59:19 2019 -0700

    Delete extra segments that are pushed
---
 .../pinot/hadoop/job/JobConfigConstants.java       |  2 +
 .../apache/pinot/hadoop/job/SegmentTarPushJob.java | 43 +++++++++++++++++++++-
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 19b350c..46d3d5b 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -57,4 +57,6 @@ public class JobConfigConstants {
   public static final String ENABLE_PARTITIONING = "enable.partitioning";
   public static final String ENABLE_SORTING = "enable.sorting";
   public static final String ENABLE_RESIZING = "enable.resizing";
+
+  public static final String DELETE_EXTRA_REFRESHED_SEGMENTS = "delete.extra.refreshed.segments";
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
index 7c71fd8..9527b43 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
@@ -19,8 +19,12 @@
 package org.apache.pinot.hadoop.job;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,6 +34,7 @@ import org.apache.pinot.hadoop.utils.PushLocation;
 public class SegmentTarPushJob extends BaseSegmentJob {
   private final Path _segmentPattern;
   private final List<PushLocation> _pushLocations;
+  private final boolean _deleteExtraSegments;
 
   public SegmentTarPushJob(Properties properties) {
     super(properties);
@@ -37,6 +42,7 @@ public class SegmentTarPushJob extends BaseSegmentJob {
     String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
     int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
     _pushLocations = PushLocation.getPushLocations(hosts, port);
+    _deleteExtraSegments = Boolean.parseBoolean(properties.getProperty(JobConfigConstants.DELETE_EXTRA_REFRESHED_SEGMENTS, "false"));
   }
 
   @Override
@@ -48,10 +54,45 @@ public class SegmentTarPushJob extends BaseSegmentJob {
       throws Exception {
     FileSystem fileSystem = FileSystem.get(_conf);
     try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
-      controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
+      // TODO: Deal with invalid prefixes in the future
+      if (_deleteExtraSegments) {
+        List<String> allSegments = controllerRestApi.getAllSegments("OFFLINE");
+        Set<String> uniqueSegmentPrefixes = new HashSet<>();
+
+        // Get all relevant segment prefixes that we are planning on pushing
+        List<Path> segmentsToPushPaths = getDataFilePaths(_segmentPattern);
+        List<String> segmentsToPushNames = segmentsToPushPaths.stream().map(s -> s.getName()).collect(Collectors.toList());
+        for (String segmentName : segmentsToPushNames) {
+          String segmentNamePrefix = removeSequenceId(segmentName);
+          uniqueSegmentPrefixes.add(segmentNamePrefix);
+        }
+
+        List<String> relevantSegments = new ArrayList<>();
+        // Get relevant segments already pushed that we are planning on refreshing
+        for (String segmentName : allSegments) {
+          if (uniqueSegmentPrefixes.contains(removeSequenceId(segmentName))) {
+            relevantSegments.add(segmentName);
+          }
+        }
+
+        relevantSegments.removeAll(segmentsToPushNames);
+        controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
+        controllerRestApi.deleteSegmentUris(relevantSegments);
+      } else {
+        controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
+      }
     }
   }
 
+  /**
+   * Remove the trailing sequence id, i.e. every digit at the very end of the segment name.
+   * @param segmentName segment name that may end in a run of digits (the sequence id)
+   * @return the name with all trailing digits stripped; unchanged if it does not end in a digit
+   */
+  private String removeSequenceId(String segmentName) {
+    return segmentName.replaceAll("\\d*$", "");
+  }
+
   protected ControllerRestApi getControllerRestApi() {
     return new DefaultControllerRestApi(_pushLocations, null);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org