Posted to commits@druid.apache.org by ji...@apache.org on 2019/07/18 21:46:56 UTC
[incubator-druid] branch master updated: Add intermediary data server for shuffle (#8088)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c7eb7cd Add intermediary data server for shuffle (#8088)
c7eb7cd is described below
commit c7eb7cd01837c48914ba284d08b6096b47c957b0
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Thu Jul 18 14:46:47 2019 -0700
Add intermediary data server for shuffle (#8088)
* Add intermediary data server for shuffle
* javadoc
* adjust timeout
* resolved todo
* fix test
* style
* address comments
* rename to shuffleDataLocations
* Address comments
* bit adjustment StorageLocation
* fix test
* address comment & fix test
* handle interrupted exception
---
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 1 +
.../indexing/kinesis/KinesisIndexTaskTest.java | 1 +
.../indexing/common/SegmentLoaderFactory.java | 2 +-
.../druid/indexing/common/config/TaskConfig.java | 25 +-
.../indexing/overlord/http/OverlordResource.java | 4 +-
.../indexing/worker/IntermediaryDataManager.java | 331 +++++++++++++++++++++
.../druid/indexing/worker/config/WorkerConfig.java | 31 +-
.../indexing/worker/http/ShuffleResource.java | 122 ++++++++
.../druid/indexing/common/TaskToolboxTest.java | 2 +-
.../AppenderatorDriverRealtimeIndexTaskTest.java | 2 +-
.../common/task/CompactionTaskRunTest.java | 11 +-
.../druid/indexing/common/task/HadoopTaskTest.java | 1 +
.../druid/indexing/common/task/IndexTaskTest.java | 2 +-
.../common/task/RealtimeIndexTaskTest.java | 2 +-
.../firehose/IngestSegmentFirehoseFactoryTest.java | 24 +-
.../IngestSegmentFirehoseFactoryTimelineTest.java | 4 +-
.../overlord/SingleTaskBackgroundRunnerTest.java | 1 +
.../druid/indexing/overlord/TaskLifecycleTest.java | 2 +-
.../IntermediaryDataManagerAutoCleanupTest.java | 148 +++++++++
...ermediaryDataManagerManualAddAndDeleteTest.java | 174 +++++++++++
.../indexing/worker/WorkerTaskManagerTest.java | 1 +
.../indexing/worker/WorkerTaskMonitorTest.java | 1 +
.../client/indexing/HttpIndexingServiceClient.java | 21 ++
.../client/indexing/IndexingServiceClient.java | 3 +
.../druid/client/indexing/TaskStatusResponse.java | 2 +-
.../loading/SegmentLoaderLocalCacheManager.java | 7 +-
.../druid/segment/loading/StorageLocation.java | 55 +++-
.../segment/loading/StorageLocationConfig.java | 56 ++--
.../client/indexing/NoopIndexingServiceClient.java | 7 +
.../SegmentLoaderLocalCacheManagerTest.java | 36 +--
.../druid/segment/loading/StorageLocationTest.java | 12 +-
.../server/SegmentManagerThreadSafetyTest.java | 2 +-
.../org/apache/druid/cli/CliMiddleManager.java | 5 +-
33 files changed, 989 insertions(+), 109 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index edf4c22..3fd6511 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2562,6 +2562,7 @@ public class KafkaIndexTaskTest
null,
true,
null,
+ null,
null
);
final TestDerbyConnector derbyConnector = derby.getConnector();
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index bb83c08..4fc7ba1 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2747,6 +2747,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
null,
true,
null,
+ null,
null
);
final TestDerbyConnector derbyConnector = derby.getConnector();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
index 83fa9db..17b8dc1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
@@ -54,7 +54,7 @@ public class SegmentLoaderFactory
return new SegmentLoaderLocalCacheManager(
indexIO,
new SegmentLoaderConfig().withLocations(
- Collections.singletonList(new StorageLocationConfig().setPath(storageDir))),
+ Collections.singletonList(new StorageLocationConfig(storageDir, null, null))),
jsonMapper
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index 94a7c7d..31405fe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -22,10 +22,13 @@ package org.apache.druid.indexing.common.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.segment.loading.StorageLocationConfig;
import org.joda.time.Period;
+import javax.annotation.Nullable;
import java.io.File;
import java.nio.file.Paths;
+import java.util.Collections;
import java.util.List;
public class TaskConfig
@@ -35,7 +38,6 @@ public class TaskConfig
);
private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");
-
private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");
@JsonProperty
@@ -62,6 +64,9 @@ public class TaskConfig
@JsonProperty
private final Period directoryLockTimeout;
+ @JsonProperty
+ private final List<StorageLocationConfig> shuffleDataLocations;
+
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@@ -71,7 +76,8 @@ public class TaskConfig
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates,
@JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart,
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
- @JsonProperty("directoryLockTimeout") Period directoryLockTimeout
+ @JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
+ @JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@@ -89,6 +95,13 @@ public class TaskConfig
this.directoryLockTimeout = directoryLockTimeout == null
? DEFAULT_DIRECTORY_LOCK_TIMEOUT
: directoryLockTimeout;
+ if (shuffleDataLocations == null) {
+ this.shuffleDataLocations = Collections.singletonList(
+ new StorageLocationConfig(new File(defaultDir(null, "intermediary-segments")), null, null)
+ );
+ } else {
+ this.shuffleDataLocations = shuffleDataLocations;
+ }
}
@JsonProperty
@@ -154,7 +167,13 @@ public class TaskConfig
return directoryLockTimeout;
}
- private String defaultDir(String configParameter, final String defaultVal)
+ @JsonProperty
+ public List<StorageLocationConfig> getShuffleDataLocations()
+ {
+ return shuffleDataLocations;
+ }
+
+ private String defaultDir(@Nullable String configParameter, final String defaultVal)
{
if (configParameter == null) {
return Paths.get(getBaseDir(), defaultVal).toString();
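[Editor's note] For context, shuffleDataLocations is a list of StorageLocationConfig entries (see the StorageLocationConfig changes below). Assuming TaskConfig keeps its usual druid.indexer.task property prefix (not shown in this diff), a middleManager with two disks might set:

    druid.indexer.task.shuffleDataLocations=[{"path":"/mnt/disk1/druid/shuffle","maxSize":10000000000},{"path":"/mnt/disk2/druid/shuffle","freeSpacePercent":5.0}]

If the property is unset, the constructor above falls back to a single "intermediary-segments" directory under the task baseDir.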
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 7ea5c8e..b153958 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -372,9 +372,7 @@ public class OverlordResource
@Path("/taskStatus")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
- public Response getMultipleTaskStatuses(
- Set<String> taskIds
- )
+ public Response getMultipleTaskStatuses(Set<String> taskIds)
{
if (taskIds == null || taskIds.size() == 0) {
return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
new file mode 100644
index 0000000..bfd202e
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.google.common.collect.Iterators;
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.TaskStatus;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.loading.StorageLocation;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * This class manages intermediary segments for data shuffle between native parallel index tasks.
+ * In native parallel indexing, phase 1 tasks store segment files in the local storage of middleManagers
+ * and phase 2 tasks read those files via HTTP.
+ *
+ * The directory where segment files are placed is structured as
+ * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/partitionIdOfSegment.
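+ * For example, a segment file pushed by subtask "subTask1" of supervisor task "supervisor1" for partition 0
+ * of interval 2019-01-01/2019-01-02 would be stored as (ids hypothetical):
+ * {shuffleDataLocation}/supervisor1/2019-01-01T00:00:00.000Z/2019-01-02T00:00:00.000Z/0/subTask1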
+ *
+ * This class provides interfaces to store, find, and remove segment files.
+ * It also has a self-cleanup mechanism to remove stale segment files. It periodically checks the last access time
+ * per supervisorTask and removes all of its segment files if the supervisorTask is not running anymore.
+ */
+@ManageLifecycle
+public class IntermediaryDataManager
+{
+ private static final Logger log = new Logger(IntermediaryDataManager.class);
+
+ private final long intermediaryPartitionDiscoveryPeriodSec;
+ private final long intermediaryPartitionCleanupPeriodSec;
+ private final Period intermediaryPartitionTimeout;
+ private final List<StorageLocation> shuffleDataLocations;
+ private final IndexingServiceClient indexingServiceClient;
+
+ // supervisorTaskId -> time to check supervisorTask status
+ // This time is initialized when a new supervisorTask is found and updated whenever a partition is accessed for
+ // the supervisor.
+ private final ConcurrentHashMap<String, DateTime> supervisorTaskCheckTimes = new ConcurrentHashMap<>();
+
+ // supervisorTaskId -> cyclic iterator of storage locations
+ private final Map<String, Iterator<StorageLocation>> locationIterators = new HashMap<>();
+
+ // The overlord is supposed to send a cleanup request as soon as the supervisorTask is finished in parallel indexing,
+ // but the middleManager or indexer could miss the request. This executor automatically cleans up unused
+ // intermediary partitions in that case.
+ // This can be null until IntermediaryDataManager is started.
+ @Nullable
+ private ScheduledExecutorService supervisorTaskChecker;
+
+ @Inject
+ public IntermediaryDataManager(
+ WorkerConfig workerConfig,
+ TaskConfig taskConfig,
+ IndexingServiceClient indexingServiceClient
+ )
+ {
+ this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
+ this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
+ this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();
+ this.shuffleDataLocations = taskConfig
+ .getShuffleDataLocations()
+ .stream()
+ .map(config -> new StorageLocation(config.getPath(), config.getMaxSize(), config.getFreeSpacePercent()))
+ .collect(Collectors.toList());
+ this.indexingServiceClient = indexingServiceClient;
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
+ // Discover partitions for new supervisorTasks
+ supervisorTaskChecker.scheduleAtFixedRate(
+ () -> {
+ try {
+ discoverSupervisorTaskPartitions();
+ }
+ catch (Exception e) {
+ log.warn(e, "Error while discovering supervisorTasks");
+ }
+ },
+ intermediaryPartitionDiscoveryPeriodSec,
+ intermediaryPartitionDiscoveryPeriodSec,
+ TimeUnit.SECONDS
+ );
+
+ supervisorTaskChecker.scheduleAtFixedRate(
+ () -> {
+ try {
+ deleteExpiredSupervisorTaskPartitionsIfNotRunning();
+ }
+ catch (InterruptedException e) {
+ log.error(e, "Error while cleaning up partitions for expired supervisors");
+ }
+ catch (Exception e) {
+ log.warn(e, "Error while cleaning up partitions for expired supervisors");
+ }
+ },
+ intermediaryPartitionCleanupPeriodSec,
+ intermediaryPartitionCleanupPeriodSec,
+ TimeUnit.SECONDS
+ );
+ }
+
+ @LifecycleStop
+ public void stop() throws InterruptedException
+ {
+ if (supervisorTaskChecker != null) {
+ supervisorTaskChecker.shutdownNow();
+ supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS);
+ }
+ supervisorTaskCheckTimes.clear();
+ }
+
+ private void discoverSupervisorTaskPartitions()
+ {
+ for (StorageLocation location : shuffleDataLocations) {
+ final MutableInt numDiscovered = new MutableInt(0);
+ final File[] dirsPerSupervisorTask = location.getPath().listFiles();
+ if (dirsPerSupervisorTask != null) {
+ for (File supervisorTaskDir : dirsPerSupervisorTask) {
+ final String supervisorTaskId = supervisorTaskDir.getName();
+ supervisorTaskCheckTimes.computeIfAbsent(
+ supervisorTaskId,
+ k -> {
+ numDiscovered.increment();
+ return DateTimes.nowUtc().plus(intermediaryPartitionTimeout);
+ }
+ );
+ }
+ }
+ log.info("Discovered partitions for [%s] new supervisor tasks", numDiscovered.getValue());
+ }
+ }
+
+ /**
+ * Check the supervisorTask status if its partitions have not been accessed within the timeout, and
+ * delete all partitions for the supervisorTask if it has already finished.
+ *
+ * Note that the overlord sends a cleanup request when a supervisorTask finishes. The check below triggers
+ * the self-cleanup for the case where that cleanup request is missed.
+ */
+ private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() throws InterruptedException
+ {
+ final DateTime now = DateTimes.nowUtc();
+ final Set<String> expiredSupervisorTasks = new HashSet<>();
+ for (Entry<String, DateTime> entry : supervisorTaskCheckTimes.entrySet()) {
+ final String supervisorTaskId = entry.getKey();
+ final DateTime checkTime = entry.getValue();
+ if (checkTime.isBefore(now)) {
+ expiredSupervisorTasks.add(supervisorTaskId);
+ }
+ }
+
+ log.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
+
+ final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
+ for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) {
+ final String supervisorTaskId = entry.getKey();
+ final TaskStatus status = entry.getValue();
+ if (status.getStatusCode().isComplete()) {
+ // If it's finished, clean up all partitions for the supervisor task.
+ try {
+ deletePartitions(supervisorTaskId);
+ }
+ catch (IOException e) {
+ log.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
+ }
+ } else {
+ // If it's still running, renew the check time.
+ supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc().plus(intermediaryPartitionTimeout));
+ }
+ }
+ }
+
+ /**
+ * Write a segment into one of the configured locations. The location to write to is chosen in a round-robin manner
+ * per supervisorTaskId.
+ *
+ * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should use the
+ * static addSegment method.
+ */
+ public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile)
+ throws IOException
+ {
+ final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
+ supervisorTaskId,
+ k -> Iterators.cycle(shuffleDataLocations)
+ );
+ addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile);
+ }
+
+ public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId)
+ {
+ for (StorageLocation location : shuffleDataLocations) {
+ final File partitionDir = getPartitionDir(location, supervisorTaskId, interval, partitionId);
+ if (partitionDir.exists()) {
+ supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc().plus(intermediaryPartitionTimeout));
+ final File[] segmentFiles = partitionDir.listFiles();
+ return segmentFiles == null ? Collections.emptyList() : Arrays.asList(segmentFiles);
+ }
+ }
+
+ return Collections.emptyList();
+ }
+
+ public void deletePartitions(String supervisorTaskId) throws IOException
+ {
+ for (StorageLocation location : shuffleDataLocations) {
+ final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
+ if (supervisorTaskPath.exists()) {
+ log.info("Cleaning up [%s]", supervisorTaskPath);
+ for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) {
+ location.removeFile(eachFile);
+ }
+ FileUtils.forceDelete(supervisorTaskPath);
+ }
+ }
+ supervisorTaskCheckTimes.remove(supervisorTaskId);
+ }
+
+ /**
+ * Iterate through the given storage locations to find one which can handle the given segment.
+ */
+ public static void addSegment(
+ Iterator<StorageLocation> cyclicIterator,
+ int numLocations,
+ String supervisorTaskId,
+ String subTaskId,
+ DataSegment segment,
+ File segmentFile
+ ) throws IOException
+ {
+ final StorageLocation location = findLocationForSegment(cyclicIterator, numLocations, segment);
+ final File destFile = new File(
+ getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()),
+ subTaskId
+ );
+ FileUtils.forceMkdirParent(destFile);
+ final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
+ if (copiedBytes == 0) {
+ throw new IOE(
+ "0 bytes copied after copying a segment file from [%s] to [%s]",
+ segmentFile.getAbsolutePath(),
+ destFile.getAbsolutePath()
+ );
+ }
+ location.addFile(destFile);
+ }
+
+ private static StorageLocation findLocationForSegment(
+ Iterator<StorageLocation> cyclicIterator,
+ int numLocations,
+ DataSegment segment
+ )
+ {
+ for (int i = 0; i < numLocations; i++) {
+ final StorageLocation location = cyclicIterator.next();
+ if (location.canHandle(segment)) {
+ return location;
+ }
+ }
+ throw new ISE("Can't find location to handle segment[%s]", segment);
+ }
+
+ private static File getPartitionDir(
+ StorageLocation location,
+ String supervisorTaskId,
+ Interval interval,
+ int partitionId
+ )
+ {
+ return FileUtils.getFile(
+ location.getPath(),
+ supervisorTaskId,
+ interval.getStart().toString(),
+ interval.getEnd().toString(),
+ String.valueOf(partitionId)
+ );
+ }
+}
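[Editor's note] To tie the pieces above together, a rough usage sketch (task ids and files are hypothetical; in a real server the manager is created by Guice and started/stopped through the lifecycle):

    IntermediaryDataManager manager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
    manager.start();  // schedules partition discovery and stale-partition cleanup

    // A phase 1 subtask pushes a partition file:
    manager.addSegment("supervisor1", "subTask1", segment, segmentFile);

    // A phase 2 task later asks for every file of a partition:
    List<File> files = manager.findPartitionFiles("supervisor1", Intervals.of("2019-01-01/2019-01-02"), 0);

    // The overlord (or the self-cleanup) removes everything for the supervisor task:
    manager.deletePartitions("supervisor1");
    manager.stop();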
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
index e574a0d..65d09e7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.worker.config;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.DruidNode;
import org.apache.druid.utils.JvmUtils;
+import org.joda.time.Period;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
@@ -32,15 +33,24 @@ public class WorkerConfig
{
@JsonProperty
@NotNull
- private String ip = DruidNode.getDefaultHost();
+ private final String ip = DruidNode.getDefaultHost();
@JsonProperty
@NotNull
- private String version = "0";
+ private final String version = "0";
@JsonProperty
@Min(1)
- private int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1);
+ private final int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1);
+
+ @JsonProperty
+ private final long intermediaryPartitionDiscoveryPeriodSec = 60L;
+
+ @JsonProperty
+ private final long intermediaryPartitionCleanupPeriodSec = 300L;
+
+ @JsonProperty
+ private final Period intermediaryPartitionTimeout = new Period("P1D");
public String getIp()
{
@@ -56,4 +66,19 @@ public class WorkerConfig
{
return capacity;
}
+
+ public long getIntermediaryPartitionDiscoveryPeriodSec()
+ {
+ return intermediaryPartitionDiscoveryPeriodSec;
+ }
+
+ public long getIntermediaryPartitionCleanupPeriodSec()
+ {
+ return intermediaryPartitionCleanupPeriodSec;
+ }
+
+ public Period getIntermediaryPartitionTimeout()
+ {
+ return intermediaryPartitionTimeout;
+ }
}
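[Editor's note] The three new worker settings are plain JSON properties on WorkerConfig; assuming the usual druid.worker property prefix, the defaults above are equivalent to:

    druid.worker.intermediaryPartitionDiscoveryPeriodSec=60
    druid.worker.intermediaryPartitionCleanupPeriodSec=300
    druid.worker.intermediaryPartitionTimeout=P1D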
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
new file mode 100644
index 0000000..9bf197c
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.http;
+
+import com.google.common.io.ByteStreams;
+import com.google.inject.Inject;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.http.security.StateResourceFilter;
+import org.joda.time.Interval;
+
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * HTTP endpoints for the shuffle system. The MiddleManager and Indexer use this resource to serve intermediary
+ * shuffle data.
+ *
+ * We use {@link StateResourceFilter} here because it performs an admin-like authorization and
+ * all endpoints here are supposed to be used only for internal communication.
+ * A possible alternative would be performing datasource-level authorization as in TaskResourceFilter.
+ * However, datasource information is not available in middleManagers or indexers yet, which makes it hard to use.
+ * We could develop a new ResourceFilter in the future if needed.
+ */
+@Path("/druid/worker/v1/shuffle")
+@ResourceFilters(StateResourceFilter.class)
+public class ShuffleResource
+{
+ private static final Logger log = new Logger(ShuffleResource.class);
+
+ private final IntermediaryDataManager intermediaryDataManager;
+
+ @Inject
+ public ShuffleResource(IntermediaryDataManager intermediaryDataManager)
+ {
+ this.intermediaryDataManager = intermediaryDataManager;
+ }
+
+ @GET
+ @Path("/task/{supervisorTaskId}/partition")
+ @Produces(MediaType.APPLICATION_OCTET_STREAM)
+ public Response getPartition(
+ @PathParam("supervisorTaskId") String supervisorTaskId,
+ @QueryParam("startTime") String startTime,
+ @QueryParam("endTime") String endTime,
+ @QueryParam("partitionId") int partitionId
+ )
+ {
+ final Interval interval = new Interval(DateTimes.of(startTime), DateTimes.of(endTime));
+ final List<File> partitionFiles = intermediaryDataManager.findPartitionFiles(
+ supervisorTaskId,
+ interval,
+ partitionId
+ );
+
+ if (partitionFiles.isEmpty()) {
+ final String errorMessage = StringUtils.format(
+ "Can't find the partition for supervisor[%s], interval[%s], and partitionId[%s]",
+ supervisorTaskId,
+ interval,
+ partitionId
+ );
+ return Response.status(Status.NOT_FOUND).entity(errorMessage).build();
+ } else {
+ return Response.ok(
+ (StreamingOutput) output -> {
+ for (File partitionFile : partitionFiles) {
+ try (final FileInputStream fileInputStream = new FileInputStream(partitionFile)) {
+ ByteStreams.copy(fileInputStream, output);
+ }
+ }
+ }
+ ).build();
+ }
+ }
+
+ @DELETE
+ @Path("/task/{supervisorTaskId}")
+ public Response deletePartitions(@PathParam("supervisorTaskId") String supervisorTaskId)
+ {
+ try {
+ intermediaryDataManager.deletePartitions(supervisorTaskId);
+ return Response.ok(supervisorTaskId).build();
+ }
+ catch (IOException e) {
+ log.error(e, "Error while deleting partitions of supervisorTask[%s]", supervisorTaskId);
+ return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
+ }
+ }
+}
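[Editor's note] For illustration, a phase 2 task could fetch a partition with a plain HTTP GET; a minimal sketch, assuming a reachable middleManager at middlemanager.example.com:8091 (host, port, and task ids are hypothetical; the path and query parameters match the resource above):

    import com.google.common.io.ByteStreams;
    import java.io.FileOutputStream;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class ShuffleFetchExample
    {
      public static void main(String[] args) throws Exception
      {
        // Fetch all segment files for partition 0 of supervisor task "supervisor1".
        final URL url = new URL(
            "http://middlemanager.example.com:8091/druid/worker/v1/shuffle/task/supervisor1/partition"
            + "?startTime=2018-01-01T00:00:00.000Z&endTime=2019-01-01T00:00:00.000Z&partitionId=0"
        );
        final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        try (InputStream in = connection.getInputStream();
             FileOutputStream out = new FileOutputStream("partition_0")) {
          // The response body is the concatenation of the partition's segment files,
          // one per subtask, streamed in the order the server lists them.
          ByteStreams.copy(in, out);
        }
        finally {
          connection.disconnect();
        }
      }
    }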
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index 0966d1b..0575afe 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -95,7 +95,7 @@ public class TaskToolboxTest
EasyMock.replay(task, mockHandoffNotifierFactory);
taskToolbox = new TaskToolboxFactory(
- new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null),
+ new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null, null),
mockTaskActionClientFactory,
mockEmitter,
mockSegmentPusher,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index d675069..7628dea 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1519,7 +1519,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
return result;
}
};
- final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);
+ final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index f157513..9e5ab4c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -346,16 +346,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
@Override
public List<StorageLocationConfig> getLocations()
{
- return ImmutableList.of(
- new StorageLocationConfig()
- {
- @Override
- public File getPath()
- {
- return deepStorageDir;
- }
- }
- );
+ return ImmutableList.of(new StorageLocationConfig(deepStorageDir, null, null));
}
},
objectMapper
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
index c77f063..64d3dd8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
@@ -78,6 +78,7 @@ public class HadoopTaskTest
ImmutableList.of("something:hadoop:1"),
false,
null,
+ null,
null
)).once();
EasyMock.replay(toolbox);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 4bae4c8..a8f02e6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -204,7 +204,7 @@ public class IndexTaskTest
public List<StorageLocationConfig> getLocations()
{
return Collections.singletonList(
- new StorageLocationConfig().setPath(cacheDir)
+ new StorageLocationConfig(cacheDir, null, null)
);
}
},
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index daeeaab..316327b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -881,7 +881,7 @@ public class RealtimeIndexTaskTest
final File directory
)
{
- final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);
+ final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 70e5544..f17274a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -84,10 +84,14 @@ import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.easymock.EasyMock;
import org.joda.time.Interval;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -115,6 +119,9 @@ public class IngestSegmentFirehoseFactoryTest
private static final TaskLockbox TASK_LOCKBOX;
private static final Task TASK;
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
static {
TestUtils testUtils = new TestUtils();
MAPPER = setupInjectablesInObjectMapper(TestHelper.makeJsonMapper());
@@ -299,6 +306,7 @@ public class IngestSegmentFirehoseFactoryTest
private final FirehoseFactory<InputRowParser> factory;
private final InputRowParser rowParser;
+ private File tempDir;
private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser(
new TimeAndDimsParseSpec(
@@ -376,6 +384,18 @@ public class IngestSegmentFirehoseFactoryTest
}
}
+ @Before
+ public void setup() throws IOException
+ {
+ tempDir = temporaryFolder.newFolder();
+ }
+
+ @After
+ public void teardown()
+ {
+ tempDir.delete();
+ }
+
@Test
public void sanityTest()
{
@@ -402,7 +422,7 @@ public class IngestSegmentFirehoseFactoryTest
{
Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size());
Integer rowcount = 0;
- try (final Firehose firehose = factory.connect(rowParser, null)) {
+ try (final Firehose firehose = factory.connect(rowParser, tempDir)) {
while (firehose.hasMore()) {
InputRow row = firehose.nextRow();
Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray());
@@ -432,7 +452,7 @@ public class IngestSegmentFirehoseFactoryTest
);
int skipped = 0;
try (final Firehose firehose =
- factory.connect(transformSpec.decorate(rowParser), null)) {
+ factory.connect(transformSpec.decorate(rowParser), tempDir)) {
while (firehose.hasMore()) {
InputRow row = firehose.nextRow();
if (row == null) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index 119097b..4944d27 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -148,7 +148,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
int count = 0;
long sum = 0;
- try (final Firehose firehose = factory.connect(ROW_PARSER, null)) {
+ try (final Firehose firehose = factory.connect(ROW_PARSER, tmpDir)) {
while (firehose.hasMore()) {
final InputRow row = firehose.nextRow();
count++;
@@ -176,7 +176,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
for (InputSplit<List<WindowedSegmentId>> split : splits) {
final FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> splitFactory =
factory.withSplit(split);
- try (final Firehose firehose = splitFactory.connect(ROW_PARSER, null)) {
+ try (final Firehose firehose = splitFactory.connect(ROW_PARSER, tmpDir)) {
while (firehose.hasMore()) {
final InputRow row = firehose.nextRow();
count++;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 49315d3..a16a80a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -76,6 +76,7 @@ public class SingleTaskBackgroundRunnerTest
null,
true,
null,
+ null,
null
);
final ServiceEmitter emitter = new NoopServiceEmitter();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index d8c0af3..8139f8d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -540,7 +540,7 @@ public class TaskLifecycleTest
new TaskAuditLogConfig(true)
);
File tmpDir = temporaryFolder.newFolder();
- taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
+ taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null);
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
new file mode 100644
index 0000000..c158c0e
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.amazonaws.util.StringUtils;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.client.indexing.TaskStatus;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class IntermediaryDataManagerAutoCleanupTest
+{
+ @Rule
+ public TemporaryFolder tempDir = new TemporaryFolder();
+
+ private IntermediaryDataManager intermediaryDataManager;
+
+ @Before
+ public void setup() throws IOException
+ {
+ final WorkerConfig workerConfig = new WorkerConfig()
+ {
+ @Override
+ public long getIntermediaryPartitionDiscoveryPeriodSec()
+ {
+ return 1;
+ }
+
+ @Override
+ public long getIntermediaryPartitionCleanupPeriodSec()
+ {
+ return 2;
+ }
+
+ @Override
+ public Period getIntermediaryPartitionTimeout()
+ {
+ return new Period("PT2S");
+ }
+
+ };
+ final TaskConfig taskConfig = new TaskConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ false,
+ null,
+ null,
+ ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null))
+ );
+ final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
+ {
+ @Override
+ public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds)
+ {
+ final Map<String, TaskStatus> result = new HashMap<>();
+ for (String taskId : taskIds) {
+ result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10));
+ }
+ return result;
+ }
+ };
+ intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+ intermediaryDataManager.start();
+ }
+
+ @After
+ public void teardown() throws InterruptedException
+ {
+ intermediaryDataManager.stop();
+ }
+
+ @Test
+ public void testCleanup() throws IOException, InterruptedException
+ {
+ final String supervisorTaskId = "supervisorTaskId";
+ final Interval interval = Intervals.of("2018/2019");
+ final File segmentFile = generateSegmentFile();
+ final DataSegment segment = newSegment(interval, 0);
+ intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
+
+ Thread.sleep(8000);
+ Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, 0).isEmpty());
+ }
+
+ private File generateSegmentFile() throws IOException
+ {
+ final File segmentFile = tempDir.newFile();
+ FileUtils.write(segmentFile, "test data.", StringUtils.UTF8);
+ return segmentFile;
+ }
+
+ private DataSegment newSegment(Interval interval, int partitionId)
+ {
+ return new DataSegment(
+ "dataSource",
+ interval,
+ "version",
+ null,
+ null,
+ null,
+ new NumberedShardSpec(partitionId, 0),
+ 9,
+ 10
+ );
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
new file mode 100644
index 0000000..7bd9e90
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.amazonaws.util.StringUtils;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+
+public class IntermediaryDataManagerManualAddAndDeleteTest
+{
+ @Rule
+ public TemporaryFolder tempDir = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private IntermediaryDataManager intermediaryDataManager;
+
+ @Before
+ public void setup() throws IOException
+ {
+ final WorkerConfig workerConfig = new WorkerConfig();
+ final TaskConfig taskConfig = new TaskConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ false,
+ null,
+ null,
+ ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 150L, null))
+ );
+ final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
+ intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+ intermediaryDataManager.start();
+ }
+
+ @After
+ public void teardown() throws InterruptedException
+ {
+ intermediaryDataManager.stop();
+ }
+
+ @Test
+ public void testAddSegmentFailure() throws IOException
+ {
+ for (int i = 0; i < 15; i++) {
+ File segmentFile = generateSegmentFile();
+ DataSegment segment = newSegment(Intervals.of("2018/2019"), i);
+ intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
+ }
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Can't find location to handle segment");
+ File segmentFile = generateSegmentFile();
+ DataSegment segment = newSegment(Intervals.of("2018/2019"), 16);
+ intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
+ }
+
+ @Test
+ public void testFindPartitionFiles() throws IOException
+ {
+ final String supervisorTaskId = "supervisorTaskId";
+ final Interval interval = Intervals.of("2018/2019");
+ final int partitionId = 0;
+ for (int i = 0; i < 10; i++) {
+ final File segmentFile = generateSegmentFile();
+ final DataSegment segment = newSegment(interval, partitionId);
+ intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + i, segment, segmentFile);
+ }
+ final List<File> files = intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId);
+ Assert.assertEquals(10, files.size());
+ files.sort(Comparator.comparing(File::getName));
+ for (int i = 0; i < 10; i++) {
+ Assert.assertEquals("subTaskId_" + i, files.get(i).getName());
+ }
+ }
+
+ @Test
+ public void deletePartitions() throws IOException
+ {
+ final String supervisorTaskId = "supervisorTaskId";
+ final Interval interval = Intervals.of("2018/2019");
+ for (int partitionId = 0; partitionId < 5; partitionId++) {
+ for (int subTaskId = 0; subTaskId < 3; subTaskId++) {
+ final File segmentFile = generateSegmentFile();
+ final DataSegment segment = newSegment(interval, partitionId);
+ intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + subTaskId, segment, segmentFile);
+ }
+ }
+
+ intermediaryDataManager.deletePartitions(supervisorTaskId);
+
+ for (int partitionId = 0; partitionId < 5; partitionId++) {
+ Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId).isEmpty());
+ }
+ }
+
+ @Test
+ public void testAddRemoveAdd() throws IOException
+ {
+ final String supervisorTaskId = "supervisorTaskId";
+ final Interval interval = Intervals.of("2018/2019");
+ for (int i = 0; i < 15; i++) {
+ File segmentFile = generateSegmentFile();
+ DataSegment segment = newSegment(interval, i);
+ intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
+ }
+ intermediaryDataManager.deletePartitions(supervisorTaskId);
+ File segmentFile = generateSegmentFile();
+ DataSegment segment = newSegment(interval, 16);
+ intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
+ }
+
+ private File generateSegmentFile() throws IOException
+ {
+ final File segmentFile = tempDir.newFile();
+ FileUtils.write(segmentFile, "test data.", StringUtils.UTF8);
+ return segmentFile;
+ }
+
+ private DataSegment newSegment(Interval interval, int partitionId)
+ {
+ return new DataSegment(
+ "dataSource",
+ interval,
+ "version",
+ null,
+ null,
+ null,
+ new NumberedShardSpec(partitionId, 0),
+ 9,
+ 10
+ );
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index b86b654..635bf4a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -85,6 +85,7 @@ public class WorkerTaskManagerTest
null,
false,
null,
+ null,
null
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 4afdd3c..8c22aaa 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -153,6 +153,7 @@ public class WorkerTaskMonitorTest
null,
false,
null,
+ null,
null
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 7588da7..2b609c2 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -271,6 +271,27 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
}
@Override
+ public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) throws InterruptedException
+ {
+ try {
+ final FullResponseHolder responseHolder = druidLeaderClient.go(
+ druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
+ .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskIds))
+ );
+
+ return jsonMapper.readValue(
+ responseHolder.getContent(),
+ new TypeReference<Map<String, TaskStatus>>()
+ {
+ }
+ );
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
@Nullable
public TaskStatusPlus getLastCompleteTask()
{
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index f2299c9..39fd93f 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -27,6 +27,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public interface IndexingServiceClient
{
@@ -55,6 +56,8 @@ public interface IndexingServiceClient
TaskStatusResponse getTaskStatus(String taskId);
+ Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) throws InterruptedException;
+
@Nullable
TaskStatusPlus getLastCompleteTask();
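[Editor's note] A sketch of how a caller such as IntermediaryDataManager above can use the new bulk-status method (task ids hypothetical):

    Set<String> taskIds = ImmutableSet.of("supervisor1", "supervisor2");
    Map<String, TaskStatus> statuses = indexingServiceClient.getTaskStatuses(taskIds);
    statuses.forEach((taskId, status) -> {
      if (status.getStatusCode().isComplete()) {
        // the supervisor task finished; its intermediary partitions can be deleted
      }
    });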
diff --git a/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java b/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java
index 08a8519..247ecabd 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java
@@ -33,7 +33,7 @@ public class TaskStatusResponse
{
private final String task; // Task ID, named "task" in the JSONification of this class.
@Nullable
- private final TaskStatusPlus status;
+ private final TaskStatusPlus status; // null for unknown tasks
@JsonCreator
public TaskStatusResponse(
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 327db5d..b25c216 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -163,8 +163,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
if (loc == null) {
loc = loadSegmentWithRetry(segment, storageDir);
}
- loc.addSegment(segment);
- return new File(loc.getPath(), storageDir);
+ final File localStorageDir = new File(loc.getPath(), storageDir);
+ loc.addSegmentDir(localStorageDir, segment);
+ return localStorageDir;
}
finally {
unlock(segment, lock);
@@ -270,7 +271,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
// Druid creates folders of the form dataSource/interval/version/partitionNum.
// We need to clean up all these directories if they are all empty.
cleanupCacheFiles(location.getPath(), localStorageDir);
- location.removeSegment(segment);
+ location.removeSegmentDir(localStorageDir, segment);
}
}
}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index e91ffee..4fb4c4e 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -19,6 +19,8 @@
package org.apache.druid.segment.loading;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
@@ -29,18 +31,18 @@ import java.util.Set;
/**
*/
-class StorageLocation
+public class StorageLocation
{
private static final Logger log = new Logger(StorageLocation.class);
private final File path;
private final long maxSize;
private final long freeSpaceToKeep;
- private final Set<DataSegment> segments;
+ private final Set<File> files = new HashSet<>();
private volatile long currSize = 0;
- StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
+ public StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
{
this.path = path;
this.maxSize = maxSize;
@@ -57,35 +59,62 @@ class StorageLocation
} else {
this.freeSpaceToKeep = 0;
}
-
- this.segments = new HashSet<>();
}
- File getPath()
+ public File getPath()
{
return path;
}
- long getMaxSize()
+ public long getMaxSize()
{
return maxSize;
}
- synchronized void addSegment(DataSegment segment)
+ /**
+ * Add a new file to this location. The given file argument must be a file rather than a directory.
+ */
+ public synchronized void addFile(File file)
+ {
+ if (file.isDirectory()) {
+ throw new ISE("[%s] must be a file. Use addSegmentDir() for directories", file);
+ }
+ if (files.add(file)) {
+ currSize += FileUtils.sizeOf(file);
+ }
+ }
+
+ /**
+ * Add a new segment dir to this location. The segment size is added to currSize.
+ */
+ public synchronized void addSegmentDir(File segmentDir, DataSegment segment)
{
- if (segments.add(segment)) {
+ if (files.add(segmentDir)) {
currSize += segment.getSize();
}
}
- synchronized void removeSegment(DataSegment segment)
+ /**
+ * Remove a segment file from this location. The given file argument must be a file rather than a directory.
+ */
+ public synchronized void removeFile(File file)
+ {
+ if (files.remove(file)) {
+ currSize -= FileUtils.sizeOf(file);
+ }
+ }
+
+ /**
+ * Remove a segment dir from this location. The segment size is subtracted from currSize.
+ */
+ public synchronized void removeSegmentDir(File segmentDir, DataSegment segment)
{
- if (segments.remove(segment)) {
+ if (files.remove(segmentDir)) {
currSize -= segment.getSize();
}
}
- boolean canHandle(DataSegment segment)
+ public boolean canHandle(DataSegment segment)
{
if (available() < segment.getSize()) {
log.warn(
@@ -114,7 +143,7 @@ class StorageLocation
return true;
}
- synchronized long available()
+ public synchronized long available()
{
return maxSize - currSize;
}
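For illustration, a minimal runnable sketch of the newly public StorageLocation used as a generic capacity tracker; the directory, file name, and sizes are made up:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

import org.apache.druid.segment.loading.StorageLocation;

public class StorageLocationSketch
{
  public static void main(String[] args) throws IOException
  {
    final File shuffleDir = Files.createTempDirectory("shuffle-data").toFile();
    final StorageLocation location = new StorageLocation(
        shuffleDir,
        10_000_000_000L, // maxSize: 10 GB
        null             // freeSpacePercent: null disables the free-space check
    );

    // addFile() measures the file with FileUtils.sizeOf(), so the file must
    // already exist on disk when it is registered.
    final File partFile = new File(shuffleDir, "part_0");
    Files.write(partFile.toPath(), new byte[]{1, 2, 3});
    location.addFile(partFile);

    System.out.println(location.available()); // maxSize - currSize = 9999999997

    // removeFile() measures the file again, so unregister it before deleting
    // it from disk.
    location.removeFile(partFile);
  }
}

Note the asymmetry: plain files are accounted by their on-disk size, while segment directories (addSegmentDir/removeSegmentDir) are accounted by the segment's logical size, segment.getSize().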
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java
index 8877940..bbfd7ed 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java
@@ -19,66 +19,66 @@
package org.apache.druid.segment.loading;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
+import javax.annotation.Nullable;
import java.io.File;
/**
*/
public class StorageLocationConfig
{
- @JsonProperty
- @NotNull
- private File path = null;
+ private final File path;
+ private final long maxSize;
+ @Nullable
+ private final Double freeSpacePercent;
- @JsonProperty
- @Min(1)
- private long maxSize = Long.MAX_VALUE;
+ @JsonCreator
+ public StorageLocationConfig(
+ @JsonProperty("path") File path,
+ @JsonProperty("maxSize") @Nullable Long maxSize,
+ @JsonProperty("freeSpacePercent") @Nullable Double freeSpacePercent
+ )
+ {
+ this.path = Preconditions.checkNotNull(path, "path");
+ this.maxSize = maxSize == null ? Long.MAX_VALUE : maxSize;
+ this.freeSpacePercent = freeSpacePercent;
+ Preconditions.checkArgument(this.maxSize > 0, "maxSize[%s] should be positive", this.maxSize);
+ Preconditions.checkArgument(
+ this.freeSpacePercent == null || this.freeSpacePercent >= 0,
+ "freeSpacePercent[%s] should be 0 or a positive double",
+ this.freeSpacePercent
+ );
+ }
@JsonProperty
- private Double freeSpacePercent;
-
public File getPath()
{
return path;
}
- public StorageLocationConfig setPath(File path)
- {
- this.path = path;
- return this;
- }
-
+ @JsonProperty
public long getMaxSize()
{
return maxSize;
}
- public StorageLocationConfig setMaxSize(long maxSize)
- {
- this.maxSize = maxSize;
- return this;
- }
-
+ @JsonProperty
+ @Nullable
public Double getFreeSpacePercent()
{
return freeSpacePercent;
}
- public StorageLocationConfig setFreeSpacePercent(Double freeSpacePercent)
- {
- this.freeSpacePercent = freeSpacePercent;
- return this;
- }
-
@Override
public String toString()
{
return "StorageLocationConfig{" +
"path=" + path +
", maxSize=" + maxSize +
+ ", freeSpacePercent=" + freeSpacePercent +
'}';
}
}
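For illustration, a sketch of building the now-immutable config with the defaults spelled out; the path and sizes are made up:

import java.io.File;

import org.apache.druid.segment.loading.StorageLocationConfig;

final class ConfigSketch
{
  static StorageLocationConfig example()
  {
    // A null maxSize defaults to Long.MAX_VALUE and a null freeSpacePercent
    // disables the free-space check; both are validated in the constructor,
    // so a non-positive maxSize or a negative freeSpacePercent fails fast.
    return new StorageLocationConfig(
        new File("/mnt/druid/segment-cache"), // path (required)
        100_000_000_000L,                     // maxSize: 100 GB cap
        2.0                                   // freeSpacePercent: keep >= 2% free
    );
  }
}

Through the @JsonCreator, the same location binds from JSON such as
{"path": "/mnt/druid/segment-cache", "maxSize": 100000000000, "freeSpacePercent": 2.0}.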
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 8908173..794ea08 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class NoopIndexingServiceClient implements IndexingServiceClient
{
@@ -85,6 +86,12 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
return new TaskStatusResponse(taskId, null);
}
+ @Override
+ public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds)
+ {
+ return Collections.emptyMap();
+ }
+
@Nullable
@Override
public TaskStatusPlus getLastCompleteTask()
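For illustration, a sketch of how a worker-side cleanup loop can use the new bulk-status call; this mirrors what the patch's IntermediaryDataManager does to expire shuffle data once a supervisor task finishes, but deletePartitionsFor() is a hypothetical helper and the TaskStatus import assumes the Set overload returns org.apache.druid.indexer.TaskStatus:

import java.util.Map;
import java.util.Set;

import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.indexer.TaskStatus;

final class ShuffleCleanupSketch
{
  static void cleanup(IndexingServiceClient client, Set<String> supervisorTaskIds)
  {
    // One call resolves all outstanding supervisor tasks instead of polling
    // the Overlord once per task.
    final Map<String, TaskStatus> statuses = client.getTaskStatuses(supervisorTaskIds);
    for (Map.Entry<String, TaskStatus> entry : statuses.entrySet()) {
      if (entry.getValue().isComplete()) {
        deletePartitionsFor(entry.getKey());
      }
    }
  }

  private static void deletePartitionsFor(String supervisorTaskId)
  {
    // Hypothetical: remove the per-supervisor-task shuffle directory.
  }
}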
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
index 5a54f8a..cb38d7b 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
@@ -88,9 +88,7 @@ public class SegmentLoaderLocalCacheManagerTest
localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder");
final List<StorageLocationConfig> locations = new ArrayList<>();
- final StorageLocationConfig locationConfig = new StorageLocationConfig();
- locationConfig.setPath(localSegmentCacheFolder);
- locationConfig.setMaxSize(10000000000L);
+ final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 10000000000L, null);
locations.add(locationConfig);
manager = new SegmentLoaderLocalCacheManager(
@@ -157,14 +155,10 @@ public class SegmentLoaderLocalCacheManagerTest
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
final List<StorageLocationConfig> locations = new ArrayList<>();
- final StorageLocationConfig locationConfig = new StorageLocationConfig();
- locationConfig.setPath(localStorageFolder);
- locationConfig.setMaxSize(10000000000L);
+ final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 10000000000L, null);
locations.add(locationConfig);
- final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
- locationConfig2.setPath(localStorageFolder2);
- locationConfig2.setMaxSize(1000000000L);
+ final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 1000000000L, null);
locations.add(locationConfig2);
manager = new SegmentLoaderLocalCacheManager(
@@ -207,17 +201,13 @@ public class SegmentLoaderLocalCacheManagerTest
public void testRetrySuccessAtSecondLocation() throws Exception
{
final List<StorageLocationConfig> locations = new ArrayList<>();
- final StorageLocationConfig locationConfig = new StorageLocationConfig();
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
// mock can't write in first location
localStorageFolder.setWritable(false);
- locationConfig.setPath(localStorageFolder);
- locationConfig.setMaxSize(1000000000L);
+ final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 1000000000L, null);
locations.add(locationConfig);
- final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
- locationConfig2.setPath(localStorageFolder2);
- locationConfig2.setMaxSize(10000000L);
+ final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null);
locations.add(locationConfig2);
manager = new SegmentLoaderLocalCacheManager(
@@ -260,19 +250,15 @@ public class SegmentLoaderLocalCacheManagerTest
public void testRetryAllFail() throws Exception
{
final List<StorageLocationConfig> locations = new ArrayList<>();
- final StorageLocationConfig locationConfig = new StorageLocationConfig();
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
// mock can't write in first location
localStorageFolder.setWritable(false);
- locationConfig.setPath(localStorageFolder);
- locationConfig.setMaxSize(1000000000L);
+ final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 1000000000L, null);
locations.add(locationConfig);
- final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
// mock can't write in second location
localStorageFolder2.setWritable(false);
- locationConfig2.setPath(localStorageFolder2);
- locationConfig2.setMaxSize(10000000L);
+ final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null);
locations.add(locationConfig2);
manager = new SegmentLoaderLocalCacheManager(
@@ -316,17 +302,13 @@ public class SegmentLoaderLocalCacheManagerTest
public void testEmptyToFullOrder() throws Exception
{
final List<StorageLocationConfig> locations = new ArrayList<>();
- final StorageLocationConfig locationConfig = new StorageLocationConfig();
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
localStorageFolder.setWritable(true);
- locationConfig.setPath(localStorageFolder);
- locationConfig.setMaxSize(10L);
+ final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 10L, null);
locations.add(locationConfig);
- final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
localStorageFolder2.setWritable(true);
- locationConfig2.setPath(localStorageFolder2);
- locationConfig2.setMaxSize(10L);
+ final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10L, null);
locations.add(locationConfig2);
manager = new SegmentLoaderLocalCacheManager(
diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
index cc6b6fe..cdfcd47 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
@@ -71,25 +71,25 @@ public class StorageLocationTest
final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23);
- loc.addSegment(makeSegment("2012-01-01/2012-01-02", 10));
+ loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
expectedAvail -= 10;
verifyLoc(expectedAvail, loc);
- loc.addSegment(makeSegment("2012-01-01/2012-01-02", 10));
+ loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
verifyLoc(expectedAvail, loc);
- loc.addSegment(secondSegment);
+ loc.addSegmentDir(new File("test2"), secondSegment);
expectedAvail -= 23;
verifyLoc(expectedAvail, loc);
- loc.removeSegment(makeSegment("2012-01-01/2012-01-02", 10));
+ loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
expectedAvail += 10;
verifyLoc(expectedAvail, loc);
- loc.removeSegment(makeSegment("2012-01-01/2012-01-02", 10));
+ loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
verifyLoc(expectedAvail, loc);
- loc.removeSegment(secondSegment);
+ loc.removeSegmentDir(new File("test2"), secondSegment);
expectedAvail += 23;
verifyLoc(expectedAvail, loc);
}
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
index 649c8b2..fdd89d1 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
@@ -105,7 +105,7 @@ public class SegmentManagerThreadSafetyTest
public List<StorageLocationConfig> getLocations()
{
return Collections.singletonList(
- new StorageLocationConfig().setPath(segmentCacheDir)
+ new StorageLocationConfig(segmentCacheDir, null, null)
);
}
},
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index eeb8ae1..68d374c 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -29,6 +29,7 @@ import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Names;
import com.google.inject.util.Providers;
import io.airlift.airline.Command;
+import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.discovery.WorkerNodeService;
@@ -53,6 +54,7 @@ import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
import org.apache.druid.indexing.worker.WorkerTaskMonitor;
import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.indexing.worker.http.ShuffleResource;
import org.apache.druid.indexing.worker.http.TaskManagementResource;
import org.apache.druid.indexing.worker.http.WorkerResource;
import org.apache.druid.java.util.common.logger.Logger;
@@ -101,7 +103,7 @@ public class CliMiddleManager extends ServerRunnable
binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
- binder.bind(IndexingServiceClient.class).toProvider(Providers.of(null));
+ binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>() {})
.toProvider(Providers.of(null));
binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
@@ -129,6 +131,7 @@ public class CliMiddleManager extends ServerRunnable
.in(LazySingleton.class);
Jerseys.addResource(binder, WorkerResource.class);
Jerseys.addResource(binder, TaskManagementResource.class);
+ Jerseys.addResource(binder, ShuffleResource.class);
LifecycleModule.register(binder, Server.class);
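The two changes above are the crux of the MiddleManager wiring: IndexingServiceClient used to be bound to a null provider, but the worker now needs a working Overlord client so the shuffle machinery can poll task statuses, and the new Jersey resource exposes the shuffle endpoint. Condensed from the diff:

// Inside CliMiddleManager's Guice module:
binder.bind(IndexingServiceClient.class)
      .to(HttpIndexingServiceClient.class) // real HTTP client, no longer Providers.of(null)
      .in(LazySingleton.class);
Jerseys.addResource(binder, ShuffleResource.class); // serves shuffle data over HTTP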
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org