Posted to commits@druid.apache.org by fj...@apache.org on 2019/03/20 01:07:57 UTC

[incubator-druid] branch 0.14.0-incubating updated: Fix race in historical when loading segments in parallel (#7203) (#7298)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new e926121  Fix race in historical when loading segments in parallel (#7203) (#7298)
e926121 is described below

commit e9261215764307b9c18a1f3c61f40fa6056e4431
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Mar 19 18:07:49 2019 -0700

    Fix race in historical when loading segments in parallel (#7203) (#7298)
    
    * Fix race in historical when loading segments in parallel
    
    * revert unnecessary change
    
    * remove synchronized
    
    * add reference counting locking
    
    * fix build
    
    * fix comment
---
 .../druid/segment/ReferenceCountingSegment.java    |   5 +-
 .../druid/segment/loading/SegmentLoader.java       |   2 +
 .../loading/SegmentLoaderLocalCacheManager.java    | 188 +++++++++++----
 .../org/apache/druid/server/SegmentManager.java    |   6 +-
 .../coordination/BatchDataSegmentAnnouncer.java    |   9 +-
 .../coordination/SegmentLoadDropHandler.java       |  26 +-
 .../server/coordinator/HttpLoadQueuePeon.java      |  10 +-
 .../segment/loading/CacheTestSegmentLoader.java    |   1 -
 .../apache/druid/server/SegmentManagerTest.java    |  24 +-
 .../server/SegmentManagerThreadSafetyTest.java     | 268 +++++++++++++++++++++
 .../coordination/SegmentLoadDropHandlerTest.java   |   9 -
 .../BatchDataSegmentAnnouncerTest.java             | 110 ++++++++-
 .../server/coordinator/HttpLoadQueuePeonTest.java  |  26 --
 13 files changed, 561 insertions(+), 123 deletions(-)
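
For readers skimming the diff below: the race occurred because SegmentLoaderLocalCacheManager could
download or delete the same segment's files from several threads at once. The patch serializes that
work with one lock object per segment, created and reclaimed through reference counting so that the
lock map drains back to empty once no thread is using a segment. What follows is a minimal,
self-contained sketch of that pattern, not the patch itself; PerKeyLocks and its member names are
illustrative, though the compute()-based create/unlock logic mirrors the new code:

    import java.util.concurrent.ConcurrentHashMap;

    public class PerKeyLocks<K>
    {
      // One lock object per key; the entry is removed when the last holder releases it.
      private final ConcurrentHashMap<K, Lock> locks = new ConcurrentHashMap<>();

      static class Lock
      {
        int numReferences; // mutated only inside compute(), which is atomic per key
      }

      public Lock createOrGetLock(K key)
      {
        return locks.compute(key, (k, lock) -> {
          final Lock nonNullLock = lock == null ? new Lock() : lock;
          nonNullLock.numReferences++;
          return nonNullLock;
        });
      }

      public void unlock(K key, Lock lock)
      {
        locks.compute(key, (k, existing) -> {
          if (existing != lock) {
            throw new IllegalStateException("Different lock instance for key " + k);
          }
          // Returning null removes the map entry once the last holder is gone.
          return --existing.numReferences == 0 ? null : existing;
        });
      }

      public boolean isEmpty()
      {
        return locks.isEmpty();
      }
    }

Callers bracket their critical section exactly as the javadoc in the diff shows: synchronize on the
returned lock, do the work, and call unlock() in a finally block.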

diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
index 4f27a4f..ef98b75 100644
--- a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
+++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.segment;
 
-import com.google.common.base.Preconditions;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
@@ -45,8 +44,10 @@ public class ReferenceCountingSegment extends AbstractSegment
     @Override
     protected boolean onAdvance(int phase, int registeredParties)
     {
-      Preconditions.checkState(registeredParties == 0);
       // Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen
+      if (registeredParties != 0) {
+        log.error("registeredParties[%s] is not 0", registeredParties);
+      }
       try {
         baseSegment.close();
       }
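
The change above is driven by Phaser semantics: ReferenceCountingSegment closes its base segment
from onAdvance() when the last reference deregisters, and an exception thrown inside onAdvance()
would leave the phaser unterminated and the segment permanently open. Hence the patch logs the
unexpected state instead of letting Preconditions.checkState() throw. A hedged, standalone
illustration of the same close-on-last-release idiom (CloseOnLastRelease is an invented name; the
register / arriveAndDeregister usage follows the class being patched):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.Phaser;

    class CloseOnLastRelease implements Closeable
    {
      private final Closeable resource;
      private final Phaser phaser;

      CloseOnLastRelease(Closeable resource)
      {
        this.resource = resource;
        // One party is registered up front for the owner; retain() adds a party,
        // release() removes one. When the count reaches zero the phase advances,
        // onAdvance() runs once, and returning true terminates the phaser.
        this.phaser = new Phaser(1)
        {
          @Override
          protected boolean onAdvance(int phase, int registeredParties)
          {
            // Must not throw: an exception here prevents termination, so the
            // resource would never be closed. Log and continue in real code.
            try {
              CloseOnLastRelease.this.resource.close();
            }
            catch (IOException e) {
              // log in real code
            }
            return true;
          }
        };
      }

      void retain()
      {
        phaser.register();
      }

      void release()
      {
        phaser.arriveAndDeregister();
      }

      @Override
      public void close()
      {
        phaser.arriveAndDeregister(); // drop the owner's party
      }
    }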
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
index 3db4672..301e7370 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
@@ -25,6 +25,8 @@ import org.apache.druid.timeline.DataSegment;
 import java.io.File;
 
 /**
+ * Loads segments from deep storage into local storage.
+ * Implementations must be thread-safe.
  */
 public interface SegmentLoader
 {
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 68e8a20..fb7b143 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -20,10 +20,12 @@
 package org.apache.druid.segment.loading;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 import com.google.inject.Inject;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.Segment;
@@ -32,15 +34,17 @@ import org.apache.druid.timeline.DataSegment;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  */
 public class SegmentLoaderLocalCacheManager implements SegmentLoader
 {
   private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
+  private static final Comparator<StorageLocation> COMPARATOR = (left, right) ->
+      Longs.compare(right.available(), left.available());
 
   private final IndexIO indexIO;
   private final SegmentLoaderConfig config;
@@ -48,15 +52,30 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
 
   private final List<StorageLocation> locations;
 
-  private final Object lock = new Object();
+  // This directoryWriteRemoveLock is used when creating or removing a directory
+  private final Object directoryWriteRemoveLock = new Object();
 
-  private static final Comparator<StorageLocation> COMPARATOR = new Comparator<StorageLocation>()
-  {
-    @Override public int compare(StorageLocation left, StorageLocation right)
-    {
-      return Longs.compare(right.available(), left.available());
-    }
-  };
+  /**
+   * A map between segment and referenceCountingLocks.
+   *
+   * These locks should be acquired whenever getting or deleting files for a segment.
+   * If multiple threads try to get or delete files for the same segment simultaneously, the first one
+   * creates a lock via {@link #createOrGetLock}; the rest then synchronize on that same lock.
+   * Finally, each thread must release the lock via {@link #unlock}.
+   *
+   * An example usage is:
+   *
+   * final ReferenceCountingLock lock = createOrGetLock(segment);
+   * synchronized (lock) {
+   *   try {
+   *     doSomething();
+   *   }
+   *   finally {
+   *     unlock(segment, lock);
+   *   }
+   * }
+   */
+  private final ConcurrentHashMap<DataSegment, ReferenceCountingLock> segmentLocks = new ConcurrentHashMap<>();
 
   @Inject
   public SegmentLoaderLocalCacheManager(
@@ -71,12 +90,15 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
 
     this.locations = new ArrayList<>();
     for (StorageLocationConfig locationConfig : config.getLocations()) {
-      locations.add(new StorageLocation(
-          locationConfig.getPath(),
-          locationConfig.getMaxSize(),
-          locationConfig.getFreeSpacePercent()
-      ));
+      locations.add(
+          new StorageLocation(
+              locationConfig.getPath(),
+              locationConfig.getMaxSize(),
+              locationConfig.getFreeSpacePercent()
+          )
+      );
     }
+    locations.sort(COMPARATOR);
   }
 
   public SegmentLoaderLocalCacheManager withConfig(SegmentLoaderConfig config)
@@ -92,7 +114,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
 
   private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
   {
-    for (StorageLocation location : getSortedList(locations)) {
+    for (StorageLocation location : locations) {
       File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
       if (localStorageDir.exists()) {
         return location;
@@ -104,7 +126,16 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
   @Override
   public Segment getSegment(DataSegment segment) throws SegmentLoadingException
   {
-    File segmentFiles = getSegmentFiles(segment);
+    final ReferenceCountingLock lock = createOrGetLock(segment);
+    final File segmentFiles;
+    synchronized (lock) {
+      try {
+        segmentFiles = getSegmentFiles(segment);
+      }
+      finally {
+        unlock(segment, lock);
+      }
+    }
     File factoryJson = new File(segmentFiles, "factory.json");
     final SegmentizerFactory factory;
 
@@ -125,14 +156,22 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
   @Override
   public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
   {
-    StorageLocation loc = findStorageLocationIfLoaded(segment);
-    String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
+    final ReferenceCountingLock lock = createOrGetLock(segment);
+    synchronized (lock) {
+      try {
+        StorageLocation loc = findStorageLocationIfLoaded(segment);
+        String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
 
-    if (loc == null) {
-      loc = loadSegmentWithRetry(segment, storageDir);
+        if (loc == null) {
+          loc = loadSegmentWithRetry(segment, storageDir);
+        }
+        loc.addSegment(segment);
+        return new File(loc.getPath(), storageDir);
+      }
+      finally {
+        unlock(segment, lock);
+      }
     }
-    loc.addSegment(segment);
-    return new File(loc.getPath(), storageDir);
   }
 
   /**
@@ -142,7 +181,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
    */
   private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
   {
-    for (StorageLocation loc : getSortedList(locations)) {
+    for (StorageLocation loc : locations) {
       if (loc.canHandle(segment)) {
         File storageDir = new File(loc.getPath(), storageDirStr);
 
@@ -171,7 +210,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
     // We use a marker to prevent the case where a segment is downloaded, but before the download completes,
     // the parent directories of the segment are removed
     final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
-    synchronized (lock) {
+    synchronized (directoryWriteRemoveLock) {
       if (!storageDir.mkdirs()) {
         log.debug("Unable to make parent file[%s]", storageDir);
       }
@@ -214,23 +253,31 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
       return;
     }
 
-    StorageLocation loc = findStorageLocationIfLoaded(segment);
+    final ReferenceCountingLock lock = createOrGetLock(segment);
+    synchronized (lock) {
+      try {
+        StorageLocation loc = findStorageLocationIfLoaded(segment);
 
-    if (loc == null) {
-      log.info("Asked to cleanup something[%s] that didn't exist.  Skipping.", segment);
-      return;
-    }
+        if (loc == null) {
+          log.warn("Asked to cleanup something[%s] that didn't exist.  Skipping.", segment);
+          return;
+        }
 
-    // If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed,
-    // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
-    // So we should always clean all possible locations here
-    for (StorageLocation location : getSortedList(locations)) {
-      File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
-      if (localStorageDir.exists()) {
-        // Druid creates folders of the form dataSource/interval/version/partitionNum.
-        // We need to clean up all these directories if they are all empty.
-        cleanupCacheFiles(location.getPath(), localStorageDir);
-        location.removeSegment(segment);
+        // If storageDir.mkdirs() succeeded but downloadStartMarker.createNewFile() failed,
+        // findStorageLocationIfLoaded() will think the segment is located in that storageDir
+        // even though it actually is not. So we should always clean up all possible locations here.
+        for (StorageLocation location : locations) {
+          File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
+          if (localStorageDir.exists()) {
+            // Druid creates folders of the form dataSource/interval/version/partitionNum.
+            // We need to clean up all these directories if they are all empty.
+            cleanupCacheFiles(location.getPath(), localStorageDir);
+            location.removeSegment(segment);
+          }
+        }
+      }
+      finally {
+        unlock(segment, lock);
       }
     }
   }
@@ -241,13 +288,13 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
       return;
     }
 
-    synchronized (lock) {
+    synchronized (directoryWriteRemoveLock) {
       log.info("Deleting directory[%s]", cacheFile);
       try {
         FileUtils.deleteDirectory(cacheFile);
       }
       catch (Exception e) {
-        log.error("Unable to remove file[%s]", cacheFile);
+        log.error(e, "Unable to remove directory[%s]", cacheFile);
       }
     }
 
@@ -260,11 +307,62 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
     }
   }
 
-  private List<StorageLocation> getSortedList(List<StorageLocation> locs)
+  private ReferenceCountingLock createOrGetLock(DataSegment dataSegment)
+  {
+    return segmentLocks.compute(
+        dataSegment,
+        (segment, lock) -> {
+          final ReferenceCountingLock nonNullLock;
+          if (lock == null) {
+            nonNullLock = new ReferenceCountingLock();
+          } else {
+            nonNullLock = lock;
+          }
+          nonNullLock.increment();
+          return nonNullLock;
+        }
+    );
+  }
+
+  private void unlock(DataSegment dataSegment, ReferenceCountingLock lock)
+  {
+    segmentLocks.compute(
+        dataSegment,
+        (segment, existingLock) -> {
+          //noinspection ObjectEquality
+          if (existingLock == null || existingLock != lock) {
+            throw new ISE("WTH? Different createOrGetLock instance");
+          } else {
+            if (existingLock.numReferences == 1) {
+              return null;
+            } else {
+              existingLock.decrement();
+              return existingLock;
+            }
+          }
+        }
+    );
+  }
+
+  @VisibleForTesting
+  private static class ReferenceCountingLock
   {
-    List<StorageLocation> locations = new ArrayList<>(locs);
-    Collections.sort(locations, COMPARATOR);
+    private int numReferences;
+
+    private void increment()
+    {
+      ++numReferences;
+    }
 
-    return locations;
+    private void decrement()
+    {
+      --numReferences;
+    }
+  }
+
+  @VisibleForTesting
+  public ConcurrentHashMap<DataSegment, ReferenceCountingLock> getSegmentLocks()
+  {
+    return segmentLocks;
   }
 }
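
To see the reference counting drain in practice, here is a small driver for the PerKeyLocks sketch
given before the diff (same invented names, assuming both classes share a package; this is not
Druid code). It mirrors what SegmentManagerThreadSafetyTest asserts later in this patch: once every
thread has released, the lock map is empty again:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    class PerKeyLocksDemo
    {
      public static void main(String[] args) throws InterruptedException
      {
        final PerKeyLocks<String> locks = new PerKeyLocks<>();
        final ExecutorService exec = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 16; i++) {
          exec.submit(() -> {
            final PerKeyLocks.Lock lock = locks.createOrGetLock("segment-1");
            synchronized (lock) {
              try {
                // critical section: download or delete files for "segment-1"
              }
              finally {
                locks.unlock("segment-1", lock);
              }
            }
          });
        }
        exec.shutdown();
        exec.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("lock map drained: " + locks.isEmpty()); // expect: true
      }
    }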
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index a1975ff..ecbb0d4 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -171,7 +171,7 @@ public class SegmentManager
           );
 
           if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
-            log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getId());
+            log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId());
             resultSupplier.set(false);
           } else {
             loadedIntervals.add(
@@ -223,6 +223,8 @@ public class SegmentManager
             final PartitionChunk<ReferenceCountingSegment> removed = loadedIntervals.remove(
                 segment.getInterval(),
                 segment.getVersion(),
+                // remove() internally searches for a partitionChunk that is *equal* to the given one.
+                // Note that PartitionChunk.equals() compares only the partitionNum, not the held object.
                 segment.getShardSpec().createChunk(null)
             );
             final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
@@ -234,7 +236,7 @@ public class SegmentManager
               oldQueryable.close();
             } else {
               log.info(
-                  "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
+                  "Told to delete a queryable on dataSource[%s] for interval[%s] and version[%s] that I don't have.",
                   dataSourceName,
                   segment.getInterval(),
                   segment.getVersion()
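
The new comment above leans on a deliberate property of Druid's partition chunks: equality is
determined by the partition number, so a chunk created with a null payload can serve as a probe key
for removing the real chunk. A toy version of that equality contract (Chunk is an invented class,
not Druid's PartitionChunk):

    import java.util.HashSet;
    import java.util.Set;

    class Chunk<T>
    {
      final int partitionNum;
      final T payload; // may be null for probe chunks

      Chunk(int partitionNum, T payload)
      {
        this.partitionNum = partitionNum;
        this.payload = payload;
      }

      @Override
      public boolean equals(Object o)
      {
        // Equality by partition number only: a payload-free probe matches the real chunk.
        return o instanceof Chunk && ((Chunk<?>) o).partitionNum == partitionNum;
      }

      @Override
      public int hashCode()
      {
        return Integer.hashCode(partitionNum);
      }

      public static void main(String[] args)
      {
        final Set<Chunk<String>> chunks = new HashSet<>();
        chunks.add(new Chunk<>(3, "real payload"));
        System.out.println(chunks.remove(new Chunk<>(3, null))); // true
      }
    }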
diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
index d45c51c..205bbac 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java
@@ -119,9 +119,14 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
       return;
     }
 
-    DataSegment toAnnounce = segmentTransformer.apply(segment);
-
     synchronized (lock) {
+      if (segmentLookup.containsKey(segment)) {
+        log.info("Skipping announcement of segment [%s]. Announcement exists already.", segment.getId());
+        return;
+      }
+
+      DataSegment toAnnounce = segmentTransformer.apply(segment);
+
       changes.addChangeRequest(new SegmentChangeRequestLoad(toAnnounce));
 
       if (config.isSkipSegmentAnnouncementOnZk()) {
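
The announcer fix above is a textbook check-then-act repair: the duplicate lookup and the
announcement must share one critical section, otherwise two threads can both pass the check and
announce the same segment twice, which is what the new segmentLookup.containsKey() guard inside
synchronized (lock) prevents. A compact illustration of the hazard and the fix (names are
illustrative, not the announcer's actual fields):

    import java.util.HashMap;
    import java.util.Map;

    class DedupingPublisher
    {
      private final Object lock = new Object();
      private final Map<String, byte[]> announced = new HashMap<>();

      // Racy: two threads can both see the key absent, then both publish.
      void publishRacy(String id, byte[] payload)
      {
        if (announced.containsKey(id)) {
          return;
        }
        synchronized (lock) {
          announced.put(id, payload);
          // ... publish to ZooKeeper / HTTP here ...
        }
      }

      // Fixed: the check and the mutation run under the same lock.
      void publishSafe(String id, byte[] payload)
      {
        synchronized (lock) {
          if (announced.containsKey(id)) {
            return; // already announced; skip
          }
          announced.put(id, payload);
          // ... publish to ZooKeeper / HTTP here ...
        }
      }
    }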
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 53b12c9..36fbe56 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -106,11 +106,16 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
       SegmentManager segmentManager
   )
   {
-    this(jsonMapper, config, announcer, serverAnnouncer, segmentManager,
-         Executors.newScheduledThreadPool(
-             config.getNumLoadingThreads(),
-             Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
-         )
+    this(
+        jsonMapper,
+        config,
+        announcer,
+        serverAnnouncer,
+        segmentManager,
+        Executors.newScheduledThreadPool(
+            config.getNumLoadingThreads(),
+            Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
+        )
     );
   }
 
@@ -250,7 +255,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
    * Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will
    * throw a SegmentLoadingException
    *
-   * @throws SegmentLoadingException
+   * @throws SegmentLoadingException if it fails to load the given segment
    */
   private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
   {
@@ -305,6 +310,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
         }
       }
       loadSegment(segment, DataSegmentChangeCallback.NOOP);
+      // announce segment even if the segment file already exists.
       try {
         announcer.announceSegment(segment);
       }
@@ -727,17 +733,11 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
             (request, statusRef) -> result.add(new DataSegmentChangeRequestAndStatus(request, statusRef.get()))
         );
 
-        super.set(result);
+        set(result);
       }
     }
 
     @Override
-    public boolean setException(Throwable throwable)
-    {
-      return super.setException(throwable);
-    }
-
-    @Override
     public boolean cancel(boolean interruptIfRunning)
     {
       synchronized (waitingFutures) {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
index e3ab604..0c0ee56 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
@@ -147,9 +147,9 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
       return;
     }
 
-    int batchSize = config.getHttpLoadQueuePeonBatchSize();
+    final int batchSize = config.getHttpLoadQueuePeonBatchSize();
 
-    List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize);
+    final List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize);
 
     synchronized (lock) {
       Iterator<Map.Entry<DataSegment, SegmentHolder>> iter = Iterators.concat(
@@ -157,8 +157,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
           segmentsToLoad.entrySet().iterator()
       );
 
-      while (batchSize > 0 && iter.hasNext()) {
-        batchSize--;
+      while (newRequests.size() < batchSize && iter.hasNext()) {
         Map.Entry<DataSegment, SegmentHolder> entry = iter.next();
         if (entry.getValue().hasTimedOut()) {
           entry.getValue().requestFailed("timed out");
@@ -304,8 +303,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
               return;
             }
 
-            if (status.getState()
-                == SegmentLoadDropHandler.Status.STATE.FAILED) {
+            if (status.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) {
               holder.requestFailed(status.getFailureCause());
             } else {
               holder.requestSucceeded();
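
The batching loop change in this file is subtle but real: the old loop spent one unit of batchSize
on every entry it inspected, even entries (such as timed-out holders) that never make it into
newRequests, so a batch could go out smaller than configured. Bounding on newRequests.size()
charges the budget only for requests actually added. A minimal illustration of the difference
(BatchBuilder and isExpired are invented for this sketch):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Predicate;

    class BatchBuilder
    {
      // Collects up to batchSize usable items; expired items are skipped
      // without consuming any of the batch budget.
      static <T> List<T> buildBatch(Iterator<T> iter, int batchSize, Predicate<T> isExpired)
      {
        final List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && iter.hasNext()) {
          final T item = iter.next();
          if (isExpired.test(item)) {
            continue; // fail/handle the expired item; it costs nothing here
          }
          batch.add(item);
        }
        return batch;
      }
    }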
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
index 5723359..0b291d4 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
@@ -37,7 +37,6 @@ import java.util.Set;
 */
 public class CacheTestSegmentLoader implements SegmentLoader
 {
-
   private final Set<DataSegment> segmentsInTrash = new HashSet<>();
 
   @Override
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 826f249..14cbdbd 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -309,7 +309,8 @@ public class SegmentManagerTest
   public void testLoadDuplicatedSegmentsInParallel()
       throws ExecutionException, InterruptedException, SegmentLoadingException
   {
-    final List<Future<Boolean>> futures = ImmutableList.of(segments.get(0), segments.get(0), segments.get(0)).stream()
+    final List<Future<Boolean>> futures = ImmutableList.of(segments.get(0), segments.get(0), segments.get(0))
+                                                       .stream()
                                                        .map(
                                                            segment -> executor.submit(
                                                                () -> segmentManager.loadSegment(segment)
@@ -347,16 +348,17 @@ public class SegmentManagerTest
       throws SegmentLoadingException, ExecutionException, InterruptedException
   {
     segmentManager.loadSegment(segments.get(0));
-    final List<Future<Void>> futures = ImmutableList.of(segments.get(1), segments.get(2)).stream()
-                                                       .map(
-                                                           segment -> executor.submit(
-                                                               () -> {
-                                                                 segmentManager.dropSegment(segment);
-                                                                 return (Void) null;
-                                                               }
-                                                           )
-                                                       )
-                                                       .collect(Collectors.toList());
+    final List<Future<Void>> futures = ImmutableList.of(segments.get(1), segments.get(2))
+                                                    .stream()
+                                                    .map(
+                                                        segment -> executor.submit(
+                                                            () -> {
+                                                              segmentManager.dropSegment(segment);
+                                                              return (Void) null;
+                                                            }
+                                                        )
+                                                    )
+                                                    .collect(Collectors.toList());
 
     for (Future<Void> future : futures) {
       future.get();
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
new file mode 100644
index 0000000..ab78689
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import com.fasterxml.jackson.databind.InjectableValues.Std;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.FileUtils.FileCopyResult;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPuller;
+import org.apache.druid.segment.loading.LocalLoadSpec;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentizerFactory;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class SegmentManagerThreadSafetyTest
+{
+  private static final int NUM_THREAD = 4;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private TestSegmentPuller segmentPuller;
+  private ObjectMapper objectMapper;
+  private IndexIO indexIO;
+  private File segmentCacheDir;
+  private File segmentDeepStorageDir;
+  private SegmentLoaderLocalCacheManager segmentLoader;
+  private SegmentManager segmentManager;
+  private ExecutorService exec;
+
+  @Before
+  public void setup() throws IOException
+  {
+    segmentPuller = new TestSegmentPuller();
+    objectMapper = new DefaultObjectMapper()
+        .registerModule(
+            new SimpleModule().registerSubtypes(new NamedType(LocalLoadSpec.class, "local"), new NamedType(TestSegmentizerFactory.class, "test"))
+        )
+        .setInjectableValues(new Std().addValue(LocalDataSegmentPuller.class, segmentPuller));
+    indexIO = new IndexIO(objectMapper, () -> 0);
+    segmentCacheDir = temporaryFolder.newFolder();
+    segmentDeepStorageDir = temporaryFolder.newFolder();
+    segmentLoader = new SegmentLoaderLocalCacheManager(
+        indexIO,
+        new SegmentLoaderConfig()
+        {
+          @Override
+          public List<StorageLocationConfig> getLocations()
+          {
+            return Collections.singletonList(
+                new StorageLocationConfig().setPath(segmentCacheDir)
+            );
+          }
+        },
+        objectMapper
+    );
+    segmentManager = new SegmentManager(segmentLoader);
+    exec = Execs.multiThreaded(NUM_THREAD, "SegmentManagerThreadSafetyTest-%d");
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
+  }
+
+  @After
+  public void teardown() throws IOException
+  {
+    exec.shutdownNow();
+    FileUtils.deleteDirectory(segmentCacheDir);
+  }
+
+  @Test(timeout = 5000L)
+  public void testLoadSameSegment() throws IOException, ExecutionException, InterruptedException
+  {
+    final DataSegment segment = createSegment("2019-01-01/2019-01-02");
+    final List<Future> futures = IntStream
+        .range(0, 16)
+        .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment)))
+        .collect(Collectors.toList());
+    for (Future future : futures) {
+      future.get();
+    }
+    Assert.assertEquals(1, segmentPuller.numFileLoaded.size());
+    Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue());
+    Assert.assertEquals(0, segmentLoader.getSegmentLocks().size());
+  }
+
+  @Test(timeout = 5000L)
+  public void testLoadMultipleSegments() throws IOException, ExecutionException, InterruptedException
+  {
+    final List<DataSegment> segments = new ArrayList<>(88);
+    for (int i = 0; i < 11; i++) {
+      for (int j = 0; j < 8; j++) {
+        segments.add(createSegment(StringUtils.format("2019-%02d-01/2019-%02d-01", i + 1, i + 2)));
+      }
+    }
+
+    final List<Future> futures = IntStream
+        .range(0, 16)
+        .mapToObj(i -> exec.submit(() -> {
+          for (DataSegment segment : segments) {
+            try {
+              segmentManager.loadSegment(segment);
+            }
+            catch (SegmentLoadingException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }))
+        .collect(Collectors.toList());
+    for (Future future : futures) {
+      future.get();
+    }
+    Assert.assertEquals(11, segmentPuller.numFileLoaded.size());
+    Assert.assertEquals(1, segmentPuller.numFileLoaded.values().iterator().next().intValue());
+    Assert.assertEquals(0, segmentLoader.getSegmentLocks().size());
+  }
+
+  private DataSegment createSegment(String interval) throws IOException
+  {
+    final DataSegment tmpSegment = new DataSegment(
+        "dataSource",
+        Intervals.of(interval),
+        "version",
+        Collections.emptyMap(),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        new NumberedShardSpec(0, 0),
+        9,
+        100
+    );
+    final String storageDir = DataSegmentPusher.getDefaultStorageDir(tmpSegment, false);
+    final File segmentDir = new File(segmentDeepStorageDir, storageDir);
+    FileUtils.forceMkdir(segmentDir);
+
+    final File factoryJson = new File(segmentDir, "factory.json");
+    objectMapper.writeValue(factoryJson, new TestSegmentizerFactory());
+    return tmpSegment.withLoadSpec(
+        ImmutableMap.of("type", "local", "path", segmentDir.getAbsolutePath())
+    );
+  }
+
+  private static class TestSegmentPuller extends LocalDataSegmentPuller
+  {
+    // Distinct segments can be pulled concurrently, so this counter map must be thread-safe.
+    private final Map<File, Integer> numFileLoaded = new ConcurrentHashMap<>();
+
+    @Override
+    public FileCopyResult getSegmentFiles(final File sourceFile, final File dir)
+    {
+      numFileLoaded.compute(sourceFile, (f, numLoaded) -> numLoaded == null ? 1 : numLoaded + 1);
+      try {
+        FileUtils.copyDirectory(sourceFile, dir);
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return new FileCopyResult()
+      {
+        @Override
+        public long size()
+        {
+          return 100L;
+        }
+      };
+    }
+  }
+
+  private static class TestSegmentizerFactory implements SegmentizerFactory
+  {
+    @Override
+    public Segment factorize(DataSegment segment, File parentDir)
+    {
+      return new Segment()
+      {
+        @Override
+        public SegmentId getId()
+        {
+          return segment.getId();
+        }
+
+        @Override
+        public Interval getDataInterval()
+        {
+          return segment.getInterval();
+        }
+
+        @Nullable
+        @Override
+        public QueryableIndex asQueryableIndex()
+        {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public StorageAdapter asStorageAdapter()
+        {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> T as(Class<T> clazz)
+        {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close()
+        {
+
+        }
+      };
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 6f227e4..f5455fb 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -65,15 +65,6 @@ public class SegmentLoadDropHandlerTest
   private static final Logger log = new Logger(ZkCoordinatorTest.class);
 
   private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
-  private final DruidServerMetadata me = new DruidServerMetadata(
-      "dummyServer",
-      "dummyHost",
-      null,
-      0,
-      ServerType.HISTORICAL,
-      "normal",
-      0
-  );
 
   private SegmentLoadDropHandler segmentLoadDropHandler;
   private DataSegmentAnnouncer announcer;
diff --git a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
index 52006f5..8debba1 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
@@ -51,10 +51,17 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -65,11 +72,12 @@ public class BatchDataSegmentAnnouncerTest
   private static final String testBasePath = "/test";
   private static final String testSegmentsPath = "/test/segments/id";
   private static final Joiner joiner = Joiner.on("/");
+  private static final int NUM_THREADS = 4;
 
   private TestingCluster testingCluster;
   private CuratorFramework cf;
   private ObjectMapper jsonMapper;
-  private Announcer announcer;
+  private TestAnnouncer announcer;
   private SegmentReader segmentReader;
   private BatchDataSegmentAnnouncer segmentAnnouncer;
   private Set<DataSegment> testSegments;
@@ -78,6 +86,7 @@ public class BatchDataSegmentAnnouncerTest
   private Boolean skipDimensionsAndMetrics;
   private Boolean skipLoadSpec;
 
+  private ExecutorService exec;
 
   @Before
   public void setUp() throws Exception
@@ -96,7 +105,7 @@ public class BatchDataSegmentAnnouncerTest
 
     jsonMapper = TestHelper.makeJsonMapper();
 
-    announcer = new Announcer(
+    announcer = new TestAnnouncer(
         cf,
         Execs.directExecutor()
     );
@@ -157,6 +166,8 @@ public class BatchDataSegmentAnnouncerTest
     for (int i = 0; i < 100; i++) {
       testSegments.add(makeSegment(i));
     }
+
+    exec = Execs.multiThreaded(NUM_THREADS, "BatchDataSegmentAnnouncerTest-%d");
   }
 
   @After
@@ -165,6 +176,7 @@ public class BatchDataSegmentAnnouncerTest
     announcer.stop();
     cf.close();
     testingCluster.stop();
+    exec.shutdownNow();
   }
 
   @Test
@@ -299,6 +311,14 @@ public class BatchDataSegmentAnnouncerTest
     testBatchAnnounce(true);
   }
 
+  @Test
+  public void testMultipleBatchAnnounce() throws Exception
+  {
+    for (int i = 0; i < 10; i++) {
+      testBatchAnnounce(false);
+    }
+  }
+
   private void testBatchAnnounce(boolean testHistory) throws Exception
   {
     segmentAnnouncer.announceSegments(testSegments);
@@ -342,11 +362,72 @@ public class BatchDataSegmentAnnouncerTest
     }
   }
 
-  @Test
-  public void testMultipleBatchAnnounce() throws Exception
+  @Test(timeout = 5000L)
+  public void testAnnounceSegmentsWithSameSegmentConcurrently() throws ExecutionException, InterruptedException
   {
-    for (int i = 0; i < 10; i++) {
-      testBatchAnnounce(false);
+    final List<Future> futures = new ArrayList<>(NUM_THREADS);
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futures.add(
+          exec.submit(() -> {
+            try {
+              segmentAnnouncer.announceSegments(testSegments);
+            }
+            catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          })
+      );
+    }
+
+    for (Future future : futures) {
+      future.get();
+    }
+
+    // Announcing 100 segments requires 2 nodes because of maxBytesPerNode configuration.
+    Assert.assertEquals(2, announcer.numPathAnnounced.size());
+    for (ConcurrentHashMap<byte[], AtomicInteger> eachMap : announcer.numPathAnnounced.values()) {
+      for (Entry<byte[], AtomicInteger> entry : eachMap.entrySet()) {
+        Assert.assertEquals(1, entry.getValue().get());
+      }
+    }
+  }
+
+  @Test(timeout = 5000L)
+  public void testAnnounceSegmentWithSameSegmentConcurrently() throws ExecutionException, InterruptedException
+  {
+    final List<Future> futures = new ArrayList<>(NUM_THREADS);
+
+    final DataSegment segment1 = makeSegment(0);
+    final DataSegment segment2 = makeSegment(1);
+    final DataSegment segment3 = makeSegment(2);
+    final DataSegment segment4 = makeSegment(3);
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futures.add(
+          exec.submit(() -> {
+            try {
+              segmentAnnouncer.announceSegment(segment1);
+              segmentAnnouncer.announceSegment(segment2);
+              segmentAnnouncer.announceSegment(segment3);
+              segmentAnnouncer.announceSegment(segment4);
+            }
+            catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          })
+      );
+    }
+
+    for (Future future : futures) {
+      future.get();
+    }
+
+    Assert.assertEquals(1, announcer.numPathAnnounced.size());
+    for (ConcurrentHashMap<byte[], AtomicInteger> eachMap : announcer.numPathAnnounced.values()) {
+      for (Entry<byte[], AtomicInteger> entry : eachMap.entrySet()) {
+        Assert.assertEquals(1, entry.getValue().get());
+      }
     }
   }
 
@@ -396,4 +477,21 @@ public class BatchDataSegmentAnnouncerTest
       return new HashSet<>();
     }
   }
+
+  private static class TestAnnouncer extends Announcer
+  {
+    private final ConcurrentHashMap<String, ConcurrentHashMap<byte[], AtomicInteger>> numPathAnnounced = new ConcurrentHashMap<>();
+
+    private TestAnnouncer(CuratorFramework curator, ExecutorService exec)
+    {
+      super(curator, exec);
+    }
+
+    @Override
+    public void announce(String path, byte[] bytes, boolean removeParentIfCreated)
+    {
+      numPathAnnounced.computeIfAbsent(path, k -> new ConcurrentHashMap<>()).computeIfAbsent(bytes, k -> new AtomicInteger(0)).incrementAndGet();
+      super.announce(path, bytes, removeParentIfCreated);
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index 1b6ff3d..0f73695 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -20,12 +20,9 @@
 package org.apache.druid.server.coordinator;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.druid.discovery.DiscoveryDruidNode;
-import org.apache.druid.discovery.DruidNodeDiscovery;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -47,12 +44,10 @@ import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  */
@@ -172,29 +167,8 @@ public class HttpLoadQueuePeonTest
 
   }
 
-  private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
-  {
-    Listener listener;
-
-    @Override
-    public Collection<DiscoveryDruidNode> getAllNodes()
-    {
-      throw new UnsupportedOperationException("Not Implemented.");
-    }
-
-    @Override
-    public void registerListener(Listener listener)
-    {
-      listener.nodesAdded(ImmutableList.of());
-      listener.nodeViewInitialized();
-      this.listener = listener;
-    }
-  }
-
   private static class TestHttpClient implements HttpClient
   {
-    AtomicInteger requestNum = new AtomicInteger(0);
-
     @Override
     public <Intermediate, Final> ListenableFuture<Final> go(
         Request request,

