Posted to commits@pinot.apache.org by je...@apache.org on 2019/07/30 17:04:12 UTC

[incubator-pinot] branch master updated: Delete extra refreshed segments after segment push (#4464)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ad6ff6b  Delete extra refreshed segments after segment push (#4464)
ad6ff6b is described below

commit ad6ff6b8de06c82ee82cc46a1e346e0e41b4ad77
Author: Jennifer Dai <je...@users.noreply.github.com>
AuthorDate: Tue Jul 30 10:04:05 2019 -0700

    Delete extra refreshed segments after segment push (#4464)
    
    * Adds a config that makes it possible to delete extra segments that were pushed within the same configured time unit, or that are left over after a refresh
---
 .../pinot/hadoop/job/JobConfigConstants.java       |   4 +
 .../apache/pinot/hadoop/job/SegmentTarPushJob.java |  57 +++++++++++-
 .../hadoop/io/DeleteExtraPushedSegmentsTest.java   | 102 +++++++++++++++++++++
 3 files changed, 162 insertions(+), 1 deletion(-)
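
In a job spec, the new flag is just one more properties entry. A minimal
sketch (delete.extra.segments is the key added by this commit; the other
key names are assumed to match their JobConfigConstants counterparts and
should be verified against that file):

    # Hypothetical segment push job properties
    push.to.hosts=controller-host
    push.to.port=9000
    path.to.output=/output/segments
    segment.table.name=myTable
    # New in this commit: delete stale segments after the push (default false)
    delete.extra.segments=true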

diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 19b350c..3c263cd 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -57,4 +57,8 @@ public class JobConfigConstants {
   public static final String ENABLE_PARTITIONING = "enable.partitioning";
   public static final String ENABLE_SORTING = "enable.sorting";
   public static final String ENABLE_RESIZING = "enable.resizing";
+
+  // Use this setting if a push will generate fewer segments than are already
+  // on the cluster. In preprocessing, this typically happens because we resize segments.
+  public static final String DELETE_EXTRA_SEGMENTS = "delete.extra.segments";
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
index 49bccc9..5385aaf 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
@@ -19,8 +19,13 @@
 package org.apache.pinot.hadoop.job;
 
 import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,6 +36,7 @@ public class SegmentTarPushJob extends BaseSegmentJob {
   private final Path _segmentPattern;
   private final List<PushLocation> _pushLocations;
   private final String _rawTableName;
+  private final boolean _deleteExtraSegments;
 
   public SegmentTarPushJob(Properties properties) {
     super(properties);
@@ -39,6 +45,7 @@ public class SegmentTarPushJob extends BaseSegmentJob {
     int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
     _pushLocations = PushLocation.getPushLocations(hosts, port);
     _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+    _deleteExtraSegments = Boolean.parseBoolean(properties.getProperty(JobConfigConstants.DELETE_EXTRA_SEGMENTS, "false"));
   }
 
   @Override
@@ -49,9 +56,57 @@ public class SegmentTarPushJob extends BaseSegmentJob {
   public void run()
       throws Exception {
     FileSystem fileSystem = FileSystem.get(_conf);
+    List<Path> segmentsToPush = getDataFilePaths(_segmentPattern);
     try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
-      controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
+      // TODO: Deal with invalid prefixes in the future
+
+      List<String> currentSegments = controllerRestApi.getAllSegments("OFFLINE");
+
+      controllerRestApi.pushSegments(fileSystem, segmentsToPush);
+
+      if (_deleteExtraSegments) {
+        controllerRestApi.deleteSegmentUris(getSegmentsToDelete(currentSegments, segmentsToPush));
+      }
+    }
+  }
+
+  /**
+   * Computes which stale segments to delete after pushing to the controller
+   * @param allSegments all segments on the controller for the table
+   * @param segmentsToPush segments that will be pushed to the controller
+   * @return names of segments already on the controller that share a prefix with a pushed segment but are not part of this push
+   */
+  public List<String> getSegmentsToDelete(List<String> allSegments, List<Path> segmentsToPush) {
+    Set<String> uniqueSegmentPrefixes = new HashSet<>();
+
+    // Get all relevant segment prefixes that we are planning on pushing
+    List<String> segmentNamesToPush = segmentsToPush.stream().map(s -> s.getName()).collect(Collectors.toList());
+    for (String segmentName : segmentNamesToPush) {
+      String segmentNamePrefix = removeSequenceId(segmentName);
+      uniqueSegmentPrefixes.add(segmentNamePrefix);
+    }
+
+    List<String> relevantSegments = new ArrayList<>();
+    // Get relevant segments already pushed that we are planning on refreshing
+    for (String segmentName : allSegments) {
+      if (uniqueSegmentPrefixes.contains(removeSequenceId(segmentName))) {
+        relevantSegments.add(segmentName);
+      }
     }
+
+    relevantSegments.removeAll(segmentNamesToPush);
+    return relevantSegments;
+  }
+
+  /**
+   * Removes the trailing sequence id from a segment name.
+   * E.g. if the segment name is mytable_12, this returns mytable_;
+   * if the segment name is mytable_20190809_20190809_12, this returns mytable_20190809_20190809_
+   * @param segmentName segment name, with or without a trailing sequence id
+   * @return the segment name with any trailing digits stripped
+   */
+  private String removeSequenceId(String segmentName) {
+    return segmentName.replaceAll("\\d*$", "");
   }
 
   protected ControllerRestApi getControllerRestApi() {
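
To make the prefix matching concrete, here is a self-contained sketch that
replays the logic above on the sample names used in the tests (the class
name and main method are illustrative only, not part of the patch):

    import java.util.Arrays;
    import java.util.List;

    public class SegmentsToDeleteDemo {
      // Same regex as removeSequenceId above: strip trailing digits only.
      static String removeSequenceId(String segmentName) {
        return segmentName.replaceAll("\\d*$", "");
      }

      public static void main(String[] args) {
        // Embedded dates survive; only the trailing sequence id is stripped.
        System.out.println(removeSequenceId("mytable_2018-09-10_2018-09-10_12"));
        // -> mytable_2018-09-10_2018-09-10_
        System.out.println(removeSequenceId("mytable_12")); // -> mytable_
        System.out.println(removeSequenceId("mytable"));    // -> mytable (no-op)

        // With mytable_0..mytable_2 on the controller and only mytable_0 in
        // the new push, mytable_1 and mytable_2 share the prefix mytable_
        // but are not re-pushed, so they are selected for deletion.
        List<String> onController = Arrays.asList("mytable_0", "mytable_1", "mytable_2");
        List<String> pushed = Arrays.asList("mytable_0");
        onController.stream()
            .filter(name -> removeSequenceId(name).equals("mytable_"))
            .filter(name -> !pushed.contains(name))
            .forEach(System.out::println); // prints mytable_1, mytable_2
      }
    }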
diff --git a/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/DeleteExtraPushedSegmentsTest.java b/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/DeleteExtraPushedSegmentsTest.java
new file mode 100644
index 0000000..14dcea5
--- /dev/null
+++ b/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/DeleteExtraPushedSegmentsTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.io;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.hadoop.job.JobConfigConstants;
+import org.apache.pinot.hadoop.job.SegmentTarPushJob;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the logic that deletes extra segments pushed within the same time unit (APPEND) or left over after a refresh (REFRESH).
+ */
+public class DeleteExtraPushedSegmentsTest {
+  Properties _defaultProperties = new Properties();
+
+  @BeforeClass
+  private void setup() {
+    _defaultProperties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, "sample_host");
+    _defaultProperties.setProperty(JobConfigConstants.PUSH_TO_PORT, "1234");
+    _defaultProperties.setProperty(JobConfigConstants.PATH_TO_OUTPUT, "sample_output_path");
+    _defaultProperties.setProperty(JobConfigConstants.SEGMENT_TABLE_NAME, "myTable");
+  }
+
+  @Test
+  public void checkDelete() {
+    List<String> allSegmentsInCluster = new ArrayList<>();
+    allSegmentsInCluster.add("mytable_2018-09-10_2018-09-10_0");
+    allSegmentsInCluster.add("mytable_2018-09-10_2018-09-10_1");
+    allSegmentsInCluster.add("mytable_2018-09-10_2018-09-10_2");
+
+    List<Path> currentSegments = new ArrayList<>();
+    currentSegments.add(new Path("mytable_2018-09-10_2018-09-10_0"));
+    SegmentTarPushJob segmentTarPushJob = new SegmentTarPushJob(_defaultProperties);
+    List<String> segmentsToDelete = segmentTarPushJob.getSegmentsToDelete(allSegmentsInCluster, currentSegments);
+    Assert.assertEquals(segmentsToDelete.size(), 2);
+    Assert.assertFalse(segmentsToDelete.contains("mytable_2018-09-10_2018-09-10_0"));
+  }
+
+  @Test
+  public void checkDeleteWithRefresh() {
+    List<String> allSegmentsInCluster = new ArrayList<>();
+    allSegmentsInCluster.add("mytable_0");
+    allSegmentsInCluster.add("mytable_1");
+    allSegmentsInCluster.add("mytable_2");
+
+    List<Path> currentSegments = new ArrayList<>();
+    currentSegments.add(new Path("mytable_0"));
+    SegmentTarPushJob segmentTarPushJob = new SegmentTarPushJob(_defaultProperties);
+    List<String> segmentsToDelete = segmentTarPushJob.getSegmentsToDelete(allSegmentsInCluster, currentSegments);
+    Assert.assertEquals(segmentsToDelete.size(), 2);
+    Assert.assertFalse(segmentsToDelete.contains("mytable_0"));
+  }
+
+  @Test
+  public void checkDeleteWithDoubleDigitSequenceIds() {
+    List<String> allSegmentsInCluster = new ArrayList<>();
+    allSegmentsInCluster.add("mytable_02");
+    allSegmentsInCluster.add("mytable_12");
+    allSegmentsInCluster.add("mytable_23");
+
+    List<Path> currentSegments = new ArrayList<>();
+    currentSegments.add(new Path("mytable_02"));
+    SegmentTarPushJob segmentTarPushJob = new SegmentTarPushJob(_defaultProperties);
+    List<String> segmentsToDelete = segmentTarPushJob.getSegmentsToDelete(allSegmentsInCluster, currentSegments);
+    Assert.assertEquals(segmentsToDelete.size(), 2);
+    Assert.assertFalse(segmentsToDelete.contains("mytable_02"));
+  }
+
+  @Test
+  public void checkDeleteWithoutSequenceIds() {
+    List<String> allSegmentsInCluster = new ArrayList<>();
+    allSegmentsInCluster.add("mytable");
+
+    List<Path> currentSegments = new ArrayList<>();
+    currentSegments.add(new Path("mytable"));
+    SegmentTarPushJob segmentTarPushJob = new SegmentTarPushJob(_defaultProperties);
+    List<String> segmentsToDelete = segmentTarPushJob.getSegmentsToDelete(allSegmentsInCluster, currentSegments);
+    Assert.assertEquals(segmentsToDelete.size(), 0);
+  }
+}
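
Putting it together, a push job would enable the cleanup programmatically the
same way the test above wires its properties; a minimal sketch with
placeholder host, port, and paths:

    import java.util.Properties;
    import org.apache.pinot.hadoop.job.JobConfigConstants;
    import org.apache.pinot.hadoop.job.SegmentTarPushJob;

    public class PushWithCleanupExample {
      public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // Same required properties as the test setup; values are placeholders.
        properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, "controller-host");
        properties.setProperty(JobConfigConstants.PUSH_TO_PORT, "9000");
        properties.setProperty(JobConfigConstants.PATH_TO_OUTPUT, "/output/segments");
        properties.setProperty(JobConfigConstants.SEGMENT_TABLE_NAME, "myTable");
        // Opt in to deleting stale segments after the push (defaults to false).
        properties.setProperty(JobConfigConstants.DELETE_EXTRA_SEGMENTS, "true");

        new SegmentTarPushJob(properties).run();
      }
    }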

