You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/04/07 16:47:34 UTC

[druid] branch master updated: check paths used for shuffle intermediary data manager get and delete (#9630)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d267b1c  check paths used for shuffle intermediary data manager get and delete (#9630)
d267b1c is described below

commit d267b1c414b9b55b129729692755273d2a35e304
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Tue Apr 7 09:47:18 2020 -0700

    check paths used for shuffle intermediary data manager get and delete (#9630)
    
    * check paths used for shuffle intermediary data manager get and delete
    
    * add test
    
    * newline
    
    * meh
---
 .../java/org/apache/druid/indexer/TaskIdUtils.java |  63 ++++++++++++
 .../org/apache/druid/indexer/TaskIdUtilsTest.java  | 111 +++++++++++++++++++++
 .../druid/indexing/kafka/KafkaConsumerConfigs.java |   4 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |   4 +-
 .../kinesis/supervisor/KinesisSupervisor.java      |   4 +-
 .../druid/indexing/common/task/AbstractTask.java   |   4 +-
 .../indexing/common/task/utils/RandomIdUtils.java  |  34 -------
 .../supervisor/SeekableStreamSupervisor.java       |   6 +-
 .../indexing/worker/IntermediaryDataManager.java   |   3 +
 ...ermediaryDataManagerManualAddAndDeleteTest.java |  64 +++++++++++-
 .../tests/indexer/AbstractKafkaIndexerTest.java    |   4 +-
 .../apache/druid/segment/indexing/DataSchema.java  |  14 +--
 .../druid/segment/indexing/DataSchemaTest.java     |  28 +++---
 13 files changed, 268 insertions(+), 75 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
new file mode 100644
index 0000000..a88341b
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TaskIdUtils
+{
+  private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*");
+
+  public static void validateId(String thingToValidate, String stringToValidate)
+  {
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(stringToValidate),
+        StringUtils.format("%s cannot be null or empty. Please provide a %s.", thingToValidate, thingToValidate)
+    );
+    Preconditions.checkArgument(
+        !stringToValidate.startsWith("."),
+        StringUtils.format("%s cannot start with the '.' character.", thingToValidate)
+    );
+    Preconditions.checkArgument(
+        !stringToValidate.contains("/"),
+        StringUtils.format("%s cannot contain the '/' character.", thingToValidate)
+    );
+    Matcher m = INVALIDCHARS.matcher(stringToValidate);
+    Preconditions.checkArgument(
+        !m.matches(),
+        StringUtils.format("%s cannot contain whitespace character except space.", thingToValidate)
+    );
+  }
+
+  public static String getRandomId()
+  {
+    final StringBuilder suffix = new StringBuilder(8);
+    for (int i = 0; i < Integer.BYTES * 2; ++i) {
+      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
+    }
+    return suffix.toString();
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java b/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java
new file mode 100644
index 0000000..5fed8fb
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TaskIdUtilsTest
+{
+  private static final String THINGO = "thingToValidate";
+  public static final String VALID_ID_CHARS = "alpha123..*~!@#&%^&*()-+ Россия\\ 한국 中国!";
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testValidIdName()
+  {
+    TaskIdUtils.validateId(THINGO, VALID_ID_CHARS);
+  }
+
+  @Test
+  public void testInvalidNull()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot be null or empty. Please provide a thingToValidate.");
+    TaskIdUtils.validateId(THINGO, null);
+  }
+
+  @Test
+  public void testInvalidEmpty()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot be null or empty. Please provide a thingToValidate.");
+    TaskIdUtils.validateId(THINGO, "");
+  }
+
+  @Test
+  public void testInvalidSlashes()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain the '/' character.");
+    TaskIdUtils.validateId(THINGO, "/paths/are/bad/since/we/make/files/from/stuff");
+  }
+
+  @Test
+  public void testInvalidLeadingDot()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot start with the '.' character.");
+    TaskIdUtils.validateId(THINGO, "./nice/try");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexTabs()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "spaces\tare\tbetter\tthan\ttabs\twhich\tare\tillegal");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexNewline()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "new\nline");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexCarriageReturn()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "does\rexist\rby\ritself");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexLineTabulation()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "wtf\u000Bis line tabulation");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexFormFeed()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "form\u000cfeed?");
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
index 39174d5..7339d26 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
@@ -19,7 +19,7 @@
 
 package org.apache.druid.indexing.kafka;
 
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.java.util.common.StringUtils;
 
 import java.util.HashMap;
@@ -35,7 +35,7 @@ public class KafkaConsumerConfigs
   {
     final Map<String, Object> props = new HashMap<>();
     props.put("metadata.max.age.ms", "10000");
-    props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
+    props.put("group.id", StringUtils.format("kafka-supervisor-%s", TaskIdUtils.getRandomId()));
     props.put("auto.offset.reset", "none");
     props.put("enable.auto.commit", "false");
     props.put("isolation.level", "read_committed");
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index f5cec3d..ba12128 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,10 +24,10 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
 import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
 import org.apache.druid.indexing.kafka.KafkaIndexTask;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
@@ -221,7 +221,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
 
     List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
+      String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
       taskList.add(new KafkaIndexTask(
           taskId,
           new TaskResource(baseSequenceName, 1),
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 13f94a5..c789fc7 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -25,10 +25,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
 import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
 import org.apache.druid.indexing.kinesis.KinesisIndexTask;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
@@ -169,7 +169,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
 
     List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
+      String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
       taskList.add(new KinesisIndexTask(
           taskId,
           new TaskResource(baseSequenceName, 1),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index d47f900..40745bf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -24,11 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -92,7 +92,7 @@ public abstract class AbstractTask implements Task
     }
 
     final List<Object> objects = new ArrayList<>();
-    final String suffix = RandomIdUtils.getRandomId();
+    final String suffix = TaskIdUtils.getRandomId();
     objects.add(typeName);
     objects.add(dataSource);
     objects.add(suffix);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java
deleted file mode 100644
index a782b66..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.utils;
-
-import java.util.concurrent.ThreadLocalRandom;
-
-public class RandomIdUtils
-{
-  public static String getRandomId()
-  {
-    final StringBuilder suffix = new StringBuilder(8);
-    for (int i = 0; i < Integer.BYTES * 2; ++i) {
-      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
-    }
-    return suffix.toString();
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 948b687..682a560 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -430,13 +430,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
-  // Map<{group RandomIdUtils}, {actively reading task group}>; see documentation for TaskGroup class
+  // Map<{group id}, {actively reading task group}>; see documentation for TaskGroup class
   private final ConcurrentHashMap<Integer, TaskGroup> activelyReadingTaskGroups = new ConcurrentHashMap<>();
 
   // After telling a taskGroup to stop reading and begin publishing a segment, it is moved from [activelyReadingTaskGroups] to here so
   // we can monitor its status while we queue new tasks to read the next range of sequences. This is a list since we could
   // have multiple sets of tasks publishing at once if time-to-publish > taskDuration.
-  // Map<{group RandomIdUtils}, List<{pending completion task groups}>>
+  // Map<{group id}, List<{pending completion task groups}>>
   private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap<>();
 
   // We keep two separate maps for tracking the current state of partition->task group mappings [partitionGroups] and partition->offset
@@ -998,7 +998,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
     if (activelyReadingTaskGroups.putIfAbsent(taskGroupId, group) != null) {
       throw new ISE(
-          "trying to add taskGroup with RandomIdUtils [%s] to actively reading task groups, but group already exists.",
+          "trying to add taskGroup with id [%s] to actively reading task groups, but group already exists.",
           taskGroupId
       );
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index b35b191..78090ca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.TaskStatus;
 import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
@@ -336,6 +337,7 @@ public class IntermediaryDataManager
   @Nullable
   public File findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int partitionId)
   {
+    TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
     for (StorageLocation location : shuffleDataLocations) {
       final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, partitionId));
       if (partitionDir.exists()) {
@@ -364,6 +366,7 @@ public class IntermediaryDataManager
 
   public void deletePartitions(String supervisorTaskId) throws IOException
   {
+    TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
     for (StorageLocation location : shuffleDataLocations) {
       final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
       if (supervisorTaskPath.exists()) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
index 1e1eab4..15aad92 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -51,11 +52,15 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
   public ExpectedException expectedException = ExpectedException.none();
 
   private IntermediaryDataManager intermediaryDataManager;
+  private File intermediarySegmentsLocation;
+  private File siblingLocation;
 
   @Before
   public void setup() throws IOException
   {
     final WorkerConfig workerConfig = new WorkerConfig();
+    intermediarySegmentsLocation = tempDir.newFolder();
+    siblingLocation = tempDir.newFolder();
     final TaskConfig taskConfig = new TaskConfig(
         null,
         null,
@@ -65,7 +70,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
         false,
         null,
         null,
-        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 600L, null))
+        ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null))
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
     intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
@@ -157,6 +162,63 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
     intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
   }
 
+  @Test
+  public void testFailsWithCraftyFabricatedNamesForDelete() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final String someFile = "sneaky-snake.txt";
+    File dataFile = new File(siblingLocation, someFile);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(dataFile.exists());
+    intermediaryDataManager.deletePartitions(supervisorTaskId);
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(dataFile.exists());
+  }
+
+  @Test
+  public void testFailsWithCraftyFabricatedNamesForFind() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final Interval interval = Intervals.of("2018/2019");
+    final int partitionId = 0;
+    final String intervalAndPart =
+        StringUtils.format("%s/%s/%s", interval.getStart().toString(), interval.getEnd().toString(), partitionId);
+
+    final String someFile = "sneaky-snake.txt";
+
+    final String someFilePath = intervalAndPart + "/" + someFile;
+
+    // can only traverse to find files that are in a path of the form {start}/{end}/{partitionId}, so write a data file
+    // in a location like that
+    File dataFile = new File(siblingLocation, someFilePath);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(
+        new File(intermediarySegmentsLocation, supervisorTaskId + "/" + someFilePath).exists());
+
+    final File foundFile1 = intermediaryDataManager.findPartitionFile(
+        supervisorTaskId,
+        someFile,
+        interval,
+        partitionId
+    );
+    Assert.assertNull(foundFile1);
+  }
+
   private File generateSegmentDir(String fileName) throws IOException
   {
     // Each file size is 138 bytes after compression
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
index eea0f7f..a20254d 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
@@ -28,7 +28,7 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.io.IOUtils;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
@@ -165,7 +165,7 @@ abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest
     properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
     if (txnEnabled) {
       properties.setProperty("enable.idempotence", "true");
-      properties.setProperty("transactional.id", RandomIdUtils.getRandomId());
+      properties.setProperty("transactional.id", TaskIdUtils.getRandomId());
     }
 
     KafkaProducer<String, String> producer = new KafkaProducer<>(
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index 37009da..6be9a71 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -27,12 +27,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -45,7 +45,6 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -148,16 +147,7 @@ public class DataSchema
 
   private static void validateDatasourceName(String dataSource)
   {
-    Preconditions.checkArgument(
-        !Strings.isNullOrEmpty(dataSource),
-        "dataSource cannot be null or empty. Please provide a dataSource."
-    );
-    Matcher m = INVALIDCHARS.matcher(dataSource);
-    Preconditions.checkArgument(
-        !m.matches(),
-        "dataSource cannot contain whitespace character except space."
-    );
-    Preconditions.checkArgument(!dataSource.contains("/"), "dataSource cannot contain the '/' character.");
+    TaskIdUtils.validateId("dataSource", dataSource);
   }
 
   private static DimensionsSpec computeDimensionsSpec(
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index ff081c5..8ce50d3 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -26,12 +26,12 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.text.StringEscapeUtils;
-import org.apache.druid.common.config.NullHandlingTest;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.JSONParseSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskIdUtilsTest;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
@@ -46,6 +46,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -60,11 +61,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public class DataSchemaTest extends NullHandlingTest
+public class DataSchemaTest extends InitializedNullHandlingTest
 {
-
-  private static final String VALID_DATASOURCE_CHARS_NAME = "alpha123..*~!@#&%^&*()-+ Россия\\ 한국 中国!";
-
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
@@ -86,7 +84,7 @@ public class DataSchemaTest extends NullHandlingTest
     );
 
     DataSchema schema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -123,7 +121,7 @@ public class DataSchemaTest extends NullHandlingTest
     );
 
     DataSchema schema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -160,7 +158,7 @@ public class DataSchemaTest extends NullHandlingTest
     );
 
     DataSchema schema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parserMap,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -218,7 +216,7 @@ public class DataSchemaTest extends NullHandlingTest
     );
 
     DataSchema schema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -251,7 +249,7 @@ public class DataSchemaTest extends NullHandlingTest
     );
 
     DataSchema schema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -269,7 +267,7 @@ public class DataSchemaTest extends NullHandlingTest
   public void testSerdeWithInvalidParserMap() throws Exception
   {
     String jsonStr = "{"
-                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(VALID_DATASOURCE_CHARS_NAME) + "\","
+                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\","
                      + "\"parser\":{\"type\":\"invalid\"},"
                      + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}],"
                      + "\"granularitySpec\":{"
@@ -372,7 +370,7 @@ public class DataSchemaTest extends NullHandlingTest
   public void testSerde() throws Exception
   {
     String jsonStr = "{"
-                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(VALID_DATASOURCE_CHARS_NAME) + "\","
+                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\","
                      + "\"parser\":{"
                      + "\"type\":\"string\","
                      + "\"parseSpec\":{"
@@ -396,7 +394,7 @@ public class DataSchemaTest extends NullHandlingTest
         DataSchema.class
     );
 
-    Assert.assertEquals(actual.getDataSource(), VALID_DATASOURCE_CHARS_NAME);
+    Assert.assertEquals(actual.getDataSource(), TaskIdUtilsTest.VALID_ID_CHARS);
     Assert.assertEquals(
         actual.getParser().getParseSpec(),
         new JSONParseSpec(
@@ -476,7 +474,7 @@ public class DataSchemaTest extends NullHandlingTest
     );
 
     DataSchema originalSchema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -515,7 +513,7 @@ public class DataSchemaTest extends NullHandlingTest
     );
 
     TestModifiedDataSchema originalSchema = new TestModifiedDataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         null,
         null,
         new AggregatorFactory[]{


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org