You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/04/07 16:47:34 UTC
[druid] branch master updated: check paths used for shuffle
intermediary data manager get and delete (#9630)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d267b1c check paths used for shuffle intermediary data manager get and delete (#9630)
d267b1c is described below
commit d267b1c414b9b55b129729692755273d2a35e304
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Tue Apr 7 09:47:18 2020 -0700
check paths used for shuffle intermediary data manager get and delete (#9630)
* check paths used for shuffle intermediary data manager get and delete
* add test
* newline
* meh
---
.../java/org/apache/druid/indexer/TaskIdUtils.java | 63 ++++++++++++
.../org/apache/druid/indexer/TaskIdUtilsTest.java | 111 +++++++++++++++++++++
.../druid/indexing/kafka/KafkaConsumerConfigs.java | 4 +-
.../indexing/kafka/supervisor/KafkaSupervisor.java | 4 +-
.../kinesis/supervisor/KinesisSupervisor.java | 4 +-
.../druid/indexing/common/task/AbstractTask.java | 4 +-
.../indexing/common/task/utils/RandomIdUtils.java | 34 -------
.../supervisor/SeekableStreamSupervisor.java | 6 +-
.../indexing/worker/IntermediaryDataManager.java | 3 +
...ermediaryDataManagerManualAddAndDeleteTest.java | 64 +++++++++++-
.../tests/indexer/AbstractKafkaIndexerTest.java | 4 +-
.../apache/druid/segment/indexing/DataSchema.java | 14 +--
.../druid/segment/indexing/DataSchemaTest.java | 28 +++---
13 files changed, 268 insertions(+), 75 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
new file mode 100644
index 0000000..a88341b
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Static helpers for validating identifiers that end up in file paths and for generating short random
+ * identifier suffixes.
+ */
+public class TaskIdUtils
+{
+  // Matches (against the whole input, newlines included) any string that contains at least one
+  // whitespace character other than the plain space character.
+  private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*");
+
+  /**
+   * Validates an identifier. The checks run in this order, and the first failing one determines the message:
+   * not null or empty, does not start with '.', does not contain '/', contains no whitespace other than space.
+   *
+   * @param thingToValidate  human readable name of what is being validated; only used to build error messages
+   * @param stringToValidate the identifier value to check
+   *
+   * @throws IllegalArgumentException if any of the checks fails
+   */
+  public static void validateId(String thingToValidate, String stringToValidate)
+  {
+    final boolean isMissing = Strings.isNullOrEmpty(stringToValidate);
+    final String missingMessage =
+        StringUtils.format("%s cannot be null or empty. Please provide a %s.", thingToValidate, thingToValidate);
+    Preconditions.checkArgument(!isMissing, missingMessage);
+
+    // The value is known to be non-empty from here on, so looking at the first character is safe.
+    final boolean hasLeadingDot = stringToValidate.charAt(0) == '.';
+    final String leadingDotMessage = StringUtils.format("%s cannot start with the '.' character.", thingToValidate);
+    Preconditions.checkArgument(!hasLeadingDot, leadingDotMessage);
+
+    final boolean hasSlash = stringToValidate.indexOf('/') >= 0;
+    final String slashMessage = StringUtils.format("%s cannot contain the '/' character.", thingToValidate);
+    Preconditions.checkArgument(!hasSlash, slashMessage);
+
+    final Matcher whitespaceMatcher = INVALIDCHARS.matcher(stringToValidate);
+    final boolean hasIllegalWhitespace = whitespaceMatcher.matches();
+    final String whitespaceMessage =
+        StringUtils.format("%s cannot contain whitespace character except space.", thingToValidate);
+    Preconditions.checkArgument(!hasIllegalWhitespace, whitespaceMessage);
+  }
+
+  /**
+   * Builds a random identifier of {@code Integer.BYTES * 2} characters. Every character is derived from a
+   * 4-bit slice of a freshly drawn random int and therefore lies in the range 'a' to 'p'.
+   *
+   * @return the random identifier
+   */
+  public static String getRandomId()
+  {
+    final int length = Integer.BYTES * 2;
+    final char[] chars = new char[length];
+    int shift = 0;
+    for (int position = 0; position < length; position++) {
+      // A new random int is drawn for every position; only the nibble selected by the shift is used.
+      final int nibble = (ThreadLocalRandom.current().nextInt() >>> shift) & 0x0F;
+      chars[position] = (char) ('a' + nibble);
+      shift += 4;
+    }
+    return new String(chars);
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java b/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java
new file mode 100644
index 0000000..5fed8fb
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests for {@link TaskIdUtils#validateId}: one accepted value and one rejected value per validation rule.
+ */
+public class TaskIdUtilsTest
+{
+  private static final String THINGO = "thingToValidate";
+  private static final String WHITESPACE_MESSAGE = "thingToValidate cannot contain whitespace character except space.";
+  private static final String NULL_OR_EMPTY_MESSAGE =
+      "thingToValidate cannot be null or empty. Please provide a thingToValidate.";
+
+  // Public so that other tests can reuse a value that is expected to pass validation.
+  public static final String VALID_ID_CHARS = "alpha123..*~!@#&%^&*()-+ Россия\\ 한국 中国!";
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testValidIdName()
+  {
+    TaskIdUtils.validateId(THINGO, VALID_ID_CHARS);
+  }
+
+  @Test
+  public void testInvalidNull()
+  {
+    expectRejection(NULL_OR_EMPTY_MESSAGE);
+    TaskIdUtils.validateId(THINGO, null);
+  }
+
+  @Test
+  public void testInvalidEmpty()
+  {
+    expectRejection(NULL_OR_EMPTY_MESSAGE);
+    TaskIdUtils.validateId(THINGO, "");
+  }
+
+  @Test
+  public void testInvalidSlashes()
+  {
+    expectRejection("thingToValidate cannot contain the '/' character.");
+    TaskIdUtils.validateId(THINGO, "/paths/are/bad/since/we/make/files/from/stuff");
+  }
+
+  @Test
+  public void testInvalidLeadingDot()
+  {
+    expectRejection("thingToValidate cannot start with the '.' character.");
+    TaskIdUtils.validateId(THINGO, "./nice/try");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexTabs()
+  {
+    expectRejection(WHITESPACE_MESSAGE);
+    TaskIdUtils.validateId(THINGO, "spaces\tare\tbetter\tthan\ttabs\twhich\tare\tillegal");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexNewline()
+  {
+    expectRejection(WHITESPACE_MESSAGE);
+    TaskIdUtils.validateId(THINGO, "new\nline");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexCarriageReturn()
+  {
+    expectRejection(WHITESPACE_MESSAGE);
+    TaskIdUtils.validateId(THINGO, "does\rexist\rby\ritself");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexLineTabulation()
+  {
+    expectRejection(WHITESPACE_MESSAGE);
+    TaskIdUtils.validateId(THINGO, "wtf\u000Bis line tabulation");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexFormFeed()
+  {
+    expectRejection(WHITESPACE_MESSAGE);
+    TaskIdUtils.validateId(THINGO, "form\u000cfeed?");
+  }
+
+  /**
+   * Arms the {@link ExpectedException} rule so that the remainder of the calling test must throw an
+   * {@link IllegalArgumentException} whose message contains the given text.
+   */
+  private void expectRejection(String expectedMessage)
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(expectedMessage);
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
index 39174d5..7339d26 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java
@@ -19,7 +19,7 @@
package org.apache.druid.indexing.kafka;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
+import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.java.util.common.StringUtils;
import java.util.HashMap;
@@ -35,7 +35,7 @@ public class KafkaConsumerConfigs
{
final Map<String, Object> props = new HashMap<>();
props.put("metadata.max.age.ms", "10000");
- props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
+ props.put("group.id", StringUtils.format("kafka-supervisor-%s", TaskIdUtils.getRandomId()));
props.put("auto.offset.reset", "none");
props.put("enable.auto.commit", "false");
props.put("isolation.level", "read_committed");
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index f5cec3d..ba12128 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,10 +24,10 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
+import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
@@ -221,7 +221,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
- String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
+ String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
taskList.add(new KafkaIndexTask(
taskId,
new TaskResource(baseSequenceName, 1),
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 13f94a5..c789fc7 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -25,10 +25,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.aws.AWSCredentialsConfig;
+import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
import org.apache.druid.indexing.kinesis.KinesisIndexTask;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
@@ -169,7 +169,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
- String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
+ String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
taskList.add(new KinesisIndexTask(
taskId,
new TaskResource(baseSequenceName, 1),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index d47f900..40745bf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -24,11 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
@@ -92,7 +92,7 @@ public abstract class AbstractTask implements Task
}
final List<Object> objects = new ArrayList<>();
- final String suffix = RandomIdUtils.getRandomId();
+ final String suffix = TaskIdUtils.getRandomId();
objects.add(typeName);
objects.add(dataSource);
objects.add(suffix);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java
deleted file mode 100644
index a782b66..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.utils;
-
-import java.util.concurrent.ThreadLocalRandom;
-
-public class RandomIdUtils
-{
- public static String getRandomId()
- {
- final StringBuilder suffix = new StringBuilder(8);
- for (int i = 0; i < Integer.BYTES * 2; ++i) {
- suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
- }
- return suffix.toString();
- }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 948b687..682a560 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -430,13 +430,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
- // Map<{group RandomIdUtils}, {actively reading task group}>; see documentation for TaskGroup class
+ // Map<{group id}, {actively reading task group}>; see documentation for TaskGroup class
private final ConcurrentHashMap<Integer, TaskGroup> activelyReadingTaskGroups = new ConcurrentHashMap<>();
// After telling a taskGroup to stop reading and begin publishing a segment, it is moved from [activelyReadingTaskGroups] to here so
// we can monitor its status while we queue new tasks to read the next range of sequences. This is a list since we could
// have multiple sets of tasks publishing at once if time-to-publish > taskDuration.
- // Map<{group RandomIdUtils}, List<{pending completion task groups}>>
+ // Map<{group id}, List<{pending completion task groups}>>
private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap<>();
// We keep two separate maps for tracking the current state of partition->task group mappings [partitionGroups] and partition->offset
@@ -998,7 +998,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
if (activelyReadingTaskGroups.putIfAbsent(taskGroupId, group) != null) {
throw new ISE(
- "trying to add taskGroup with RandomIdUtils [%s] to actively reading task groups, but group already exists.",
+ "trying to add taskGroup with id [%s] to actively reading task groups, but group already exists.",
taskGroupId
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index b35b191..78090ca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskStatus;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
@@ -336,6 +337,7 @@ public class IntermediaryDataManager
@Nullable
public File findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int partitionId)
{
+ TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
for (StorageLocation location : shuffleDataLocations) {
final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, partitionId));
if (partitionDir.exists()) {
@@ -364,6 +366,7 @@ public class IntermediaryDataManager
public void deletePartitions(String supervisorTaskId) throws IOException
{
+ TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
for (StorageLocation location : shuffleDataLocations) {
final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
if (supervisorTaskPath.exists()) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
index 1e1eab4..15aad92 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -51,11 +52,15 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
public ExpectedException expectedException = ExpectedException.none();
private IntermediaryDataManager intermediaryDataManager;
+ private File intermediarySegmentsLocation;
+ private File siblingLocation;
@Before
public void setup() throws IOException
{
final WorkerConfig workerConfig = new WorkerConfig();
+ intermediarySegmentsLocation = tempDir.newFolder();
+ siblingLocation = tempDir.newFolder();
final TaskConfig taskConfig = new TaskConfig(
null,
null,
@@ -65,7 +70,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
false,
null,
null,
- ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 600L, null))
+ ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null))
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
@@ -157,6 +162,63 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
}
+  /**
+   * A supervisorTaskId of the form "../{sibling}" resolves to a directory outside of the intermediary
+   * segments location. deletePartitions must reject such an id with an IllegalArgumentException and must
+   * leave the sibling directory and its contents untouched.
+   */
+  @Test
+  public void testFailsWithCraftyFabricatedNamesForDelete() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final String someFile = "sneaky-snake.txt";
+    final File dataFile = new File(siblingLocation, someFile);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+    // Sanity check: the crafted id really does point at the sibling directory before the call.
+    final File traversedDir = new File(intermediarySegmentsLocation, supervisorTaskId);
+    Assert.assertTrue(traversedDir.exists());
+    Assert.assertTrue(dataFile.exists());
+    try {
+      intermediaryDataManager.deletePartitions(supervisorTaskId);
+    }
+    finally {
+      // The call above is expected to throw, so statements placed after it would never execute.
+      // Checking in a finally block makes sure the "nothing was deleted" assertions actually run.
+      Assert.assertTrue(traversedDir.exists());
+      Assert.assertTrue(dataFile.exists());
+    }
+  }
+
+  /**
+   * A supervisorTaskId of the form "../{sibling}" resolves to a directory outside of the intermediary
+   * segments location. findPartitionFile must reject such an id with an IllegalArgumentException even when a
+   * matching file exists at the traversed path.
+   */
+  @Test
+  public void testFailsWithCraftyFabricatedNamesForFind() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final Interval interval = Intervals.of("2018/2019");
+    final int partitionId = 0;
+    final String intervalAndPart =
+        StringUtils.format("%s/%s/%s", interval.getStart().toString(), interval.getEnd().toString(), partitionId);
+
+    final String someFile = "sneaky-snake.txt";
+
+    final String someFilePath = intervalAndPart + "/" + someFile;
+
+    // can only traverse to find files that are in a path of the form {start}/{end}/{partitionId}, so write a data file
+    // in a location like that
+    final File dataFile = new File(siblingLocation, someFilePath);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+
+    // Sanity check: without validation the crafted id would resolve to the file written above.
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(
+        new File(intermediarySegmentsLocation, supervisorTaskId + "/" + someFilePath).exists());
+
+    // Expected to throw, which is what the ExpectedException rule verifies. Nothing after this call
+    // would execute, so no assertion on the return value is made.
+    intermediaryDataManager.findPartitionFile(
+        supervisorTaskId,
+        someFile,
+        interval,
+        partitionId
+    );
+  }
+
private File generateSegmentDir(String fileName) throws IOException
{
// Each file size is 138 bytes after compression
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
index eea0f7f..a20254d 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
@@ -28,7 +28,7 @@ import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.IOUtils;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
+import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
@@ -165,7 +165,7 @@ abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest
properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
if (txnEnabled) {
properties.setProperty("enable.idempotence", "true");
- properties.setProperty("transactional.id", RandomIdUtils.getRandomId());
+ properties.setProperty("transactional.id", TaskIdUtils.getRandomId());
}
KafkaProducer<String, String> producer = new KafkaProducer<>(
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index 37009da..6be9a71 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -27,12 +27,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -45,7 +45,6 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -148,16 +147,7 @@ public class DataSchema
private static void validateDatasourceName(String dataSource)
{
- Preconditions.checkArgument(
- !Strings.isNullOrEmpty(dataSource),
- "dataSource cannot be null or empty. Please provide a dataSource."
- );
- Matcher m = INVALIDCHARS.matcher(dataSource);
- Preconditions.checkArgument(
- !m.matches(),
- "dataSource cannot contain whitespace character except space."
- );
- Preconditions.checkArgument(!dataSource.contains("/"), "dataSource cannot contain the '/' character.");
+ TaskIdUtils.validateId("dataSource", dataSource);
}
private static DimensionsSpec computeDimensionsSpec(
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index ff081c5..8ce50d3 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -26,12 +26,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.text.StringEscapeUtils;
-import org.apache.druid.common.config.NullHandlingTest;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskIdUtilsTest;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
@@ -46,6 +46,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
@@ -60,11 +61,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-public class DataSchemaTest extends NullHandlingTest
+public class DataSchemaTest extends InitializedNullHandlingTest
{
-
- private static final String VALID_DATASOURCE_CHARS_NAME = "alpha123..*~!@#&%^&*()-+ Россия\\ 한국 中国!";
-
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -86,7 +84,7 @@ public class DataSchemaTest extends NullHandlingTest
);
DataSchema schema = new DataSchema(
- VALID_DATASOURCE_CHARS_NAME,
+ TaskIdUtilsTest.VALID_ID_CHARS,
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -123,7 +121,7 @@ public class DataSchemaTest extends NullHandlingTest
);
DataSchema schema = new DataSchema(
- VALID_DATASOURCE_CHARS_NAME,
+ TaskIdUtilsTest.VALID_ID_CHARS,
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -160,7 +158,7 @@ public class DataSchemaTest extends NullHandlingTest
);
DataSchema schema = new DataSchema(
- VALID_DATASOURCE_CHARS_NAME,
+ TaskIdUtilsTest.VALID_ID_CHARS,
parserMap,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -218,7 +216,7 @@ public class DataSchemaTest extends NullHandlingTest
);
DataSchema schema = new DataSchema(
- VALID_DATASOURCE_CHARS_NAME,
+ TaskIdUtilsTest.VALID_ID_CHARS,
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -251,7 +249,7 @@ public class DataSchemaTest extends NullHandlingTest
);
DataSchema schema = new DataSchema(
- VALID_DATASOURCE_CHARS_NAME,
+ TaskIdUtilsTest.VALID_ID_CHARS,
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -269,7 +267,7 @@ public class DataSchemaTest extends NullHandlingTest
public void testSerdeWithInvalidParserMap() throws Exception
{
String jsonStr = "{"
- + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(VALID_DATASOURCE_CHARS_NAME) + "\","
+ + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\","
+ "\"parser\":{\"type\":\"invalid\"},"
+ "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}],"
+ "\"granularitySpec\":{"
@@ -372,7 +370,7 @@ public class DataSchemaTest extends NullHandlingTest
public void testSerde() throws Exception
{
String jsonStr = "{"
- + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(VALID_DATASOURCE_CHARS_NAME) + "\","
+ + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\","
+ "\"parser\":{"
+ "\"type\":\"string\","
+ "\"parseSpec\":{"
@@ -396,7 +394,7 @@ public class DataSchemaTest extends NullHandlingTest
DataSchema.class
);
- Assert.assertEquals(actual.getDataSource(), VALID_DATASOURCE_CHARS_NAME);
+ Assert.assertEquals(actual.getDataSource(), TaskIdUtilsTest.VALID_ID_CHARS);
Assert.assertEquals(
actual.getParser().getParseSpec(),
new JSONParseSpec(
@@ -476,7 +474,7 @@ public class DataSchemaTest extends NullHandlingTest
);
DataSchema originalSchema = new DataSchema(
- VALID_DATASOURCE_CHARS_NAME,
+ TaskIdUtilsTest.VALID_ID_CHARS,
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -515,7 +513,7 @@ public class DataSchemaTest extends NullHandlingTest
);
TestModifiedDataSchema originalSchema = new TestModifiedDataSchema(
- VALID_DATASOURCE_CHARS_NAME,
+ TaskIdUtilsTest.VALID_ID_CHARS,
null,
null,
new AggregatorFactory[]{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org