You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original) was not preserved in this copy.
Posted to commits@druid.apache.org by hi...@apache.org on 2019/08/05 20:38:47 UTC
[incubator-druid] branch master updated: Add shuffleSegmentPusher for data shuffle (#8115)
This is an automated email from the ASF dual-hosted git repository.
himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new ab5b3be Add shuffleSegmentPusher for data shuffle (#8115)
ab5b3be is described below
commit ab5b3be6c61d7be540f85385327a6a6489b363b1
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Aug 5 13:38:35 2019 -0700
Add shuffleSegmentPusher for data shuffle (#8115)
* Fix race between canHandle() and addSegment() in StorageLocation
* add comment
* Add ShuffleDataSegmentPusher, a DataSegmentPusher used for writing shuffle data to local storage.
* add comments
* remove unused import
* add comments
* fix test
* address comments
* remove <p> tag from javadoc
* address comments
* comparingLong
* Address comments
* fix test
---
.../druid/indexing/common/config/TaskConfig.java | 5 +
.../indexing/worker/IntermediaryDataManager.java | 166 +++++++++++++--------
.../indexing/worker/ShuffleDataSegmentPusher.java | 77 ++++++++++
.../IntermediaryDataManagerAutoCleanupTest.java | 15 +-
...ermediaryDataManagerManualAddAndDeleteTest.java | 47 +++---
.../worker/ShuffleDataSegmentPusherTest.java | 138 +++++++++++++++++
.../druid/segment/loading/StorageLocation.java | 7 +-
...mentLoaderLocalCacheManagerConcurrencyTest.java | 2 -
.../druid/segment/loading/StorageLocationTest.java | 14 +-
9 files changed, 366 insertions(+), 105 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index 31405fe..52bf083 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -126,6 +126,11 @@ public class TaskConfig
return new File(getTaskDir(taskId), "work");
}
+ public File getTaskTempDir(String taskId)
+ {
+ return new File(getTaskDir(taskId), "temp");
+ }
+
public File getTaskLockFile(String taskId)
{
return new File(getTaskDir(taskId), "lock");
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index c47425a..95acb22 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -33,11 +33,13 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CompressionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -45,6 +47,7 @@ import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
@@ -57,12 +60,14 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* This class manages intermediary segments for data shuffle between native parallel index tasks.
- * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers
+ * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer)
* and phase 2 tasks read those files via HTTP.
*
* The directory where segment files are placed is structured as
@@ -75,11 +80,12 @@ import java.util.stream.Collectors;
@ManageLifecycle
public class IntermediaryDataManager
{
- private static final Logger log = new Logger(IntermediaryDataManager.class);
+ private static final Logger LOG = new Logger(IntermediaryDataManager.class);
private final long intermediaryPartitionDiscoveryPeriodSec;
private final long intermediaryPartitionCleanupPeriodSec;
private final Period intermediaryPartitionTimeout;
+ private final TaskConfig taskConfig;
private final List<StorageLocation> shuffleDataLocations;
private final IndexingServiceClient indexingServiceClient;
@@ -108,6 +114,7 @@ public class IntermediaryDataManager
this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();
+ this.taskConfig = taskConfig;
this.shuffleDataLocations = taskConfig
.getShuffleDataLocations()
.stream()
@@ -119,6 +126,7 @@ public class IntermediaryDataManager
@LifecycleStart
public void start()
{
+ discoverSupervisorTaskPartitions();
supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
// Discover partitions for new supervisorTasks
supervisorTaskChecker.scheduleAtFixedRate(
@@ -127,7 +135,7 @@ public class IntermediaryDataManager
discoverSupervisorTaskPartitions();
}
catch (Exception e) {
- log.warn(e, "Error while discovering supervisorTasks");
+ LOG.warn(e, "Error while discovering supervisorTasks");
}
},
intermediaryPartitionDiscoveryPeriodSec,
@@ -141,10 +149,10 @@ public class IntermediaryDataManager
deleteExpiredSuprevisorTaskPartitionsIfNotRunning();
}
catch (InterruptedException e) {
- log.error(e, "Error while cleaning up partitions for expired supervisors");
+ LOG.error(e, "Error while cleaning up partitions for expired supervisors");
}
catch (Exception e) {
- log.warn(e, "Error while cleaning up partitions for expired supervisors");
+ LOG.warn(e, "Error while cleaning up partitions for expired supervisors");
}
},
intermediaryPartitionCleanupPeriodSec,
@@ -163,9 +171,13 @@ public class IntermediaryDataManager
supervisorTaskCheckTimes.clear();
}
+ /**
+ * IntermediaryDataManager periodically calls this method after it starts up to search for unknown intermediary data.
+ */
private void discoverSupervisorTaskPartitions()
{
for (StorageLocation location : shuffleDataLocations) {
+ final Path locationPath = location.getPath().toPath().toAbsolutePath();
final MutableInt numDiscovered = new MutableInt(0);
final File[] dirsPerSupervisorTask = location.getPath().listFiles();
if (dirsPerSupervisorTask != null) {
@@ -174,13 +186,32 @@ public class IntermediaryDataManager
supervisorTaskCheckTimes.computeIfAbsent(
supervisorTaskId,
k -> {
+ for (File eachFile : FileUtils.listFiles(supervisorTaskDir, null, true)) {
+ final String relativeSegmentPath = locationPath
+ .relativize(eachFile.toPath().toAbsolutePath())
+ .toString();
+ // StorageLocation keeps track of how much storage capacity is being used.
+ // Newly found files should be known to the StorageLocation to keep it up to date.
+ final File reservedFile = location.reserve(
+ relativeSegmentPath,
+ eachFile.getName(),
+ eachFile.length()
+ );
+ if (reservedFile == null) {
+ LOG.warn("Can't add a discovered partition[%s]", eachFile.getAbsolutePath());
+ }
+ }
numDiscovered.increment();
return DateTimes.nowUtc().plus(intermediaryPartitionTimeout);
}
);
}
}
- log.info("Discovered partitions for [%s] new supervisor tasks", numDiscovered.getValue());
+ LOG.info(
+ "Discovered partitions for [%s] new supervisor tasks under location[%s]",
+ numDiscovered.getValue(),
+ location.getPath()
+ );
}
}
@@ -203,7 +234,7 @@ public class IntermediaryDataManager
}
}
- log.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
+ LOG.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) {
@@ -215,7 +246,7 @@ public class IntermediaryDataManager
deletePartitions(supervisorTaskId);
}
catch (IOException e) {
- log.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
+ LOG.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
}
} else {
// If it's still running, update last access time.
@@ -227,17 +258,74 @@ public class IntermediaryDataManager
/**
* Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per
* supervisorTaskId.
- *
- * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static
- * addSegment method.
*/
- public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile)
+ long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
+ throws IOException
{
+ // Get or create the location iterator for supervisorTask.
final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
supervisorTaskId,
- k -> Iterators.cycle(shuffleDataLocations)
+ k -> {
+ final Iterator<StorageLocation> cyclicIterator = Iterators.cycle(shuffleDataLocations);
+ // Random start of the iterator
+ final int random = ThreadLocalRandom.current().nextInt(shuffleDataLocations.size());
+ IntStream.range(0, random).forEach(i -> cyclicIterator.next());
+ return cyclicIterator;
+ }
);
- addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile);
+
+ // Create a zipped segment in a temp directory.
+ final File taskTempDir = taskConfig.getTaskTempDir(subTaskId);
+
+ try (final Closer resourceCloser = Closer.create()) {
+ if (taskTempDir.mkdirs()) {
+ resourceCloser.register(() -> {
+ try {
+ FileUtils.forceDelete(taskTempDir);
+ }
+ catch (IOException e) {
+ LOG.warn(e, "Failed to delete directory[%s]", taskTempDir.getAbsolutePath());
+ }
+ });
+ }
+
+ // Tempary compressed file. Will be removed when taskTempDir is deleted.
+ final File tempZippedFile = new File(taskTempDir, segment.getId().toString());
+ final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile);
+ if (unzippedSizeBytes == 0) {
+ throw new IOE(
+ "Read 0 bytes from segmentDir[%s]",
+ segmentDir.getAbsolutePath()
+ );
+ }
+
+ // Try copying the zipped segment to one of storage locations
+ for (int i = 0; i < shuffleDataLocations.size(); i++) {
+ final StorageLocation location = iterator.next();
+ final String partitionFilePath = getPartitionFilePath(
+ supervisorTaskId,
+ subTaskId,
+ segment.getInterval(),
+ segment.getShardSpec().getPartitionNum()
+ );
+ final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length());
+ if (destFile != null) {
+ FileUtils.forceMkdirParent(destFile);
+ org.apache.druid.java.util.common.FileUtils.writeAtomically(
+ destFile,
+ out -> Files.asByteSource(tempZippedFile).copyTo(out)
+ );
+ LOG.info(
+ "Wrote intermediary segment for segment[%s] of subtask[%s] at [%s]",
+ segment.getId(),
+ subTaskId,
+ destFile
+ );
+ return unzippedSizeBytes;
+ }
+ }
+ throw new ISE("Can't find location to handle segment[%s]", segment);
+ }
}
public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId)
@@ -259,7 +347,7 @@ public class IntermediaryDataManager
for (StorageLocation location : shuffleDataLocations) {
final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
if (supervisorTaskPath.exists()) {
- log.info("Cleaning up [%s]", supervisorTaskPath);
+ LOG.info("Cleaning up [%s]", supervisorTaskPath);
for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) {
location.removeFile(eachFile);
}
@@ -269,54 +357,6 @@ public class IntermediaryDataManager
supervisorTaskCheckTimes.remove(supervisorTaskId);
}
- /**
- * Iterate through the given storage locations to find one which can handle the given segment.
- */
- public static void addSegment(
- Iterator<StorageLocation> cyclicIterator,
- int numLocations,
- String supervisorTaskId,
- String subTaskId,
- DataSegment segment,
- File segmentFile
- )
- {
- for (int i = 0; i < numLocations; i++) {
- final StorageLocation location = cyclicIterator.next();
- final File destFile = location.reserve(
- getPartitionFilePath(
- supervisorTaskId,
- subTaskId,
- segment.getInterval(),
- segment.getShardSpec().getPartitionNum()
- ),
- segment.getId(),
- segmentFile.length()
- );
- if (destFile != null) {
- try {
- FileUtils.forceMkdirParent(destFile);
- final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
- if (copiedBytes == 0) {
- throw new IOE(
- "0 bytes copied after copying a segment file from [%s] to [%s]",
- segmentFile.getAbsolutePath(),
- destFile.getAbsolutePath()
- );
- } else {
- return;
- }
- }
- catch (IOException e) {
- // Only log here to try other locations as well.
- log.warn(e, "Failed to write segmentFile at [%s]", destFile);
- location.removeFile(segmentFile);
- }
- }
- }
- throw new ISE("Can't find location to handle segment[%s]", segment);
- }
-
private static String getPartitionFilePath(
String supervisorTaskId,
String subTaskId,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
new file mode 100644
index 0000000..fcbdf9d
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.timeline.DataSegment;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * DataSegmentPusher used for storing intermediary data in local storage during data shuffle of native parallel
+ * indexing.
+ */
+public class ShuffleDataSegmentPusher implements DataSegmentPusher
+{
+ private final String supervisorTaskId;
+ private final String subTaskId;
+ private final IntermediaryDataManager intermediaryDataManager;
+
+ public ShuffleDataSegmentPusher(
+ String supervisorTaskId,
+ String subTaskId,
+ IntermediaryDataManager intermediaryDataManager
+ )
+ {
+ this.supervisorTaskId = supervisorTaskId;
+ this.subTaskId = subTaskId;
+ this.intermediaryDataManager = intermediaryDataManager;
+ }
+
+ @Override
+ public String getPathForHadoop(String dataSource)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getPathForHadoop()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
+ {
+ final long unzippedSize = intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, file);
+ return segment.withSize(unzippedSize)
+ .withBinaryVersion(SegmentUtils.getVersionFromDir(file));
+ }
+
+ @Override
+ public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
+ {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
index c158c0e..6a5f611 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.worker;
-import com.amazonaws.util.StringUtils;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -43,6 +42,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -116,19 +116,20 @@ public class IntermediaryDataManagerAutoCleanupTest
{
final String supervisorTaskId = "supervisorTaskId";
final Interval interval = Intervals.of("2018/2019");
- final File segmentFile = generateSegmentFile();
+ final File segmentFile = generateSegmentDir("test");
final DataSegment segment = newSegment(interval, 0);
intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
- Thread.sleep(8000);
+ Thread.sleep(3000);
Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, 0).isEmpty());
}
- private File generateSegmentFile() throws IOException
+ private File generateSegmentDir(String fileName) throws IOException
{
- final File segmentFile = tempDir.newFile();
- FileUtils.write(segmentFile, "test data.", StringUtils.UTF8);
- return segmentFile;
+ // Each file size is 138 bytes after compression
+ final File segmentDir = tempDir.newFolder();
+ FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8);
+ return segmentDir;
}
private DataSegment newSegment(Interval interval, int partitionId)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
index 7bd9e90..7d322d0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.worker;
-import com.amazonaws.util.StringUtils;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -41,6 +40,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.List;
@@ -67,7 +67,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
false,
null,
null,
- ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 150L, null))
+ ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 600L, null))
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
@@ -83,15 +83,16 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
@Test
public void testAddSegmentFailure() throws IOException
{
- for (int i = 0; i < 15; i++) {
- File segmentFile = generateSegmentFile();
+ int i = 0;
+ for (; i < 4; i++) {
+ File segmentFile = generateSegmentDir("file_" + i);
DataSegment segment = newSegment(Intervals.of("2018/2019"), i);
intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
}
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Can't find location to handle segment");
- File segmentFile = generateSegmentFile();
- DataSegment segment = newSegment(Intervals.of("2018/2019"), 16);
+ File segmentFile = generateSegmentDir("file_" + i);
+ DataSegment segment = newSegment(Intervals.of("2018/2019"), 4);
intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
}
@@ -101,15 +102,15 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
final String supervisorTaskId = "supervisorTaskId";
final Interval interval = Intervals.of("2018/2019");
final int partitionId = 0;
- for (int i = 0; i < 10; i++) {
- final File segmentFile = generateSegmentFile();
+ for (int i = 0; i < 4; i++) {
+ final File segmentFile = generateSegmentDir("file_" + i);
final DataSegment segment = newSegment(interval, partitionId);
intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + i, segment, segmentFile);
}
final List<File> files = intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId);
- Assert.assertEquals(10, files.size());
+ Assert.assertEquals(4, files.size());
files.sort(Comparator.comparing(File::getName));
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 4; i++) {
Assert.assertEquals("subTaskId_" + i, files.get(i).getName());
}
}
@@ -119,9 +120,9 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
{
final String supervisorTaskId = "supervisorTaskId";
final Interval interval = Intervals.of("2018/2019");
- for (int partitionId = 0; partitionId < 5; partitionId++) {
- for (int subTaskId = 0; subTaskId < 3; subTaskId++) {
- final File segmentFile = generateSegmentFile();
+ for (int partitionId = 0; partitionId < 2; partitionId++) {
+ for (int subTaskId = 0; subTaskId < 2; subTaskId++) {
+ final File segmentFile = generateSegmentDir("file_" + partitionId + "_" + subTaskId);
final DataSegment segment = newSegment(interval, partitionId);
intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + subTaskId, segment, segmentFile);
}
@@ -129,7 +130,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
intermediaryDataManager.deletePartitions(supervisorTaskId);
- for (int partitionId = 0; partitionId < 5; partitionId++) {
+ for (int partitionId = 0; partitionId < 2; partitionId++) {
Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId).isEmpty());
}
}
@@ -139,22 +140,24 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
{
final String supervisorTaskId = "supervisorTaskId";
final Interval interval = Intervals.of("2018/2019");
- for (int i = 0; i < 15; i++) {
- File segmentFile = generateSegmentFile();
+ int i = 0;
+ for (; i < 4; i++) {
+ File segmentFile = generateSegmentDir("file_" + i);
DataSegment segment = newSegment(interval, i);
intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
}
intermediaryDataManager.deletePartitions(supervisorTaskId);
- File segmentFile = generateSegmentFile();
- DataSegment segment = newSegment(interval, 16);
+ File segmentFile = generateSegmentDir("file_" + i);
+ DataSegment segment = newSegment(interval, i);
intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
}
- private File generateSegmentFile() throws IOException
+ private File generateSegmentDir(String fileName) throws IOException
{
- final File segmentFile = tempDir.newFile();
- FileUtils.write(segmentFile, "test data.", StringUtils.UTF8);
- return segmentFile;
+ // Each file size is 138 bytes after compression
+ final File segmentDir = tempDir.newFolder();
+ FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8);
+ return segmentDir;
}
private DataSegment newSegment(Interval interval, int partitionId)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
new file mode 100644
index 0000000..c3dfcdd
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import com.google.common.primitives.Ints;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.FileUtils.FileCopyResult;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.utils.CompressionUtils;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+public class ShuffleDataSegmentPusherTest
+{
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private IntermediaryDataManager intermediaryDataManager;
+ private ShuffleDataSegmentPusher segmentPusher;
+
+ @Before
+ public void setup() throws IOException
+ {
+ final WorkerConfig workerConfig = new WorkerConfig();
+ final TaskConfig taskConfig = new TaskConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ false,
+ null,
+ null,
+ ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null))
+ );
+ final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
+ intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+ intermediaryDataManager.start();
+ segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", intermediaryDataManager);
+ }
+
+ @After
+ public void teardown() throws InterruptedException
+ {
+ intermediaryDataManager.stop();
+ }
+
+ @Test
+ public void testPush() throws IOException
+ {
+ final File segmentDir = generateSegmentDir();
+ final DataSegment segment = newSegment(Intervals.of("2018/2019"));
+ final DataSegment pushed = segmentPusher.push(segmentDir, segment, true);
+
+ Assert.assertEquals(9, pushed.getBinaryVersion().intValue());
+ Assert.assertEquals(14, pushed.getSize()); // 10 bytes data + 4 bytes version
+
+ final List<File> files = intermediaryDataManager.findPartitionFiles(
+ "supervisorTaskId",
+ segment.getInterval(),
+ segment.getShardSpec().getPartitionNum()
+ );
+ Assert.assertEquals(1, files.size());
+ final File zippedSegment = files.get(0);
+ final File tempDir = temporaryFolder.newFolder();
+ final FileCopyResult result = CompressionUtils.unzip(zippedSegment, tempDir);
+ final List<File> unzippedFiles = new ArrayList<>(result.getFiles());
+ unzippedFiles.sort(Comparator.comparing(File::getName));
+ final File dataFile = unzippedFiles.get(0);
+ Assert.assertEquals("test", dataFile.getName());
+ Assert.assertEquals("test data.", Files.readFirstLine(dataFile, StandardCharsets.UTF_8));
+ final File versionFile = unzippedFiles.get(1);
+ Assert.assertEquals("version.bin", versionFile.getName());
+ Assert.assertArrayEquals(Ints.toByteArray(0x9), Files.toByteArray(versionFile));
+ }
+
+ private File generateSegmentDir() throws IOException
+ {
+ // Each file size is 138 bytes after compression
+ final File segmentDir = temporaryFolder.newFolder();
+ Files.asByteSink(new File(segmentDir, "version.bin")).write(Ints.toByteArray(0x9));
+ FileUtils.write(new File(segmentDir, "test"), "test data.", StandardCharsets.UTF_8);
+ return segmentDir;
+ }
+
+ private DataSegment newSegment(Interval interval)
+ {
+ return new DataSegment(
+ "dataSource",
+ interval,
+ "version",
+ null,
+ null,
+ null,
+ new NumberedShardSpec(0, 0),
+ 9,
+ 0
+ );
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index 842295e..6ed86a3 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@@ -115,7 +114,7 @@ public class StorageLocation
@Nullable
public synchronized File reserve(String segmentDir, DataSegment segment)
{
- return reserve(segmentDir, segment.getId(), segment.getSize());
+ return reserve(segmentDir, segment.getId().toString(), segment.getSize());
}
/**
@@ -124,7 +123,7 @@ public class StorageLocation
* Returns null otherwise.
*/
@Nullable
- public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentId, long segmentSize)
+ public synchronized File reserve(String segmentFilePathToAdd, String segmentId, long segmentSize)
{
final File segmentFileToAdd = new File(path, segmentFilePathToAdd);
if (files.contains(segmentFileToAdd)) {
@@ -145,7 +144,7 @@ public class StorageLocation
*/
@VisibleForTesting
@GuardedBy("this")
- boolean canHandle(SegmentId segmentId, long segmentSize)
+ boolean canHandle(String segmentId, long segmentSize)
{
if (availableSizeBytes() < segmentSize) {
log.warn(
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
index cb08217..f617261 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
@@ -153,8 +153,6 @@ public class SegmentLoaderLocalCacheManagerConcurrencyTest
for (Future future : futures) {
future.get();
}
-
- System.out.println(manager.getLocations().get(0).availableSizeBytes());
}
private DataSegment newSegment(Interval interval, int partitionId)
diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
index 23da422..7be5328 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
@@ -39,18 +39,18 @@ public class StorageLocationTest
{
// free space ignored only maxSize matters
StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null);
- Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013"), 9_000));
- Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013"), 11_000));
+ Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 9_000));
+ Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 11_000));
// enough space available maxSize is the limit
StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0);
- Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013"), 9_000));
- Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013"), 11_000));
+ Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013").toString(), 9_000));
+ Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013").toString(), 11_000));
// disk almost full percentage is the limit
StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0);
- Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013"), 4_000));
- Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013"), 6_000));
+ Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013").toString(), 4_000));
+ Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013").toString(), 6_000));
}
private StorageLocation fakeLocation(long total, long free, long max, Double percent)
@@ -99,7 +99,7 @@ public class StorageLocationTest
{
Assert.assertEquals(maxSize, loc.availableSizeBytes());
for (int i = 0; i <= maxSize; ++i) {
- Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014"), i));
+ Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014").toString(), i));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org