Posted to commits@druid.apache.org by hi...@apache.org on 2019/08/05 20:38:47 UTC

[incubator-druid] branch master updated: Add shuffleSegmentPusher for data shuffle (#8115)

This is an automated email from the ASF dual-hosted git repository.

himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ab5b3be  Add shuffleSegmentPusher for data shuffle (#8115)
ab5b3be is described below

commit ab5b3be6c61d7be540f85385327a6a6489b363b1
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Aug 5 13:38:35 2019 -0700

    Add shuffleSegmentPusher for data shuffle (#8115)
    
    * Fix race between canHandle() and addSegment() in StorageLocation
    
    * add comment
    
    * Add shuffleSegmentPusher which is a dataSegmentPusher used for writing shuffle data in local storage.
    
    * add comments
    
    * unused import
    
    * add comments
    
    * fix test
    
    * address comments
    
    * remove <p> tag from javadoc
    
    * address comments
    
    * comparingLong
    
    * Address comments
    
    * fix test
---
 .../druid/indexing/common/config/TaskConfig.java   |   5 +
 .../indexing/worker/IntermediaryDataManager.java   | 166 +++++++++++++--------
 .../indexing/worker/ShuffleDataSegmentPusher.java  |  77 ++++++++++
 .../IntermediaryDataManagerAutoCleanupTest.java    |  15 +-
 ...ermediaryDataManagerManualAddAndDeleteTest.java |  47 +++---
 .../worker/ShuffleDataSegmentPusherTest.java       | 138 +++++++++++++++++
 .../druid/segment/loading/StorageLocation.java     |   7 +-
 ...mentLoaderLocalCacheManagerConcurrencyTest.java |   2 -
 .../druid/segment/loading/StorageLocationTest.java |  14 +-
 9 files changed, 366 insertions(+), 105 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index 31405fe..52bf083 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -126,6 +126,11 @@ public class TaskConfig
     return new File(getTaskDir(taskId), "work");
   }
 
+  public File getTaskTempDir(String taskId)
+  {
+    return new File(getTaskDir(taskId), "temp");
+  }
+
   public File getTaskLockFile(String taskId)
   {
     return new File(getTaskDir(taskId), "lock");
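
The new getTaskTempDir() follows the same convention as the surrounding helpers: every per-task path hangs off getTaskDir(taskId). A minimal sketch of that layout convention (the class name and base directory below are illustrative, not the real TaskConfig):

    import java.io.File;

    // Sketch of TaskConfig's per-task directory convention; base path assumed.
    public class TaskDirLayout
    {
      private final File baseTaskDir;

      public TaskDirLayout(File baseTaskDir)
      {
        this.baseTaskDir = baseTaskDir;
      }

      public File getTaskDir(String taskId)
      {
        return new File(baseTaskDir, taskId);
      }

      // Added by this commit: scratch space used for the zipped intermediary segment.
      public File getTaskTempDir(String taskId)
      {
        return new File(getTaskDir(taskId), "temp");
      }

      public static void main(String[] args)
      {
        final TaskDirLayout layout = new TaskDirLayout(new File("/tmp/druid/tasks"));
        System.out.println(layout.getTaskTempDir("subTaskId")); // /tmp/druid/tasks/subTaskId/temp
      }
    }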
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index c47425a..95acb22 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -33,11 +33,13 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.loading.StorageLocation;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CompressionUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -45,6 +47,7 @@ import org.joda.time.Period;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
@@ -57,12 +60,14 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * This class manages intermediary segments for data shuffle between native parallel index tasks.
- * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers
+ * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer)
  * and phase 2 tasks read those files via HTTP.
  *
  * The directory where segment files are placed is structured as
@@ -75,11 +80,12 @@ import java.util.stream.Collectors;
 @ManageLifecycle
 public class IntermediaryDataManager
 {
-  private static final Logger log = new Logger(IntermediaryDataManager.class);
+  private static final Logger LOG = new Logger(IntermediaryDataManager.class);
 
   private final long intermediaryPartitionDiscoveryPeriodSec;
   private final long intermediaryPartitionCleanupPeriodSec;
   private final Period intermediaryPartitionTimeout;
+  private final TaskConfig taskConfig;
   private final List<StorageLocation> shuffleDataLocations;
   private final IndexingServiceClient indexingServiceClient;
 
@@ -108,6 +114,7 @@ public class IntermediaryDataManager
     this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
     this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
     this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();
+    this.taskConfig = taskConfig;
     this.shuffleDataLocations = taskConfig
         .getShuffleDataLocations()
         .stream()
@@ -119,6 +126,7 @@ public class IntermediaryDataManager
   @LifecycleStart
   public void start()
   {
+    discoverSupervisorTaskPartitions();
     supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
     // Discover partitions for new supervisorTasks
     supervisorTaskChecker.scheduleAtFixedRate(
@@ -127,7 +135,7 @@ public class IntermediaryDataManager
             discoverSupervisorTaskPartitions();
           }
           catch (Exception e) {
-            log.warn(e, "Error while discovering supervisorTasks");
+            LOG.warn(e, "Error while discovering supervisorTasks");
           }
         },
         intermediaryPartitionDiscoveryPeriodSec,
@@ -141,10 +149,10 @@ public class IntermediaryDataManager
             deleteExpiredSuprevisorTaskPartitionsIfNotRunning();
           }
           catch (InterruptedException e) {
-            log.error(e, "Error while cleaning up partitions for expired supervisors");
+            LOG.error(e, "Error while cleaning up partitions for expired supervisors");
           }
           catch (Exception e) {
-            log.warn(e, "Error while cleaning up partitions for expired supervisors");
+            LOG.warn(e, "Error while cleaning up partitions for expired supervisors");
           }
         },
         intermediaryPartitionCleanupPeriodSec,
@@ -163,9 +171,13 @@ public class IntermediaryDataManager
     supervisorTaskCheckTimes.clear();
   }
 
+  /**
+   * IntermediaryDataManager periodically calls this method after it starts up to search for unknown intermediary data.
+   */
   private void discoverSupervisorTaskPartitions()
   {
     for (StorageLocation location : shuffleDataLocations) {
+      final Path locationPath = location.getPath().toPath().toAbsolutePath();
       final MutableInt numDiscovered = new MutableInt(0);
       final File[] dirsPerSupervisorTask = location.getPath().listFiles();
       if (dirsPerSupervisorTask != null) {
@@ -174,13 +186,32 @@ public class IntermediaryDataManager
           supervisorTaskCheckTimes.computeIfAbsent(
               supervisorTaskId,
               k -> {
+                for (File eachFile : FileUtils.listFiles(supervisorTaskDir, null, true)) {
+                  final String relativeSegmentPath = locationPath
+                      .relativize(eachFile.toPath().toAbsolutePath())
+                      .toString();
+                  // StorageLocation keeps track of how much storage capacity is being used.
+                  // Newly found files should be known to the StorageLocation to keep it up to date.
+                  final File reservedFile = location.reserve(
+                      relativeSegmentPath,
+                      eachFile.getName(),
+                      eachFile.length()
+                  );
+                  if (reservedFile == null) {
+                    LOG.warn("Can't add a discovered partition[%s]", eachFile.getAbsolutePath());
+                  }
+                }
                 numDiscovered.increment();
                 return DateTimes.nowUtc().plus(intermediaryPartitionTimeout);
               }
           );
         }
       }
-      log.info("Discovered partitions for [%s] new supervisor tasks", numDiscovered.getValue());
+      LOG.info(
+          "Discovered partitions for [%s] new supervisor tasks under location[%s]",
+          numDiscovered.getValue(),
+          location.getPath()
+      );
     }
   }
 
@@ -203,7 +234,7 @@ public class IntermediaryDataManager
       }
     }
 
-    log.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
+    LOG.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
 
     final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
     for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) {
@@ -215,7 +246,7 @@ public class IntermediaryDataManager
           deletePartitions(supervisorTaskId);
         }
         catch (IOException e) {
-          log.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
+          LOG.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
         }
       } else {
         // If it's still running, update last access time.
@@ -227,17 +258,74 @@ public class IntermediaryDataManager
   /**
    * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per
    * supervisorTaskId.
-   *
-   * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static
-   * addSegment method.
    */
-  public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile)
+  long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
+      throws IOException
   {
+    // Get or create the location iterator for supervisorTask.
     final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
         supervisorTaskId,
-        k -> Iterators.cycle(shuffleDataLocations)
+        k -> {
+          final Iterator<StorageLocation> cyclicIterator = Iterators.cycle(shuffleDataLocations);
+          // Random start of the iterator
+          final int random = ThreadLocalRandom.current().nextInt(shuffleDataLocations.size());
+          IntStream.range(0, random).forEach(i -> cyclicIterator.next());
+          return cyclicIterator;
+        }
     );
-    addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile);
+
+    // Create a zipped segment in a temp directory.
+    final File taskTempDir = taskConfig.getTaskTempDir(subTaskId);
+
+    try (final Closer resourceCloser = Closer.create()) {
+      if (taskTempDir.mkdirs()) {
+        resourceCloser.register(() -> {
+          try {
+            FileUtils.forceDelete(taskTempDir);
+          }
+          catch (IOException e) {
+            LOG.warn(e, "Failed to delete directory[%s]", taskTempDir.getAbsolutePath());
+          }
+        });
+      }
+
+      // Temporary compressed file. Will be removed when taskTempDir is deleted.
+      final File tempZippedFile = new File(taskTempDir, segment.getId().toString());
+      final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile);
+      if (unzippedSizeBytes == 0) {
+        throw new IOE(
+            "Read 0 bytes from segmentDir[%s]",
+            segmentDir.getAbsolutePath()
+        );
+      }
+
+      // Try copying the zipped segment to one of storage locations
+      for (int i = 0; i < shuffleDataLocations.size(); i++) {
+        final StorageLocation location = iterator.next();
+        final String partitionFilePath = getPartitionFilePath(
+            supervisorTaskId,
+            subTaskId,
+            segment.getInterval(),
+            segment.getShardSpec().getPartitionNum()
+        );
+        final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length());
+        if (destFile != null) {
+          FileUtils.forceMkdirParent(destFile);
+          org.apache.druid.java.util.common.FileUtils.writeAtomically(
+              destFile,
+              out -> Files.asByteSource(tempZippedFile).copyTo(out)
+          );
+          LOG.info(
+              "Wrote intermediary segment for segment[%s] of subtask[%s] at [%s]",
+              segment.getId(),
+              subTaskId,
+              destFile
+          );
+          return unzippedSizeBytes;
+        }
+      }
+      throw new ISE("Can't find location to handle segment[%s]", segment);
+    }
   }
 
   public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId)
@@ -259,7 +347,7 @@ public class IntermediaryDataManager
     for (StorageLocation location : shuffleDataLocations) {
       final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
       if (supervisorTaskPath.exists()) {
-        log.info("Cleaning up [%s]", supervisorTaskPath);
+        LOG.info("Cleaning up [%s]", supervisorTaskPath);
         for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) {
           location.removeFile(eachFile);
         }
@@ -269,54 +357,6 @@ public class IntermediaryDataManager
     supervisorTaskCheckTimes.remove(supervisorTaskId);
   }
 
-  /**
-   * Iterate through the given storage locations to find one which can handle the given segment.
-   */
-  public static void addSegment(
-      Iterator<StorageLocation> cyclicIterator,
-      int numLocations,
-      String supervisorTaskId,
-      String subTaskId,
-      DataSegment segment,
-      File segmentFile
-  )
-  {
-    for (int i = 0; i < numLocations; i++) {
-      final StorageLocation location = cyclicIterator.next();
-      final File destFile = location.reserve(
-          getPartitionFilePath(
-              supervisorTaskId,
-              subTaskId,
-              segment.getInterval(),
-              segment.getShardSpec().getPartitionNum()
-          ),
-          segment.getId(),
-          segmentFile.length()
-      );
-      if (destFile != null) {
-        try {
-          FileUtils.forceMkdirParent(destFile);
-          final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
-          if (copiedBytes == 0) {
-            throw new IOE(
-                "0 bytes copied after copying a segment file from [%s] to [%s]",
-                segmentFile.getAbsolutePath(),
-                destFile.getAbsolutePath()
-            );
-          } else {
-            return;
-          }
-        }
-        catch (IOException e) {
-          // Only log here to try other locations as well.
-          log.warn(e, "Failed to write segmentFile at [%s]", destFile);
-          location.removeFile(segmentFile);
-        }
-      }
-    }
-    throw new ISE("Can't find location to handle segment[%s]", segment);
-  }
-
   private static String getPartitionFilePath(
       String supervisorTaskId,
       String subTaskId,
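
The rewritten addSegment() above selects a StorageLocation per supervisorTaskId by cycling through the configured locations, starting at a random offset so concurrent supervisor tasks spread across locations instead of all filling the first one; it then tries at most shuffleDataLocations.size() locations before giving up with an ISE. A self-contained sketch of that selection strategy, assuming Guava's Iterators.cycle as in the patch (locations simplified to strings):

    import com.google.common.collect.Iterators;

    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.stream.IntStream;

    // Sketch of the per-supervisor round-robin selection in addSegment();
    // the real code cycles over StorageLocation instances, not strings.
    public class RoundRobinLocations
    {
      private final List<String> locations;
      private final Map<String, Iterator<String>> iterators = new ConcurrentHashMap<>();

      public RoundRobinLocations(List<String> locations)
      {
        this.locations = locations;
      }

      public String nextLocationFor(String supervisorTaskId)
      {
        final Iterator<String> iterator = iterators.computeIfAbsent(
            supervisorTaskId,
            k -> {
              final Iterator<String> cyclic = Iterators.cycle(locations);
              // Random start so different supervisors don't all begin at location 0.
              final int skip = ThreadLocalRandom.current().nextInt(locations.size());
              IntStream.range(0, skip).forEach(i -> cyclic.next());
              return cyclic;
            }
        );
        return iterator.next();
      }
    }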
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
new file mode 100644
index 0000000..fcbdf9d
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.timeline.DataSegment;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * DataSegmentPusher used for storing intermediary data in local storage during data shuffle of native parallel
+ * indexing.
+ */
+public class ShuffleDataSegmentPusher implements DataSegmentPusher
+{
+  private final String supervisorTaskId;
+  private final String subTaskId;
+  private final IntermediaryDataManager intermediaryDataManager;
+
+  public ShuffleDataSegmentPusher(
+      String supervisorTaskId,
+      String subTaskId,
+      IntermediaryDataManager intermediaryDataManager
+  )
+  {
+    this.supervisorTaskId = supervisorTaskId;
+    this.subTaskId = subTaskId;
+    this.intermediaryDataManager = intermediaryDataManager;
+  }
+
+  @Override
+  public String getPathForHadoop(String dataSource)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getPathForHadoop()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
+  {
+    final long unzippedSize = intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, file);
+    return segment.withSize(unzippedSize)
+                  .withBinaryVersion(SegmentUtils.getVersionFromDir(file));
+  }
+
+  @Override
+  public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
+  {
+    throw new UnsupportedOperationException();
+  }
+}
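
In use, a phase 1 subtask hands this pusher to the segment-writing code in place of a deep-storage pusher: push() zips the segment directory into local shuffle storage via IntermediaryDataManager.addSegment() and returns the segment with its unzipped size and binary version filled in. A hedged sketch of the call site, mirroring the setup in the new ShuffleDataSegmentPusherTest (the manager, segment, and directory are assumed to come from the surrounding task context):

    import java.io.File;
    import java.io.IOException;

    import org.apache.druid.indexing.worker.IntermediaryDataManager;
    import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
    import org.apache.druid.timeline.DataSegment;

    class ShuffleSegmentPushExample
    {
      static DataSegment pushIntermediary(
          IntermediaryDataManager intermediaryDataManager,
          DataSegment segment,
          File segmentDir
      ) throws IOException
      {
        final ShuffleDataSegmentPusher pusher =
            new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", intermediaryDataManager);
        // push() zips segmentDir, reserves space in a shuffle location, and
        // returns the segment with unzipped size and binary version set.
        return pusher.push(segmentDir, segment, true);
      }
    }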
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
index c158c0e..6a5f611 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.worker;
 
-import com.amazonaws.util.StringUtils;
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -43,6 +42,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -116,19 +116,20 @@ public class IntermediaryDataManagerAutoCleanupTest
   {
     final String supervisorTaskId = "supervisorTaskId";
     final Interval interval = Intervals.of("2018/2019");
-    final File segmentFile = generateSegmentFile();
+    final File segmentFile = generateSegmentDir("test");
     final DataSegment segment = newSegment(interval, 0);
     intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
 
-    Thread.sleep(8000);
+    Thread.sleep(3000);
     Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, 0).isEmpty());
   }
 
-  private File generateSegmentFile() throws IOException
+  private File generateSegmentDir(String fileName) throws IOException
   {
-    final File segmentFile = tempDir.newFile();
-    FileUtils.write(segmentFile, "test data.", StringUtils.UTF8);
-    return segmentFile;
+    // Each file size is 138 bytes after compression
+    final File segmentDir = tempDir.newFolder();
+    FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8);
+    return segmentDir;
   }
 
   private DataSegment newSegment(Interval interval, int partitionId)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
index 7bd9e90..7d322d0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.worker;
 
-import com.amazonaws.util.StringUtils;
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -41,6 +40,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Comparator;
 import java.util.List;
 
@@ -67,7 +67,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
         false,
         null,
         null,
-        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 150L, null))
+        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 600L, null))
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
     intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
@@ -83,15 +83,16 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
   @Test
   public void testAddSegmentFailure() throws IOException
   {
-    for (int i = 0; i < 15; i++) {
-      File segmentFile = generateSegmentFile();
+    int i = 0;
+    for (; i < 4; i++) {
+      File segmentFile = generateSegmentDir("file_" + i);
       DataSegment segment = newSegment(Intervals.of("2018/2019"), i);
       intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
     }
     expectedException.expect(IllegalStateException.class);
     expectedException.expectMessage("Can't find location to handle segment");
-    File segmentFile = generateSegmentFile();
-    DataSegment segment = newSegment(Intervals.of("2018/2019"), 16);
+    File segmentFile = generateSegmentDir("file_" + i);
+    DataSegment segment = newSegment(Intervals.of("2018/2019"), 4);
     intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
   }
 
@@ -101,15 +102,15 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
     final String supervisorTaskId = "supervisorTaskId";
     final Interval interval = Intervals.of("2018/2019");
     final int partitionId = 0;
-    for (int i = 0; i < 10; i++) {
-      final File segmentFile = generateSegmentFile();
+    for (int i = 0; i < 4; i++) {
+      final File segmentFile = generateSegmentDir("file_" + i);
       final DataSegment segment = newSegment(interval, partitionId);
       intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + i, segment, segmentFile);
     }
     final List<File> files = intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId);
-    Assert.assertEquals(10, files.size());
+    Assert.assertEquals(4, files.size());
     files.sort(Comparator.comparing(File::getName));
-    for (int i = 0; i < 10; i++) {
+    for (int i = 0; i < 4; i++) {
       Assert.assertEquals("subTaskId_" + i, files.get(i).getName());
     }
   }
@@ -119,9 +120,9 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
   {
     final String supervisorTaskId = "supervisorTaskId";
     final Interval interval = Intervals.of("2018/2019");
-    for (int partitionId = 0; partitionId < 5; partitionId++) {
-      for (int subTaskId = 0; subTaskId < 3; subTaskId++) {
-        final File segmentFile = generateSegmentFile();
+    for (int partitionId = 0; partitionId < 2; partitionId++) {
+      for (int subTaskId = 0; subTaskId < 2; subTaskId++) {
+        final File segmentFile = generateSegmentDir("file_" + partitionId + "_" + subTaskId);
         final DataSegment segment = newSegment(interval, partitionId);
         intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + subTaskId, segment, segmentFile);
       }
@@ -129,7 +130,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
 
     intermediaryDataManager.deletePartitions(supervisorTaskId);
 
-    for (int partitionId = 0; partitionId < 5; partitionId++) {
+    for (int partitionId = 0; partitionId < 2; partitionId++) {
       Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId).isEmpty());
     }
   }
@@ -139,22 +140,24 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
   {
     final String supervisorTaskId = "supervisorTaskId";
     final Interval interval = Intervals.of("2018/2019");
-    for (int i = 0; i < 15; i++) {
-      File segmentFile = generateSegmentFile();
+    int i = 0;
+    for (; i < 4; i++) {
+      File segmentFile = generateSegmentDir("file_" + i);
       DataSegment segment = newSegment(interval, i);
       intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
     }
     intermediaryDataManager.deletePartitions(supervisorTaskId);
-    File segmentFile = generateSegmentFile();
-    DataSegment segment = newSegment(interval, 16);
+    File segmentFile = generateSegmentDir("file_" + i);
+    DataSegment segment = newSegment(interval, i);
     intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
   }
 
-  private File generateSegmentFile() throws IOException
+  private File generateSegmentDir(String fileName) throws IOException
   {
-    final File segmentFile = tempDir.newFile();
-    FileUtils.write(segmentFile, "test data.", StringUtils.UTF8);
-    return segmentFile;
+    // Each file size is 138 bytes after compression
+    final File segmentDir = tempDir.newFolder();
+    FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8);
+    return segmentDir;
   }
 
   private DataSegment newSegment(Interval interval, int partitionId)
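
The updated limits in this test are consistent: with a 600-byte StorageLocationConfig and 138 bytes per zipped segment, four segments reserve 4 * 138 = 552 bytes, while a fifth would need 690 and is rejected, which is exactly what testAddSegmentFailure asserts. A quick sketch of that capacity arithmetic (mirroring, not calling, StorageLocation's accounting):

    public class CapacityCheck
    {
      public static void main(String[] args)
      {
        final long maxSizeBytes = 600;  // StorageLocationConfig limit in the test
        final long zippedSize = 138;    // per-segment size noted in the test
        long used = 0;
        for (int i = 0; i < 5; i++) {
          final boolean fits = maxSizeBytes - used >= zippedSize;
          System.out.printf("segment %d: used=%d fits=%b%n", i, used, fits);
          if (fits) {
            used += zippedSize; // mirrors StorageLocation.reserve()
          }
        }
        // Segments 0-3 fit (552 bytes used); segment 4 needs 690 total and fails.
      }
    }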
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
new file mode 100644
index 0000000..c3dfcdd
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import com.google.common.primitives.Ints;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.FileUtils.FileCopyResult;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.utils.CompressionUtils;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+public class ShuffleDataSegmentPusherTest
+{
+  @Rule
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private IntermediaryDataManager intermediaryDataManager;
+  private ShuffleDataSegmentPusher segmentPusher;
+
+  @Before
+  public void setup() throws IOException
+  {
+    final WorkerConfig workerConfig = new WorkerConfig();
+    final TaskConfig taskConfig = new TaskConfig(
+        null,
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null))
+    );
+    final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
+    intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+    intermediaryDataManager.start();
+    segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", intermediaryDataManager);
+  }
+
+  @After
+  public void teardown() throws InterruptedException
+  {
+    intermediaryDataManager.stop();
+  }
+
+  @Test
+  public void testPush() throws IOException
+  {
+    final File segmentDir = generateSegmentDir();
+    final DataSegment segment = newSegment(Intervals.of("2018/2019"));
+    final DataSegment pushed = segmentPusher.push(segmentDir, segment, true);
+
+    Assert.assertEquals(9, pushed.getBinaryVersion().intValue());
+    Assert.assertEquals(14, pushed.getSize()); // 10 bytes data + 4 bytes version
+
+    final List<File> files = intermediaryDataManager.findPartitionFiles(
+        "supervisorTaskId",
+        segment.getInterval(),
+        segment.getShardSpec().getPartitionNum()
+    );
+    Assert.assertEquals(1, files.size());
+    final File zippedSegment = files.get(0);
+    final File tempDir = temporaryFolder.newFolder();
+    final FileCopyResult result = CompressionUtils.unzip(zippedSegment, tempDir);
+    final List<File> unzippedFiles = new ArrayList<>(result.getFiles());
+    unzippedFiles.sort(Comparator.comparing(File::getName));
+    final File dataFile = unzippedFiles.get(0);
+    Assert.assertEquals("test", dataFile.getName());
+    Assert.assertEquals("test data.", Files.readFirstLine(dataFile, StandardCharsets.UTF_8));
+    final File versionFile = unzippedFiles.get(1);
+    Assert.assertEquals("version.bin", versionFile.getName());
+    Assert.assertArrayEquals(Ints.toByteArray(0x9), Files.toByteArray(versionFile));
+  }
+
+  private File generateSegmentDir() throws IOException
+  {
+    // Each file size is 138 bytes after compression
+    final File segmentDir = temporaryFolder.newFolder();
+    Files.asByteSink(new File(segmentDir, "version.bin")).write(Ints.toByteArray(0x9));
+    FileUtils.write(new File(segmentDir, "test"), "test data.", StandardCharsets.UTF_8);
+    return segmentDir;
+  }
+
+  private DataSegment newSegment(Interval interval)
+  {
+    return new DataSegment(
+        "dataSource",
+        interval,
+        "version",
+        null,
+        null,
+        null,
+        new NumberedShardSpec(0, 0),
+        9,
+        0
+    );
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index 842295e..6ed86a3 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -115,7 +114,7 @@ public class StorageLocation
   @Nullable
   public synchronized File reserve(String segmentDir, DataSegment segment)
   {
-    return reserve(segmentDir, segment.getId(), segment.getSize());
+    return reserve(segmentDir, segment.getId().toString(), segment.getSize());
   }
 
   /**
@@ -124,7 +123,7 @@ public class StorageLocation
    * Returns null otherwise.
    */
   @Nullable
-  public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentId, long segmentSize)
+  public synchronized File reserve(String segmentFilePathToAdd, String segmentId, long segmentSize)
   {
     final File segmentFileToAdd = new File(path, segmentFilePathToAdd);
     if (files.contains(segmentFileToAdd)) {
@@ -145,7 +144,7 @@ public class StorageLocation
    */
   @VisibleForTesting
   @GuardedBy("this")
-  boolean canHandle(SegmentId segmentId, long segmentSize)
+  boolean canHandle(String segmentId, long segmentSize)
   {
     if (availableSizeBytes() < segmentSize) {
       log.warn(
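
With the switch from SegmentId to String, reserve() stays the synchronized gatekeeper: it checks capacity via canHandle(), records the file, and returns the destination, or null when the location cannot take the segment, which is why addSegment() treats null as "try the next location". A simplified sketch of that reserve-or-null pattern (the real class also enforces a free-disk percentage and logs warnings):

    import java.io.File;
    import java.util.HashSet;
    import java.util.Set;

    import javax.annotation.Nullable;

    // Simplified sketch of StorageLocation's capacity accounting; the real
    // class also applies a free-disk-percentage limit and supports removal.
    public class SimpleStorageLocation
    {
      private final File path;
      private final long maxSizeBytes;
      private final Set<File> files = new HashSet<>();
      private long currSizeBytes = 0;

      public SimpleStorageLocation(File path, long maxSizeBytes)
      {
        this.path = path;
        this.maxSizeBytes = maxSizeBytes;
      }

      @Nullable
      public synchronized File reserve(String relativePath, long segmentSize)
      {
        final File fileToAdd = new File(path, relativePath);
        if (files.contains(fileToAdd) || maxSizeBytes - currSizeBytes < segmentSize) {
          // Already reserved, or not enough capacity: caller tries another location.
          return null;
        }
        files.add(fileToAdd);
        currSizeBytes += segmentSize;
        return fileToAdd;
      }
    }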
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
index cb08217..f617261 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java
@@ -153,8 +153,6 @@ public class SegmentLoaderLocalCacheManagerConcurrencyTest
     for (Future future : futures) {
       future.get();
     }
-
-    System.out.println(manager.getLocations().get(0).availableSizeBytes());
   }
 
   private DataSegment newSegment(Interval interval, int partitionId)
diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
index 23da422..7be5328 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
@@ -39,18 +39,18 @@ public class StorageLocationTest
   {
     // free space ignored only maxSize matters
     StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null);
-    Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013"), 9_000));
-    Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013"), 11_000));
+    Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 9_000));
+    Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 11_000));
 
     // enough space available maxSize is the limit
     StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0);
-    Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013"), 9_000));
-    Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013"), 11_000));
+    Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013").toString(), 9_000));
+    Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013").toString(), 11_000));
 
     // disk almost full percentage is the limit
     StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0);
-    Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013"), 4_000));
-    Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013"), 6_000));
+    Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013").toString(), 4_000));
+    Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013").toString(), 6_000));
   }
 
   private StorageLocation fakeLocation(long total, long free, long max, Double percent)
@@ -99,7 +99,7 @@ public class StorageLocationTest
   {
     Assert.assertEquals(maxSize, loc.availableSizeBytes());
     for (int i = 0; i <= maxSize; ++i) {
-      Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014"), i));
+      Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014").toString(), i));
     }
   }
 

