You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/07/30 17:04:12 UTC
[incubator-pinot] branch master updated: Delete extra refreshed
segments after segment push (#4464)
This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ad6ff6b Delete extra refreshed segments after segment push (#4464)
ad6ff6b is described below
commit ad6ff6b8de06c82ee82cc46a1e346e0e41b4ad77
Author: Jennifer Dai <je...@users.noreply.github.com>
AuthorDate: Tue Jul 30 10:04:05 2019 -0700
Delete extra refreshed segments after segment push (#4464)
* Adds a config that makes it possible to delete extra segments left over after a push, either within the same configured time segment or during a refresh
---
.../pinot/hadoop/job/JobConfigConstants.java | 4 +
.../apache/pinot/hadoop/job/SegmentTarPushJob.java | 57 +++++++++++-
.../hadoop/io/DeleteExtraPushedSegmentsTest.java | 102 +++++++++++++++++++++
3 files changed, 162 insertions(+), 1 deletion(-)
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 19b350c..3c263cd 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -57,4 +57,8 @@ public class JobConfigConstants {
public static final String ENABLE_PARTITIONING = "enable.partitioning";
public static final String ENABLE_SORTING = "enable.sorting";
public static final String ENABLE_RESIZING = "enable.resizing";
+
+ // Set this to true if a push may generate fewer segments than the previous push for the same
+ // time range or refresh (e.g. because the segments were resized during preprocessing).
+ public static final String DELETE_EXTRA_SEGMENTS = "delete.extra.segments";
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
index 49bccc9..5385aaf 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
@@ -19,8 +19,13 @@
package org.apache.pinot.hadoop.job;
import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -31,6 +36,7 @@ public class SegmentTarPushJob extends BaseSegmentJob {
private final Path _segmentPattern;
private final List<PushLocation> _pushLocations;
private final String _rawTableName;
+ private final boolean _deleteExtraSegments;
public SegmentTarPushJob(Properties properties) {
super(properties);
@@ -39,6 +45,7 @@ public class SegmentTarPushJob extends BaseSegmentJob {
int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
_pushLocations = PushLocation.getPushLocations(hosts, port);
_rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+ _deleteExtraSegments = Boolean.parseBoolean(properties.getProperty(JobConfigConstants.DELETE_EXTRA_SEGMENTS, "false"));
}
@Override
@@ -49,9 +56,57 @@ public class SegmentTarPushJob extends BaseSegmentJob {
public void run()
throws Exception {
FileSystem fileSystem = FileSystem.get(_conf);
+ List<Path> segmentsToPush = getDataFilePaths(_segmentPattern);
try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
- controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern));
+ // TODO: Deal with invalid prefixes in the future
+
+ if (!_deleteExtraSegments) {
+ controllerRestApi.pushSegments(fileSystem, segmentsToPush);
+ return;
+ }
+
+ // Snapshot the table's OFFLINE segments BEFORE pushing, so that segments which existed
+ // previously but are not overwritten by this push can be identified afterwards. This extra
+ // controller call is only made when deletion of extra segments is enabled.
+ List<String> currentSegments = controllerRestApi.getAllSegments("OFFLINE");
+
+ controllerRestApi.pushSegments(fileSystem, segmentsToPush);
+
+ List<String> segmentsToDelete = getSegmentsToDelete(currentSegments, segmentsToPush);
+ // Skip the delete request entirely when there is nothing to remove.
+ if (!segmentsToDelete.isEmpty()) {
+ controllerRestApi.deleteSegmentUris(segmentsToDelete);
+ }
+ }
+ }
+
+ // File-name suffix of segment tarballs; stripped from pushed file names before comparing them
+ // with the controller's segment names.
+ private static final String TAR_GZ_SUFFIX = ".tar.gz";
+
+ /**
+ * Computes the extra segments to delete after a push: segments already on the controller whose
+ * name, minus its trailing sequence id, matches that of a pushed segment, but which were not
+ * themselves overwritten by the push.
+ * @param allSegments all segments on the controller for the table, fetched before the push
+ * @param segmentsToPush paths of the segment files pushed to the controller
+ * @return names of the extra segments to delete, in the order they appear in allSegments;
+ * empty if there are none
+ */
+ public List<String> getSegmentsToDelete(List<String> allSegments, List<Path> segmentsToPush) {
+ // Names of the segments being pushed (a set, so the membership checks below are O(1)).
+ Set<String> segmentNamesToPush =
+ segmentsToPush.stream().map(SegmentTarPushJob::toSegmentName).collect(Collectors.toSet());
+
+ // Get all relevant segment prefixes that we are planning on pushing
+ Set<String> uniqueSegmentPrefixes = new HashSet<>();
+ for (String segmentName : segmentNamesToPush) {
+ uniqueSegmentPrefixes.add(removeSequenceId(segmentName));
+ }
+
+ // Existing segments sharing a prefix with a pushed segment, but not overwritten by the push
+ List<String> segmentsToDelete = new ArrayList<>();
+ for (String segmentName : allSegments) {
+ if (!segmentNamesToPush.contains(segmentName) && uniqueSegmentPrefixes.contains(removeSequenceId(segmentName))) {
+ segmentsToDelete.add(segmentName);
+ }
}
+ return segmentsToDelete;
+ }
+
+ /**
+ * Returns the segment name for a pushed segment file: its file name without a trailing ".tar.gz".
+ * NOTE(review): assumes pushed files are named <segmentName>.tar.gz (or just <segmentName>);
+ * confirm against getDataFilePaths for this job.
+ * @param segmentPath path of a segment file being pushed
+ * @return the segment name the controller is expected to report for that file
+ */
+ private static String toSegmentName(Path segmentPath) {
+ String fileName = segmentPath.getName();
+ return fileName.endsWith(TAR_GZ_SUFFIX)
+ ? fileName.substring(0, fileName.length() - TAR_GZ_SUFFIX.length()) : fileName;
+ }
+
+ /**
+ * Remove trailing sequence id
+ * eg: If segment name is mytable_12, it will return mytable_
+ * If segment name is mytable_20190809_20190809_12, it will return mytable_20190809_20190809_
+ * @param segmentName
+ * @return
+ */
+ private String removeSequenceId(String segmentName) {
+ return segmentName.replaceAll("\\d*$", "");
}
protected ControllerRestApi getControllerRestApi() {
diff --git a/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/DeleteExtraPushedSegmentsTest.java b/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/DeleteExtraPushedSegmentsTest.java
new file mode 100644
index 0000000..14dcea5
--- /dev/null
+++ b/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/io/DeleteExtraPushedSegmentsTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.io;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.hadoop.job.JobConfigConstants;
+import org.apache.pinot.hadoop.job.SegmentTarPushJob;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests {@link SegmentTarPushJob#getSegmentsToDelete}: deleting extra segments within the same time unit for
+ * APPEND pushes, or extra segments left over from REFRESH pushes.
+ */
+public class DeleteExtraPushedSegmentsTest {
+ private final Properties _defaultProperties = new Properties();
+
+ @BeforeClass
+ public void setup() {
+ _defaultProperties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, "sample_host");
+ _defaultProperties.setProperty(JobConfigConstants.PUSH_TO_PORT, "1234");
+ _defaultProperties.setProperty(JobConfigConstants.PATH_TO_OUTPUT, "sample_output_path");
+ _defaultProperties.setProperty(JobConfigConstants.SEGMENT_TABLE_NAME, "myTable");
+ }
+
+ @Test
+ public void checkDelete() {
+ List<String> segmentsToDelete = getSegmentsToDelete(
+ segmentNames("mytable_2018-09-10_2018-09-10_0", "mytable_2018-09-10_2018-09-10_1",
+ "mytable_2018-09-10_2018-09-10_2"), "mytable_2018-09-10_2018-09-10_0");
+ Assert.assertEqualsNoOrder(segmentsToDelete.toArray(),
+ new Object[]{"mytable_2018-09-10_2018-09-10_1", "mytable_2018-09-10_2018-09-10_2"});
+ }
+
+ @Test
+ public void checkDeleteWithRefresh() {
+ List<String> segmentsToDelete =
+ getSegmentsToDelete(segmentNames("mytable_0", "mytable_1", "mytable_2"), "mytable_0");
+ Assert.assertEqualsNoOrder(segmentsToDelete.toArray(), new Object[]{"mytable_1", "mytable_2"});
+ }
+
+ @Test
+ public void checkDeleteWithDoubleDigitSequenceIds() {
+ List<String> segmentsToDelete =
+ getSegmentsToDelete(segmentNames("mytable_02", "mytable_12", "mytable_23"), "mytable_02");
+ Assert.assertEqualsNoOrder(segmentsToDelete.toArray(), new Object[]{"mytable_12", "mytable_23"});
+ }
+
+ @Test
+ public void checkDeleteWithoutSequenceIds() {
+ List<String> segmentsToDelete = getSegmentsToDelete(segmentNames("mytable"), "mytable");
+ Assert.assertTrue(segmentsToDelete.isEmpty(), "Unexpected segments to delete: " + segmentsToDelete);
+ }
+
+ /** Returns a mutable list of the given names, standing in for the segments already in the cluster. */
+ private static List<String> segmentNames(String... names) {
+ List<String> result = new ArrayList<>();
+ for (String name : names) {
+ result.add(name);
+ }
+ return result;
+ }
+
+ /** Builds a push job from the default properties and asks it which segments to delete after this push. */
+ private List<String> getSegmentsToDelete(List<String> allSegmentsInCluster, String... pushedSegmentNames) {
+ List<Path> segmentsToPush = new ArrayList<>();
+ for (String pushedSegmentName : pushedSegmentNames) {
+ segmentsToPush.add(new Path(pushedSegmentName));
+ }
+ SegmentTarPushJob segmentTarPushJob = new SegmentTarPushJob(_defaultProperties);
+ return segmentTarPushJob.getSegmentsToDelete(allSegmentsInCluster, segmentsToPush);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org