Posted to commits@druid.apache.org by fj...@apache.org on 2019/03/20 01:07:57 UTC
[incubator-druid] branch 0.14.0-incubating updated: Fix race in historical when loading segments in parallel (#7203) (#7298)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
new e926121 Fix race in historical when loading segments in parallel (#7203) (#7298)
e926121 is described below
commit e9261215764307b9c18a1f3c61f40fa6056e4431
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Mar 19 18:07:49 2019 -0700
Fix race in historical when loading segments in parallel (#7203) (#7298)
* Fix race in historical when loading segments in parallel
* revert unnecessary change
* remove synchronized
* add reference counting locking
* fix build
* fix comment
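
The core of the fix is a per-segment lock whose lifetime is managed by
reference counting inside ConcurrentHashMap.compute(). Below is a minimal
standalone sketch of that pattern; the names (PerKeyLocks, CountingLock,
acquire, release) are illustrative, while the actual implementation is
createOrGetLock()/unlock() in SegmentLoaderLocalCacheManager in the diff
that follows.

    import java.util.concurrent.ConcurrentHashMap;

    public class PerKeyLocks<K>
    {
      static final class CountingLock
      {
        int refs; // mutated only inside compute(), which runs atomically per key
      }

      private final ConcurrentHashMap<K, CountingLock> locks = new ConcurrentHashMap<>();

      public CountingLock acquire(K key)
      {
        return locks.compute(key, (k, lock) -> {
          final CountingLock nonNull = lock == null ? new CountingLock() : lock;
          nonNull.refs++;
          return nonNull;
        });
      }

      public void release(K key, CountingLock lock)
      {
        locks.compute(key, (k, existing) -> {
          if (existing != lock) {
            throw new IllegalStateException("unbalanced release for key: " + k);
          }
          // Returning null removes the map entry, so a lock exists only while
          // some thread is actively loading or dropping that segment.
          return --existing.refs == 0 ? null : existing;
        });
      }
    }

Callers synchronize on the returned lock, matching the usage documented in the
Javadoc of segmentLocks in the diff:

    final PerKeyLocks.CountingLock lock = locks.acquire(segment);
    synchronized (lock) {
      try {
        // get or delete the segment files
      }
      finally {
        locks.release(segment, lock);
      }
    }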
---
.../druid/segment/ReferenceCountingSegment.java | 5 +-
.../druid/segment/loading/SegmentLoader.java | 2 +
.../loading/SegmentLoaderLocalCacheManager.java | 188 +++++++++++----
.../org/apache/druid/server/SegmentManager.java | 6 +-
.../coordination/BatchDataSegmentAnnouncer.java | 9 +-
.../coordination/SegmentLoadDropHandler.java | 26 +-
.../server/coordinator/HttpLoadQueuePeon.java | 10 +-
.../segment/loading/CacheTestSegmentLoader.java | 1 -
.../apache/druid/server/SegmentManagerTest.java | 24 +-
.../server/SegmentManagerThreadSafetyTest.java | 268 +++++++++++++++++++++
.../coordination/SegmentLoadDropHandlerTest.java | 9 -
.../BatchDataSegmentAnnouncerTest.java | 110 ++++++++-
.../server/coordinator/HttpLoadQueuePeonTest.java | 26 --
13 files changed, 561 insertions(+), 123 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
index 4f27a4f..ef98b75 100644
--- a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
+++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
@@ -19,7 +19,6 @@
package org.apache.druid.segment;
-import com.google.common.base.Preconditions;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
@@ -45,8 +44,10 @@ public class ReferenceCountingSegment extends AbstractSegment
@Override
protected boolean onAdvance(int phase, int registeredParties)
{
- Preconditions.checkState(registeredParties == 0);
// Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen
+ if (registeredParties != 0) {
+ log.error("registeredParties[%s] is not 0", registeredParties);
+ }
try {
baseSegment.close();
}
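
The removed Preconditions.checkState() matters because onAdvance() here
belongs to a java.util.concurrent.Phaser (ReferenceCountingSegment uses one to
track active references), and a Phaser only terminates when onAdvance()
returns true. A runnable sketch with a plain Phaser, not the Druid class:

    import java.util.concurrent.Phaser;

    public class PhaserTerminationSketch
    {
      public static void main(String[] args)
      {
        final Phaser phaser = new Phaser(1)
        {
          @Override
          protected boolean onAdvance(int phase, int registeredParties)
          {
            // An exception thrown here would propagate before this return
            // executes, so the phaser would never terminate. That is why the
            // commit logs the violation instead of calling checkState().
            return true; // returning true terminates the phaser
          }
        };
        phaser.arriveAndDeregister(); // the last party leaving triggers onAdvance()
        System.out.println(phaser.isTerminated()); // prints: true
      }
    }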
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
index 3db4672..301e7370 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
@@ -25,6 +25,8 @@ import org.apache.druid.timeline.DataSegment;
import java.io.File;
/**
+ * Loading segments from deep storage to local storage.
+ * Implementations must be thread-safe.
*/
public interface SegmentLoader
{
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 68e8a20..fb7b143 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -20,10 +20,12 @@
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.Segment;
@@ -32,15 +34,17 @@ import org.apache.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
+ private static final Comparator<StorageLocation> COMPARATOR = (left, right) ->
+ Longs.compare(right.available(), left.available());
private final IndexIO indexIO;
private final SegmentLoaderConfig config;
@@ -48,15 +52,30 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
private final List<StorageLocation> locations;
- private final Object lock = new Object();
+ // This directoryWriteRemoveLock is used when creating or removing a directory
+ private final Object directoryWriteRemoveLock = new Object();
- private static final Comparator<StorageLocation> COMPARATOR = new Comparator<StorageLocation>()
- {
- @Override public int compare(StorageLocation left, StorageLocation right)
- {
- return Longs.compare(right.available(), left.available());
- }
- };
+ /**
+ * A map between segment and referenceCountingLocks.
+ *
+ * These locks should be acquired whenever getting or deleting files for a segment.
+ * If different threads try to get or delete files simultaneously, one of them creates a lock first using
+ * {@link #createOrGetLock}. And then, all threads compete with each other to get the lock.
+ * Finally, the lock should be released using {@link #unlock}.
+ *
+ * An example usage is:
+ *
+ * final ReferenceCountingLock lock = createOrGetLock(segment);
+ * synchronized (lock) {
+ * try {
+ * doSomething();
+ * }
+ * finally {
+ * unlock(lock);
+ * }
+ * }
+ */
+ private final ConcurrentHashMap<DataSegment, ReferenceCountingLock> segmentLocks = new ConcurrentHashMap<>();
@Inject
public SegmentLoaderLocalCacheManager(
@@ -71,12 +90,15 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
this.locations = new ArrayList<>();
for (StorageLocationConfig locationConfig : config.getLocations()) {
- locations.add(new StorageLocation(
- locationConfig.getPath(),
- locationConfig.getMaxSize(),
- locationConfig.getFreeSpacePercent()
- ));
+ locations.add(
+ new StorageLocation(
+ locationConfig.getPath(),
+ locationConfig.getMaxSize(),
+ locationConfig.getFreeSpacePercent()
+ )
+ );
}
+ locations.sort(COMPARATOR);
}
public SegmentLoaderLocalCacheManager withConfig(SegmentLoaderConfig config)
@@ -92,7 +114,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
- for (StorageLocation location : getSortedList(locations)) {
+ for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
if (localStorageDir.exists()) {
return location;
@@ -104,7 +126,16 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
@Override
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
- File segmentFiles = getSegmentFiles(segment);
+ final ReferenceCountingLock lock = createOrGetLock(segment);
+ final File segmentFiles;
+ synchronized (lock) {
+ try {
+ segmentFiles = getSegmentFiles(segment);
+ }
+ finally {
+ unlock(segment, lock);
+ }
+ }
File factoryJson = new File(segmentFiles, "factory.json");
final SegmentizerFactory factory;
@@ -125,14 +156,22 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
@Override
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
- StorageLocation loc = findStorageLocationIfLoaded(segment);
- String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
+ final ReferenceCountingLock lock = createOrGetLock(segment);
+ synchronized (lock) {
+ try {
+ StorageLocation loc = findStorageLocationIfLoaded(segment);
+ String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
- if (loc == null) {
- loc = loadSegmentWithRetry(segment, storageDir);
+ if (loc == null) {
+ loc = loadSegmentWithRetry(segment, storageDir);
+ }
+ loc.addSegment(segment);
+ return new File(loc.getPath(), storageDir);
+ }
+ finally {
+ unlock(segment, lock);
+ }
}
- loc.addSegment(segment);
- return new File(loc.getPath(), storageDir);
}
/**
@@ -142,7 +181,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
*/
private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
{
- for (StorageLocation loc : getSortedList(locations)) {
+ for (StorageLocation loc : locations) {
if (loc.canHandle(segment)) {
File storageDir = new File(loc.getPath(), storageDirStr);
@@ -171,7 +210,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
// the parent directories of the segment are removed
final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
- synchronized (lock) {
+ synchronized (directoryWriteRemoveLock) {
if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir);
}
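
The downloadStartMarker protocol referenced in the hunk above, as a generic
sketch (method and parameter names are illustrative; the real logic lives in
SegmentLoaderLocalCacheManager): the marker is created before the pull begins
and deleted only after it completes, so a directory that still contains the
marker is a partial download and is safe to clean up.

    import java.io.File;
    import java.io.IOException;

    public class DownloadMarkerSketch
    {
      public static void download(File storageDir, Runnable pullFiles) throws IOException
      {
        final File marker = new File(storageDir, "downloadStartMarker");
        if (!storageDir.mkdirs() && !storageDir.isDirectory()) {
          throw new IOException("Failed to create " + storageDir);
        }
        if (!marker.createNewFile()) {
          throw new IOException("Failed to create marker " + marker);
        }
        pullFiles.run(); // copy the segment files from deep storage
        if (!marker.delete()) {
          throw new IOException("Failed to remove marker " + marker);
        }
      }
    }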
@@ -214,23 +253,31 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
return;
}
- StorageLocation loc = findStorageLocationIfLoaded(segment);
+ final ReferenceCountingLock lock = createOrGetLock(segment);
+ synchronized (lock) {
+ try {
+ StorageLocation loc = findStorageLocationIfLoaded(segment);
- if (loc == null) {
- log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
- return;
- }
+ if (loc == null) {
+ log.warn("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
+ return;
+ }
- // If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed,
- // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
- // So we should always clean all possible locations here
- for (StorageLocation location : getSortedList(locations)) {
- File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
- if (localStorageDir.exists()) {
- // Druid creates folders of the form dataSource/interval/version/partitionNum.
- // We need to clean up all these directories if they are all empty.
- cleanupCacheFiles(location.getPath(), localStorageDir);
- location.removeSegment(segment);
+ // If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed,
+ // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
+ // So we should always clean all possible locations here
+ for (StorageLocation location : locations) {
+ File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
+ if (localStorageDir.exists()) {
+ // Druid creates folders of the form dataSource/interval/version/partitionNum.
+ // We need to clean up all these directories if they are all empty.
+ cleanupCacheFiles(location.getPath(), localStorageDir);
+ location.removeSegment(segment);
+ }
+ }
+ }
+ finally {
+ unlock(segment, lock);
}
}
}
@@ -241,13 +288,13 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
return;
}
- synchronized (lock) {
+ synchronized (directoryWriteRemoveLock) {
log.info("Deleting directory[%s]", cacheFile);
try {
FileUtils.deleteDirectory(cacheFile);
}
catch (Exception e) {
- log.error("Unable to remove file[%s]", cacheFile);
+ log.error(e, "Unable to remove directory[%s]", cacheFile);
}
}
@@ -260,11 +307,62 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
}
}
- private List<StorageLocation> getSortedList(List<StorageLocation> locs)
+ private ReferenceCountingLock createOrGetLock(DataSegment dataSegment)
+ {
+ return segmentLocks.compute(
+ dataSegment,
+ (segment, lock) -> {
+ final ReferenceCountingLock nonNullLock;
+ if (lock == null) {
+ nonNullLock = new ReferenceCountingLock();
+ } else {
+ nonNullLock = lock;
+ }
+ nonNullLock.increment();
+ return nonNullLock;
+ }
+ );
+ }
+
+ private void unlock(DataSegment dataSegment, ReferenceCountingLock lock)
+ {
+ segmentLocks.compute(
+ dataSegment,
+ (segment, existingLock) -> {
+ //noinspection ObjectEquality
+ if (existingLock == null || existingLock != lock) {
+ throw new ISE("WTH? Different createOrGetLock instance");
+ } else {
+ if (existingLock.numReferences == 1) {
+ return null;
+ } else {
+ existingLock.decrement();
+ return existingLock;
+ }
+ }
+ }
+ );
+ }
+
+ @VisibleForTesting
+ private static class ReferenceCountingLock
{
- List<StorageLocation> locations = new ArrayList<>(locs);
- Collections.sort(locations, COMPARATOR);
+ private int numReferences;
+
+ private void increment()
+ {
+ ++numReferences;
+ }
- return locations;
+ private void decrement()
+ {
+ --numReferences;
+ }
+ }
+
+ @VisibleForTesting
+ public ConcurrentHashMap<DataSegment, ReferenceCountingLock> getSegmentLocks()
+ {
+ return segmentLocks;
}
}
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index a1975ff..ecbb0d4 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -171,7 +171,7 @@ public class SegmentManager
);
if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
- log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getId());
+ log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId());
resultSupplier.set(false);
} else {
loadedIntervals.add(
@@ -223,6 +223,8 @@ public class SegmentManager
final PartitionChunk<ReferenceCountingSegment> removed = loadedIntervals.remove(
segment.getInterval(),
segment.getVersion(),
+ // remove() internally searches for a partitionChunk to remove which is *equal* to the given
+ // partitionChunk. Note that partitionChunk.equals() checks only the partitionNum, but not the object.
segment.getShardSpec().createChunk(null)
);
final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
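
The new comment explains why createChunk(null) works as a removal key: per the
comment, partition-chunk equality compares only the partition number, not the
held object. An illustrative snippet, assuming the NumberedShardSpec
constructor used in the tests later in this diff:

    import org.apache.druid.timeline.partition.NumberedShardSpec;
    import org.apache.druid.timeline.partition.PartitionChunk;

    public class ChunkEqualitySketch
    {
      public static void main(String[] args)
      {
        final NumberedShardSpec spec = new NumberedShardSpec(0, 1);
        final PartitionChunk<String> loaded = spec.createChunk("the queryable segment");
        final PartitionChunk<String> lookupKey = spec.createChunk(null);
        // Same partition number, different objects: still equal, so the
        // null-object chunk can stand in for the loaded one during remove().
        System.out.println(loaded.equals(lookupKey)); // prints: true
      }
    }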
@@ -234,7 +236,7 @@ public class SegmentManager
oldQueryable.close();
} else {
log.info(
- "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
+ "Told to delete a queryable on dataSource[%s] for interval[%s] and version[%s] that I don't have.",
dataSourceName,
segment.getInterval(),
segment.getVersion()
diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
index d45c51c..205bbac 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
@@ -119,9 +119,14 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
return;
}
- DataSegment toAnnounce = segmentTransformer.apply(segment);
-
synchronized (lock) {
+ if (segmentLookup.containsKey(segment)) {
+ log.info("Skipping announcement of segment [%s]. Announcement exists already.", segment.getId());
+ return;
+ }
+
+ DataSegment toAnnounce = segmentTransformer.apply(segment);
+
changes.addChangeRequest(new SegmentChangeRequestLoad(toAnnounce));
if (config.isSkipSegmentAnnouncementOnZk()) {
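
The reordering above is what makes announceSegment() idempotent under
concurrency: the duplicate check and the announcement now happen under the
same lock, closing a check-then-act race. In outline (types simplified; a Set
of ids stands in for the real segmentLookup map):

    import java.util.HashSet;
    import java.util.Set;

    public class IdempotentAnnouncerSketch
    {
      private final Object lock = new Object();
      private final Set<String> announced = new HashSet<>();

      public void announceSegment(String segmentId)
      {
        synchronized (lock) {
          if (!announced.contains(segmentId)) {
            announced.add(segmentId);
            System.out.println("announcing " + segmentId); // publish the change request
          }
          // A duplicate call is a no-op, matching the new containsKey() check.
        }
      }
    }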
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 53b12c9..36fbe56 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -106,11 +106,16 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
SegmentManager segmentManager
)
{
- this(jsonMapper, config, announcer, serverAnnouncer, segmentManager,
- Executors.newScheduledThreadPool(
- config.getNumLoadingThreads(),
- Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
- )
+ this(
+ jsonMapper,
+ config,
+ announcer,
+ serverAnnouncer,
+ segmentManager,
+ Executors.newScheduledThreadPool(
+ config.getNumLoadingThreads(),
+ Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
+ )
);
}
@@ -250,7 +255,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
* Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will
* throw a SegmentLoadingException
*
- * @throws SegmentLoadingException
+ * @throws SegmentLoadingException if it fails to load the given segment
*/
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
{
@@ -305,6 +310,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
}
loadSegment(segment, DataSegmentChangeCallback.NOOP);
+ // announce segment even if the segment file already exists.
try {
announcer.announceSegment(segment);
}
@@ -727,17 +733,11 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
(request, statusRef) -> result.add(new DataSegmentChangeRequestAndStatus(request, statusRef.get()))
);
- super.set(result);
+ set(result);
}
}
@Override
- public boolean setException(Throwable throwable)
- {
- return super.setException(throwable);
- }
-
- @Override
public boolean cancel(boolean interruptIfRunning)
{
synchronized (waitingFutures) {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
index e3ab604..0c0ee56 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
@@ -147,9 +147,9 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
return;
}
- int batchSize = config.getHttpLoadQueuePeonBatchSize();
+ final int batchSize = config.getHttpLoadQueuePeonBatchSize();
- List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize);
+ final List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize);
synchronized (lock) {
Iterator<Map.Entry<DataSegment, SegmentHolder>> iter = Iterators.concat(
@@ -157,8 +157,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
segmentsToLoad.entrySet().iterator()
);
- while (batchSize > 0 && iter.hasNext()) {
- batchSize--;
+ while (newRequests.size() < batchSize && iter.hasNext()) {
Map.Entry<DataSegment, SegmentHolder> entry = iter.next();
if (entry.getValue().hasTimedOut()) {
entry.getValue().requestFailed("timed out");
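
The loop change above is subtle: the old code decremented batchSize for every
entry it inspected, so timed-out holders consumed batch slots; bounding on
newRequests.size() keeps scanning until a full batch of real requests is
collected. A self-contained sketch of the fixed behavior, with negative
numbers standing in for timed-out entries:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class BatchLoopSketch
    {
      public static void main(String[] args)
      {
        final List<Integer> entries = Arrays.asList(1, -1, 2, -2, 3);
        System.out.println(collect(entries.iterator(), 3)); // prints: [1, 2, 3]
      }

      static List<Integer> collect(Iterator<Integer> iter, int batchSize)
      {
        final List<Integer> out = new ArrayList<>(batchSize);
        while (out.size() < batchSize && iter.hasNext()) {
          final int e = iter.next();
          if (e < 0) {
            continue; // timed out: fail the request, but don't consume a batch slot
          }
          out.add(e);
        }
        return out;
      }
    }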
@@ -304,8 +303,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
return;
}
- if (status.getState()
- == SegmentLoadDropHandler.Status.STATE.FAILED) {
+ if (status.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) {
holder.requestFailed(status.getFailureCause());
} else {
holder.requestSucceeded();
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
index 5723359..0b291d4 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
@@ -37,7 +37,6 @@ import java.util.Set;
*/
public class CacheTestSegmentLoader implements SegmentLoader
{
-
private final Set<DataSegment> segmentsInTrash = new HashSet<>();
@Override
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 826f249..14cbdbd 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -309,7 +309,8 @@ public class SegmentManagerTest
public void testLoadDuplicatedSegmentsInParallel()
throws ExecutionException, InterruptedException, SegmentLoadingException
{
- final List<Future<Boolean>> futures = ImmutableList.of(segments.get(0), segments.get(0), segments.get(0)).stream()
+ final List<Future<Boolean>> futures = ImmutableList.of(segments.get(0), segments.get(0), segments.get(0))
+ .stream()
.map(
segment -> executor.submit(
() -> segmentManager.loadSegment(segment)
@@ -347,16 +348,17 @@ public class SegmentManagerTest
throws SegmentLoadingException, ExecutionException, InterruptedException
{
segmentManager.loadSegment(segments.get(0));
- final List<Future<Void>> futures = ImmutableList.of(segments.get(1), segments.get(2)).stream()
- .map(
- segment -> executor.submit(
- () -> {
- segmentManager.dropSegment(segment);
- return (Void) null;
- }
- )
- )
- .collect(Collectors.toList());
+ final List<Future<Void>> futures = ImmutableList.of(segments.get(1), segments.get(2))
+ .stream()
+ .map(
+ segment -> executor.submit(
+ () -> {
+ segmentManager.dropSegment(segment);
+ return (Void) null;
+ }
+ )
+ )
+ .collect(Collectors.toList());
for (Future<Void> future : futures) {
future.get();
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
new file mode 100644
index 0000000..ab78689
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import com.fasterxml.jackson.databind.InjectableValues.Std;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.FileUtils.FileCopyResult;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPuller;
+import org.apache.druid.segment.loading.LocalLoadSpec;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentizerFactory;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class SegmentManagerThreadSafetyTest
+{
+ private static final int NUM_THREAD = 4;
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private TestSegmentPuller segmentPuller;
+ private ObjectMapper objectMapper;
+ private IndexIO indexIO;
+ private File segmentCacheDir;
+ private File segmentDeepStorageDir;
+ private SegmentLoaderLocalCacheManager segmentLoader;
+ private SegmentManager segmentManager;
+ private ExecutorService exec;
+
+ @Before
+ public void setup() throws IOException
+ {
+ segmentPuller = new TestSegmentPuller();
+ objectMapper = new DefaultObjectMapper()
+ .registerModule(
+ new SimpleModule().registerSubtypes(new NamedType(LocalLoadSpec.class, "local"), new NamedType(TestSegmentizerFactory.class, "test"))
+ )
+ .setInjectableValues(new Std().addValue(LocalDataSegmentPuller.class, segmentPuller));
+ indexIO = new IndexIO(objectMapper, () -> 0);
+ segmentCacheDir = temporaryFolder.newFolder();
+ segmentDeepStorageDir = temporaryFolder.newFolder();
+ segmentLoader = new SegmentLoaderLocalCacheManager(
+ indexIO,
+ new SegmentLoaderConfig()
+ {
+ @Override
+ public List<StorageLocationConfig> getLocations()
+ {
+ return Collections.singletonList(
+ new StorageLocationConfig().setPath(segmentCacheDir)
+ );
+ }
+ },
+ objectMapper
+ );
+ segmentManager = new SegmentManager(segmentLoader);
+ exec = Execs.multiThreaded(NUM_THREAD, "SegmentManagerThreadSafetyTest-%d");
+ EmittingLogger.registerEmitter(new NoopServiceEmitter());
+ }
+
+ @After
+ public void teardown() throws IOException
+ {
+ exec.shutdownNow();
+ FileUtils.deleteDirectory(segmentCacheDir);
+ }
+
+ @Test(timeout = 5000L)
+ public void testLoadSameSegment() throws IOException, ExecutionException, InterruptedException
+ {
+ final DataSegment segment = createSegment("2019-01-01/2019-01-02");
+ final List<Future> futures = IntStream
+ .range(0, 16)
+ .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment)))
+ .collect(Collectors.toList());
+ for (Future future : futures) {
+ future.get();
+ }
+ Assert.assertEquals(1, segmentPuller.numFileLoaded.size());
+ Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue());
+ Assert.assertEquals(0, segmentLoader.getSegmentLocks().size());
+ }
+
+ @Test(timeout = 5000L)
+ public void testLoadMultipleSegments() throws IOException, ExecutionException, InterruptedException
+ {
+ final List<DataSegment> segments = new ArrayList<>(88);
+ for (int i = 0; i < 11; i++) {
+ for (int j = 0; j < 8; j++) {
+ segments.add(createSegment(StringUtils.format("2019-%02d-01/2019-%02d-01", i + 1, i + 2)));
+ }
+ }
+
+ final List<Future> futures = IntStream
+ .range(0, 16)
+ .mapToObj(i -> exec.submit(() -> {
+ for (DataSegment segment : segments) {
+ try {
+ segmentManager.loadSegment(segment);
+ }
+ catch (SegmentLoadingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }))
+ .collect(Collectors.toList());
+ for (Future future : futures) {
+ future.get();
+ }
+ Assert.assertEquals(11, segmentPuller.numFileLoaded.size());
+ Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue());
+ Assert.assertEquals(0, segmentLoader.getSegmentLocks().size());
+ }
+
+ private DataSegment createSegment(String interval) throws IOException
+ {
+ final DataSegment tmpSegment = new DataSegment(
+ "dataSource",
+ Intervals.of(interval),
+ "version",
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new NumberedShardSpec(0, 0),
+ 9,
+ 100
+ );
+ final String storageDir = DataSegmentPusher.getDefaultStorageDir(tmpSegment, false);
+ final File segmentDir = new File(segmentDeepStorageDir, storageDir);
+ FileUtils.forceMkdir(segmentDir);
+
+ final File factoryJson = new File(segmentDir, "factory.json");
+ objectMapper.writeValue(factoryJson, new TestSegmentizerFactory());
+ return tmpSegment.withLoadSpec(
+ ImmutableMap.of("type", "local", "path", segmentDir.getAbsolutePath())
+ );
+ }
+
+ private static class TestSegmentPuller extends LocalDataSegmentPuller
+ {
+ private final Map<File, Integer> numFileLoaded = new HashMap<>();
+
+ @Override
+ public FileCopyResult getSegmentFiles(final File sourceFile, final File dir)
+ {
+ numFileLoaded.compute(sourceFile, (f, numLoaded) -> numLoaded == null ? 1 : numLoaded + 1);
+ try {
+ FileUtils.copyDirectory(sourceFile, dir);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return new FileCopyResult()
+ {
+ @Override
+ public long size()
+ {
+ return 100L;
+ }
+ };
+ }
+ }
+
+ private static class TestSegmentizerFactory implements SegmentizerFactory
+ {
+ @Override
+ public Segment factorize(DataSegment segment, File parentDir)
+ {
+ return new Segment()
+ {
+ @Override
+ public SegmentId getId()
+ {
+ return segment.getId();
+ }
+
+ @Override
+ public Interval getDataInterval()
+ {
+ return segment.getInterval();
+ }
+
+ @Nullable
+ @Override
+ public QueryableIndex asQueryableIndex()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StorageAdapter asStorageAdapter()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T as(Class<T> clazz)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ };
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 6f227e4..f5455fb 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -65,15 +65,6 @@ public class SegmentLoadDropHandlerTest
private static final Logger log = new Logger(ZkCoordinatorTest.class);
private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
- private final DruidServerMetadata me = new DruidServerMetadata(
- "dummyServer",
- "dummyHost",
- null,
- 0,
- ServerType.HISTORICAL,
- "normal",
- 0
- );
private SegmentLoadDropHandler segmentLoadDropHandler;
private DataSegmentAnnouncer announcer;
diff --git a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
index 52006f5..8debba1 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
@@ -51,10 +51,17 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -65,11 +72,12 @@ public class BatchDataSegmentAnnouncerTest
private static final String testBasePath = "/test";
private static final String testSegmentsPath = "/test/segments/id";
private static final Joiner joiner = Joiner.on("/");
+ private static final int NUM_THREADS = 4;
private TestingCluster testingCluster;
private CuratorFramework cf;
private ObjectMapper jsonMapper;
- private Announcer announcer;
+ private TestAnnouncer announcer;
private SegmentReader segmentReader;
private BatchDataSegmentAnnouncer segmentAnnouncer;
private Set<DataSegment> testSegments;
@@ -78,6 +86,7 @@ public class BatchDataSegmentAnnouncerTest
private Boolean skipDimensionsAndMetrics;
private Boolean skipLoadSpec;
+ private ExecutorService exec;
@Before
public void setUp() throws Exception
@@ -96,7 +105,7 @@ public class BatchDataSegmentAnnouncerTest
jsonMapper = TestHelper.makeJsonMapper();
- announcer = new Announcer(
+ announcer = new TestAnnouncer(
cf,
Execs.directExecutor()
);
@@ -157,6 +166,8 @@ public class BatchDataSegmentAnnouncerTest
for (int i = 0; i < 100; i++) {
testSegments.add(makeSegment(i));
}
+
+ exec = Execs.multiThreaded(NUM_THREADS, "BatchDataSegmentAnnouncerTest-%d");
}
@After
@@ -165,6 +176,7 @@ public class BatchDataSegmentAnnouncerTest
announcer.stop();
cf.close();
testingCluster.stop();
+ exec.shutdownNow();
}
@Test
@@ -299,6 +311,14 @@ public class BatchDataSegmentAnnouncerTest
testBatchAnnounce(true);
}
+ @Test
+ public void testMultipleBatchAnnounce() throws Exception
+ {
+ for (int i = 0; i < 10; i++) {
+ testBatchAnnounce(false);
+ }
+ }
+
private void testBatchAnnounce(boolean testHistory) throws Exception
{
segmentAnnouncer.announceSegments(testSegments);
@@ -342,11 +362,72 @@ public class BatchDataSegmentAnnouncerTest
}
}
- @Test
- public void testMultipleBatchAnnounce() throws Exception
+ @Test(timeout = 5000L)
+ public void testAnnounceSegmentsWithSameSegmentConcurrently() throws ExecutionException, InterruptedException
{
- for (int i = 0; i < 10; i++) {
- testBatchAnnounce(false);
+ final List<Future> futures = new ArrayList<>(NUM_THREADS);
+
+ for (int i = 0; i < NUM_THREADS; i++) {
+ futures.add(
+ exec.submit(() -> {
+ try {
+ segmentAnnouncer.announceSegments(testSegments);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ );
+ }
+
+ for (Future future : futures) {
+ future.get();
+ }
+
+ // Announcing 100 segments requires 2 nodes because of maxBytesPerNode configuration.
+ Assert.assertEquals(2, announcer.numPathAnnounced.size());
+ for (ConcurrentHashMap<byte[], AtomicInteger> eachMap : announcer.numPathAnnounced.values()) {
+ for (Entry<byte[], AtomicInteger> entry : eachMap.entrySet()) {
+ Assert.assertEquals(1, entry.getValue().get());
+ }
+ }
+ }
+
+ @Test(timeout = 5000L)
+ public void testAnnounceSegmentWithSameSegmentConcurrently() throws ExecutionException, InterruptedException
+ {
+ final List<Future> futures = new ArrayList<>(NUM_THREADS);
+
+ final DataSegment segment1 = makeSegment(0);
+ final DataSegment segment2 = makeSegment(1);
+ final DataSegment segment3 = makeSegment(2);
+ final DataSegment segment4 = makeSegment(3);
+
+ for (int i = 0; i < NUM_THREADS; i++) {
+ futures.add(
+ exec.submit(() -> {
+ try {
+ segmentAnnouncer.announceSegment(segment1);
+ segmentAnnouncer.announceSegment(segment2);
+ segmentAnnouncer.announceSegment(segment3);
+ segmentAnnouncer.announceSegment(segment4);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ );
+ }
+
+ for (Future future : futures) {
+ future.get();
+ }
+
+ Assert.assertEquals(1, announcer.numPathAnnounced.size());
+ for (ConcurrentHashMap<byte[], AtomicInteger> eachMap : announcer.numPathAnnounced.values()) {
+ for (Entry<byte[], AtomicInteger> entry : eachMap.entrySet()) {
+ Assert.assertEquals(1, entry.getValue().get());
+ }
}
}
@@ -396,4 +477,21 @@ public class BatchDataSegmentAnnouncerTest
return new HashSet<>();
}
}
+
+ private static class TestAnnouncer extends Announcer
+ {
+ private final ConcurrentHashMap<String, ConcurrentHashMap<byte[], AtomicInteger>> numPathAnnounced = new ConcurrentHashMap<>();
+
+ private TestAnnouncer(CuratorFramework curator, ExecutorService exec)
+ {
+ super(curator, exec);
+ }
+
+ @Override
+ public void announce(String path, byte[] bytes, boolean removeParentIfCreated)
+ {
+ numPathAnnounced.computeIfAbsent(path, k -> new ConcurrentHashMap<>()).computeIfAbsent(bytes, k -> new AtomicInteger(0)).incrementAndGet();
+ super.announce(path, bytes, removeParentIfCreated);
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index 1b6ff3d..0f73695 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -20,12 +20,9 @@
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.druid.discovery.DiscoveryDruidNode;
-import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -47,12 +44,10 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
/**
*/
@@ -172,29 +167,8 @@ public class HttpLoadQueuePeonTest
}
- private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
- {
- Listener listener;
-
- @Override
- public Collection<DiscoveryDruidNode> getAllNodes()
- {
- throw new UnsupportedOperationException("Not Implemented.");
- }
-
- @Override
- public void registerListener(Listener listener)
- {
- listener.nodesAdded(ImmutableList.of());
- listener.nodeViewInitialized();
- this.listener = listener;
- }
- }
-
private static class TestHttpClient implements HttpClient
{
- AtomicInteger requestNum = new AtomicInteger(0);
-
@Override
public <Intermediate, Final> ListenableFuture<Final> go(
Request request,