You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/07/29 04:26:41 UTC

[GitHub] [druid] jihoonson commented on a change in pull request #11507: option to use deep storage for storing shuffle data

jihoonson commented on a change in pull request #11507:
URL: https://github.com/apache/druid/pull/11507#discussion_r678800376



##########
File path: extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
##########
@@ -78,15 +78,19 @@ public DataSegment push(final File indexFilesDir, final DataSegment inSegment, f
       throws IOException
   {
     final String path = OssUtils.constructSegmentPath(config.getPrefix(), getStorageDir(inSegment, useUniquePath));
-
     log.debug("Copying segment[%s] to OSS at location[%s]", inSegment.getId(), path);
+    return pushToPath(indexFilesDir, inSegment, path);
+  }
 
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String path) throws IOException
+  {
     final File zipOutFile = File.createTempFile("druid", "index.zip");
     final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
 
     final DataSegment outSegment = inSegment.withSize(indexSize)
-                                            .withLoadSpec(makeLoadSpec(config.getBucket(), path))
-                                            .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
+        .withLoadSpec(makeLoadSpec(config.getBucket(), path))
+        .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));

Review comment:
      These changes don't follow the Druid code style. Are you using the IntelliJ formatter settings at https://github.com/apache/druid/blob/master/dev/druid_intellij_formatting.xml?

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
##########
@@ -53,7 +56,7 @@
    *
    * @return size of the writen segment

Review comment:
       This method no longer returns the size written. Can you fix the javadoc as well?

##########
File path: extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
##########
@@ -78,15 +78,19 @@ public DataSegment push(final File indexFilesDir, final DataSegment inSegment, f
       throws IOException
   {
     final String path = OssUtils.constructSegmentPath(config.getPrefix(), getStorageDir(inSegment, useUniquePath));
-
     log.debug("Copying segment[%s] to OSS at location[%s]", inSegment.getId(), path);
+    return pushToPath(indexFilesDir, inSegment, path);
+  }
 
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String path) throws IOException
+  {
     final File zipOutFile = File.createTempFile("druid", "index.zip");
     final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
 
     final DataSegment outSegment = inSegment.withSize(indexSize)
-                                            .withLoadSpec(makeLoadSpec(config.getBucket(), path))
-                                            .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
+        .withLoadSpec(makeLoadSpec(config.getBucket(), path))
+        .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));

Review comment:
      Please fix the indentation here and in the other places where it changed.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.io.ByteSource;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.batch.parallel.DeepStoragePartitionStat;
+import org.apache.druid.indexing.common.task.batch.parallel.GenericPartitionStat;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
+
+public class DeepStorageIntermediaryDataManager implements IntermediaryDataManager
+{
+  public static final String SHUFFLE_DATA_DIR_PREFIX = "shuffle-data";
+  private final DataSegmentPusher dataSegmentPusher;
+
+  @Override
+  public DataSegment addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
+      throws IOException
+  {
+    final BucketNumberedShardSpec<?> bucketNumberedShardSpec = (BucketNumberedShardSpec<?>) segment.getShardSpec();
+    final String partitionFilePath = getPartitionFilePath(
+        supervisorTaskId,
+        subTaskId,
+        segment.getInterval(),
+        bucketNumberedShardSpec.getBucketId() // we must use the bucket ID instead of partition ID
+    );
+    return dataSegmentPusher.pushToPath(segmentDir, segment, SHUFFLE_DATA_DIR_PREFIX + "/" + partitionFilePath);
+  }
+
+  @Inject
+  public DeepStorageIntermediaryDataManager(DataSegmentPusher dataSegmentPusher)
+  {
+    this.dataSegmentPusher = dataSegmentPusher;
+  }
+
+  @Override
+  public GenericPartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment)
+  {
+    return new DeepStoragePartitionStat(
+        toolbox.getTaskExecutorNode().getHost(),
+        toolbox.getTaskExecutorNode().getPortToUse(),
+        toolbox.getTaskExecutorNode().isEnableTlsPort(),
+        segment.getInterval(),
+        (BucketNumberedShardSpec) segment.getShardSpec(),
+        null, // numRows is not supported yet
+        null, // sizeBytes is not supported yet
+        segment.getLoadSpec()
+    );
+  }
+
+  @Nullable
+  @Override
+  public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId)
+  {
+    return Optional.empty();

Review comment:
      I guess this should never be called? If so, it would be better to throw an exception here instead of returning an empty result.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocation.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class DeepStoragePartitionLocation extends GenericPartitionLocation

Review comment:
      It seems only `subTaskId` and `loadSpec` are used in this class, which makes me wonder why it needs the other fields. I suggest adding a new interface for `PartitionLocation` and having `DeepStoragePartitionLocation` implement it, instead of extending `GenericPartitionLocation`, because `GenericPartitionLocation` is designed for shuffle on local storage. Since we already have an abstract class named `PartitionLocation`, you will have to either rename that class or choose a different name for the new interface.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStat.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+public class DeepStoragePartitionStat extends GenericPartitionStat
+{
+  private final Map<String, Object> loadSpec;
+
+  @JsonCreator
+  public DeepStoragePartitionStat(
+      @JsonProperty("taskExecutorHost") String taskExecutorHost,
+      @JsonProperty("taskExecutorPort") int taskExecutorPort,
+      @JsonProperty("useHttps") boolean useHttps,

Review comment:
      Similarly, I suggest adding a new interface for `PartitionStat` and renaming the existing abstract class `PartitionStat` to something else. This class can then implement the new `PartitionStat` interface and keep only the fields it needs. I think we will probably never need `useHttps`. `taskExecutorHost` and `taskExecutorPort` might become useful in the future if we want to collect per-task shuffle metrics in the supervisor task. But since I'm not sure yet exactly what we would want, I suggest removing them for now and keeping only the necessary fields.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
##########
@@ -74,4 +77,30 @@
    *
    */
   void deletePartitions(String supervisorTaskId) throws IOException;
+
+  GenericPartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment);
+
+  default String getPartitionFilePath(
+      String supervisorTaskId,
+      String subTaskId,
+      Interval interval,
+      int bucketId
+  )
+  {
+    return Paths.get(getPartitionDir(supervisorTaskId, interval, bucketId), subTaskId).toString();
+  }
+
+  default String getPartitionDir(

Review comment:
       nit: maybe `getPartitionDirPath()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org