Posted to commits@druid.apache.org by ji...@apache.org on 2020/06/19 01:40:55 UTC
[druid] branch master updated: Create packed core partitions for
hash/range-partitioned segments in native batch ingestion (#10025)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d644a27 Create packed core partitions for hash/range-partitioned segments in native batch ingestion (#10025)
d644a27 is described below
commit d644a27f1a545105a4b1a4110f3ed83d7c46a46f
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Thu Jun 18 18:40:43 2020 -0700
Create packed core partitions for hash/range-partitioned segments in native batch ingestion (#10025)
* Fill in the core partition set size properly for batch ingestion with
dynamic partitioning
* incomplete javadoc
* Address comments
* fix tests
* fix json serde, add tests
* checkstyle
* Set core partition set size for hash-partitioned segments properly in
batch ingestion
* test for both parallel and single-threaded task
* unused variables
* fix test
* unused imports
* add hash/range buckets
* some test adjustment and missing json serde
* centralized partition id allocation in parallel and simple tasks
* remove string partition chunk
* revive string partition chunk
* fill numCorePartitions for hadoop
* clean up hash stuffs
* resolved todos
* javadocs
* Fix tests
* add more tests
* doc
* unused imports
---
.../druid/segment/loading/DataSegmentPusher.java | 9 ++
.../partition/BucketNumberedShardSpec.java | 106 +++++++++++++
.../BuildingHashBasedNumberedShardSpec.java | 144 +++++++++++++++++
.../partition/BuildingNumberedShardSpec.java | 74 ++-------
.../timeline/partition/BuildingShardSpec.java | 103 ++++++++++++
....java => BuildingSingleDimensionShardSpec.java} | 94 +++++------
.../HashBasedNumberedPartialShardSpec.java | 27 +++-
.../partition/HashBasedNumberedShardSpec.java | 97 ++++++++++--
.../timeline/partition/HashBucketShardSpec.java | 129 +++++++++++++++
.../druid/timeline/partition/LinearShardSpec.java | 8 +-
.../druid/timeline/partition/NoneShardSpec.java | 8 +-
.../partition/NumberedOverwriteShardSpec.java | 2 +-
.../partition/NumberedPartialShardSpec.java | 2 +-
.../timeline/partition/NumberedShardSpec.java | 24 +--
.../timeline/partition/OverwriteShardSpec.java | 11 ++
.../timeline/partition/RangeBucketShardSpec.java | 137 ++++++++++++++++
.../apache/druid/timeline/partition/ShardSpec.java | 15 +-
.../partition/SingleDimensionPartialShardSpec.java | 25 +--
.../partition/SingleDimensionShardSpec.java | 94 ++++++++---
.../org/apache/druid/timeline/DataSegmentTest.java | 8 +-
.../BuildingHashBasedNumberedShardSpecTest.java | 85 ++++++++++
.../partition/BuildingNumberedShardSpecTest.java | 35 +----
...a => BuildingSingleDimensionShardSpecTest.java} | 43 +++--
.../HashBasedNumberedPartialShardSpecTest.java | 5 +-
...dSpecTest.java => HashBucketShardSpecTest.java} | 60 +++++--
.../partition/NumberedOverwriteShardSpecTest.java | 2 +-
.../partition/PartitionHolderCompletenessTest.java | 103 ++++++++++++
...SpecTest.java => RangeBucketShardSpecTest.java} | 56 +++++--
.../timeline/partition/ShardSpecTestUtils.java | 49 ++++++
docs/ingestion/native-batch.md | 2 +-
.../MaterializedViewSupervisorTest.java | 16 +-
.../indexer/DetermineHashedPartitionsJob.java | 2 +
.../druid/indexer/DeterminePartitionsJob.java | 15 +-
.../HadoopDruidDetermineConfigurationJob.java | 2 +
.../druid/indexer/BatchDeltaIngestionTest.java | 2 +-
.../indexer/HadoopDruidIndexerConfigTest.java | 2 +-
.../druid/indexer/IndexGeneratorJobTest.java | 9 +-
.../common/task/CachingLocalSegmentAllocator.java | 138 ++++++++--------
.../druid/indexing/common/task/IndexTask.java | 15 +-
.../common/task/LocalSegmentAllocator.java | 12 +-
...NonLinearlyPartitionedSequenceNameFunction.java | 7 +-
.../task/OverlordCoordinatingSegmentAllocator.java | 12 +-
...llocator.java => SegmentAllocatorForBatch.java} | 12 +-
.../indexing/common/task/SegmentAllocators.java | 16 +-
.../druid/indexing/common/task/ShardSpecs.java | 14 +-
...SupervisorTaskCoordinatingSegmentAllocator.java | 12 +-
.../apache/druid/indexing/common/task/Task.java | 2 -
.../parallel/GeneratedHashPartitionsReport.java | 44 ------
.../batch/parallel/GenericPartitionLocation.java | 12 +-
.../task/batch/parallel/GenericPartitionStat.java | 13 +-
.../task/batch/parallel/HashPartitionLocation.java | 51 ------
.../task/batch/parallel/HashPartitionStat.java | 90 -----------
.../parallel/ParallelIndexSupervisorTask.java | 104 +++++--------
.../batch/parallel/ParallelIndexTaskRunner.java | 5 +-
.../batch/parallel/ParallelIndexTuningConfig.java | 4 +-
.../parallel/PartialGenericSegmentMergeTask.java | 36 +++--
...HashSegmentGenerateParallelIndexTaskRunner.java | 4 +-
.../parallel/PartialHashSegmentGenerateTask.java | 25 +--
.../parallel/PartialHashSegmentMergeIOConfig.java | 40 -----
.../PartialHashSegmentMergeIngestionSpec.java | 37 -----
...ialHashSegmentMergeParallelIndexTaskRunner.java | 115 --------------
.../parallel/PartialHashSegmentMergeTask.java | 112 -------------
...angeSegmentGenerateParallelIndexTaskRunner.java | 2 -
.../parallel/PartialRangeSegmentGenerateTask.java | 9 +-
.../batch/parallel/PartialSegmentGenerateTask.java | 12 +-
.../batch/parallel/PartialSegmentMergeTask.java | 32 ++--
.../task/batch/parallel/PartitionLocation.java | 8 +-
.../common/task/batch/parallel/PartitionStat.java | 2 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 1 +
.../common/task/batch/parallel/SubTaskReport.java | 1 -
.../batch/partition/CompletePartitionAnalysis.java | 9 +-
.../batch/partition/HashPartitionAnalysis.java | 49 ++----
.../batch/partition/RangePartitionAnalysis.java | 85 +++-------
.../indexing/worker/IntermediaryDataManager.java | 14 +-
.../indexing/worker/http/ShuffleResource.java | 8 +-
.../common/actions/SegmentAllocateActionTest.java | 17 +-
.../druid/indexing/common/task/IndexTaskTest.java | 4 +-
...ePartitionCachingLocalSegmentAllocatorTest.java | 45 +++---
.../druid/indexing/common/task/ShardSpecsTest.java | 12 +-
.../AbstractMultiPhaseParallelIndexingTest.java | 47 +++---
.../AbstractParallelIndexSupervisorTaskTest.java | 3 +-
.../GeneratedHashPartitionsReportTest.java | 59 -------
.../parallel/GenericPartitionLocationTest.java | 2 +-
.../batch/parallel/GenericPartitionStatTest.java | 12 +-
...ashPartitionAdjustingCorePartitionSizeTest.java | 165 ++++++++++++++++++++
...hPartitionCachingLocalSegmentAllocatorTest.java | 21 +--
...ashPartitionMultiPhaseParallelIndexingTest.java | 21 ++-
.../task/batch/parallel/HashPartitionStatTest.java | 59 -------
.../task/batch/parallel/HttpShuffleClientTest.java | 2 +-
.../ParallelIndexSupervisorTaskResourceTest.java | 1 +
.../parallel/ParallelIndexSupervisorTaskTest.java | 24 +--
.../parallel/ParallelIndexTestingFactory.java | 5 +-
.../PartialHashSegmentMergeIOConfigTest.java | 54 -------
.../PartialHashSegmentMergeIngestionSpecTest.java | 68 --------
.../parallel/PartialHashSegmentMergeTaskTest.java | 88 -----------
...ngePartitionAdjustingCorePartitionSizeTest.java | 167 ++++++++++++++++++++
.../parallel/SinglePhaseParallelIndexingTest.java | 6 +-
.../druid/indexing/overlord/TaskLockboxTest.java | 4 +-
.../tests/indexer/AbstractITBatchIndexTest.java | 2 -
.../appenderator/SegmentPublisherHelper.java | 17 +-
.../druid/client/CachingClusteredClientTest.java | 48 +++---
.../IndexerSQLMetadataStorageCoordinatorTest.java | 13 +-
.../appenderator/SegmentIdWithShardSpecTest.java | 2 +-
.../appenderator/SegmentPublisherHelperTest.java | 173 +++++++++++++++++++++
.../druid/server/shard/NumberedShardSpecTest.java | 11 +-
.../server/shard/SingleDimensionShardSpecTest.java | 2 +-
.../partition/HashBasedNumberedShardSpecTest.java | 69 ++++----
107 files changed, 2361 insertions(+), 1631 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java b/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java
index 7e9eb92..98c7550 100644
--- a/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java
+++ b/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java
@@ -20,9 +20,11 @@
package org.apache.druid.segment.loading;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import java.io.File;
import java.io.IOException;
@@ -105,6 +107,13 @@ public interface DataSegmentPusher
// on segment deletion if segment being deleted was the only segment
static String getDefaultStorageDir(DataSegment segment, boolean useUniquePath)
{
+ // Sanity check for shardSpec type.
+ // BucketNumberedShardSpec should never be used in segment push.
+ Preconditions.checkArgument(
+ !(segment.getShardSpec() instanceof BucketNumberedShardSpec),
+ "Illegal shardSpec type[%s]",
+ segment.getShardSpec()
+ );
return JOINER.join(
segment.getDataSource(),
StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BucketNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BucketNumberedShardSpec.java
new file mode 100644
index 0000000..d692dad
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BucketNumberedShardSpec.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.RangeSet;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is one of the special shardSpecs that are temporarily used during batch ingestion. In Druid, there is a
+ * concept of a core partition set, which is a set of segments that atomically become queryable together in Brokers. The core
+ * partition set is represented as a range of partitionIds, i.e., [0, {@link ShardSpec#getNumCorePartitions()}).
+ *
+ * When you run a batch ingestion task with a non-linear partitioning scheme, the task populates all possible buckets
+ * upfront (see {@code CachingLocalSegmentAllocator}) and uses them to partition input rows. However,
+ * some of the buckets can be empty even after the task consumes all inputs if the data is highly skewed. Since Druid
+ * doesn't create empty segments, the partitionId should be dynamically allocated when a bucket is actually in use,
+ * so that we can always create the packed core partition set without missing partitionIds.
+ *
+ * This BucketNumberedShardSpec is used for such a use case. The task with a non-linear partitioning scheme uses it
+ * to postpone the partitionId allocation until all empty buckets are identified. See
+ * {@code ParallelIndexSupervisorTask.groupGenericPartitionLocationsPerPartition} and
+ * {@code CachingLocalSegmentAllocator} for parallel and sequential ingestion, respectively.
+ *
+ * Note that {@link org.apache.druid.timeline.SegmentId} requires the partitionId. Since the segmentId is used
+ * everywhere during ingestion, this class should implement {@link #getPartitionNum()}, which returns the bucketId.
+ * This should be fine because the segmentId is only used to identify each segment until pushing them to deep storage.
+ * The bucketId should be enough to uniquely identify each segment. However, when pushing segments to deep storage,
+ * the partitionId is used to create the path to store the segment on deep storage
+ * ({@link org.apache.druid.segment.loading.DataSegmentPusher#getDefaultStorageDir}), which should be correct.
+ * As a result, this shardSpec should not be used in pushing segments.
+ *
+ * This class should be Jackson-serializable as the subtasks can send it to the parallel task in parallel ingestion.
+ *
+ * This interface doesn't really have to extend {@link ShardSpec}. The only reason it does is that ShardSpec is used
+ * in many places such as {@link org.apache.druid.timeline.DataSegment}, and modifying all those places to allow
+ * types other than ShardSpec seems pretty invasive. Maybe we could clean up this mess someday in the future.
+ *
+ * @see BuildingShardSpec
+ */
+public interface BucketNumberedShardSpec<T extends BuildingShardSpec> extends ShardSpec
+{
+ int getBucketId();
+
+ T convert(int partitionId);
+
+ @Override
+ default <O> PartitionChunk<O> createChunk(O obj)
+ {
+ // The partitionId (or partitionNum, chunkNumber) is not determined yet. Use bucketId for now.
+ return new NumberedPartitionChunk<>(getBucketId(), 0, obj);
+ }
+
+ @Override
+ default int getPartitionNum()
+ {
+ // See the class-level Javadoc for returning bucketId here.
+ return getBucketId();
+ }
+
+ @Override
+ default int getNumCorePartitions()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // The below methods are used on the query side, and so must not be called for this shardSpec.
+
+ @JsonIgnore
+ @Override
+ default List<String> getDomainDimensions()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default boolean possibleInDomain(Map<String, RangeSet<String>> domain)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default boolean isCompatible(Class<? extends ShardSpec> other)
+ {
+ throw new UnsupportedOperationException();
+ }
+}
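The deferred allocation described in the Javadoc above can be sketched end to end. The concrete numbers are hypothetical and jsonMapper stands in for the injected ObjectMapper:

// 1) During ingestion, rows are routed by bucket only; no partitionId exists yet.
HashBucketShardSpec bucket = new HashBucketShardSpec(
    5,                            // bucketId, known upfront
    10,                           // numBuckets, known upfront
    ImmutableList.of("dim"),
    jsonMapper
);
// 2) Once all inputs are consumed and the empty buckets are known, each
//    non-empty bucket is assigned a packed partitionId.
BuildingHashBasedNumberedShardSpec building = bucket.convert(2);
// 3) At publish time, the core partition set size is finally known.
HashBasedNumberedShardSpec published = building.convert(7);
// published.getPartitionNum() == 2, published.getNumCorePartitions() == 7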
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpec.java
new file mode 100644
index 0000000..fb896fc
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpec.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * See {@link BuildingShardSpec} for how this class is used.
+ *
+ * @see HashBasedNumberedShardSpec
+ */
+public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec<HashBasedNumberedShardSpec>
+{
+ public static final String TYPE = "building_hashed";
+
+ private final int partitionId;
+ private final int bucketId;
+ private final int numBuckets;
+ private final List<String> partitionDimensions;
+ private final ObjectMapper jsonMapper;
+
+ @JsonCreator
+ public BuildingHashBasedNumberedShardSpec(
+ @JsonProperty("partitionId") int partitionId,
+ @JsonProperty("bucketId") int bucketId,
+ @JsonProperty("numBuckets") int numBuckets,
+ @JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
+ @JacksonInject ObjectMapper jsonMapper
+ )
+ {
+ this.partitionId = partitionId;
+ this.bucketId = bucketId;
+ this.numBuckets = numBuckets;
+ this.partitionDimensions = partitionDimensions == null
+ ? HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS
+ : partitionDimensions;
+ this.jsonMapper = jsonMapper;
+ }
+
+ @JsonProperty("partitionId")
+ @Override
+ public int getPartitionNum()
+ {
+ return partitionId;
+ }
+
+ @Override
+ @JsonProperty
+ public int getBucketId()
+ {
+ return bucketId;
+ }
+
+ @JsonProperty
+ public int getNumBuckets()
+ {
+ return numBuckets;
+ }
+
+ @JsonProperty
+ public List<String> getPartitionDimensions()
+ {
+ return partitionDimensions;
+ }
+
+ @Override
+ public <T> PartitionChunk<T> createChunk(T obj)
+ {
+ // This method can be called in AppenderatorImpl to create a sinkTimeline.
+ // The sinkTimeline doesn't seem to be used in batch ingestion, so let's set 'chunks' to 0 for now.
+ // HashBasedNumberedShardSpec is using NumberedPartitionChunk, so we use it here too.
+ return new NumberedPartitionChunk<>(partitionId, 0, obj);
+ }
+
+ @Override
+ public HashBasedNumberedShardSpec convert(int numCorePartitions)
+ {
+ return new HashBasedNumberedShardSpec(
+ partitionId,
+ numCorePartitions,
+ bucketId,
+ numBuckets,
+ partitionDimensions,
+ jsonMapper
+ );
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BuildingHashBasedNumberedShardSpec that = (BuildingHashBasedNumberedShardSpec) o;
+ return partitionId == that.partitionId &&
+ bucketId == that.bucketId &&
+ numBuckets == that.numBuckets &&
+ Objects.equals(partitionDimensions, that.partitionDimensions);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(partitionId, bucketId, numBuckets, partitionDimensions);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "BuildingHashBasedNumberedShardSpec{" +
+ "partitionId=" + partitionId +
+ ", bucketId=" + bucketId +
+ ", numBuckets=" + numBuckets +
+ ", partitionDimensions=" + partitionDimensions +
+ '}';
+ }
+}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpec.java
index 4604f23..a179d3c 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpec.java
@@ -20,45 +20,20 @@
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import com.google.common.collect.RangeSet;
-import org.apache.druid.data.input.InputRow;
-import java.util.List;
-import java.util.Map;
import java.util.Objects;
/**
- * This is a special shardSpec which is temporarily used during batch ingestion. In Druid, there is a concept
- * of core partition set which is a set of segments atomically becoming queryable together in Brokers. The core
- * partition set is represented as a range of partitionIds. For {@link NumberedShardSpec}, the core partition set
- * is [0, {@link NumberedShardSpec#partitions}).
+ * See {@link BuildingShardSpec} for how this class is used.
*
- * The NumberedShardSpec is used for dynamic partitioning which is based on the number of rows in each segment.
- * In streaming ingestion, the core partition set size cannot be determined since it's impossible to know how many
- * segments will be created per time chunk. However, in batch ingestion with time chunk locking, the core partition
- * set is the set of segments created by an initial task or an overwriting task. Since the core partition set is
- * determined when the task publishes segments at the end, the task postpones creating proper NumberedShardSpec
- * until the end.
- *
- * This shardSpec is used for such use case. A non-appending batch task can use this shardSpec until it publishes
- * segments at last. When it publishes segments, it should convert the shardSpec of those segments to NumberedShardSpec.
- * See {@code SegmentPublisherHelper#annotateShardSpec} for converting to NumberedShardSpec. Note that, when
- * the segment lock is used, the Overlord coordinates the segment allocation and this class is never used. Instead,
- * the task sends {@link PartialShardSpec} to the Overlord to allocate a new segment. The result segment could have
- * either a {@link ShardSpec} (for root generation segments) or an {@link OverwriteShardSpec} (for non-root
- * generation segments).
- *
- * This class should be Jackson-serializable as the subtasks can send it to the parallel task in parallel ingestion.
- *
- * Finally, this shardSpec has only partitionId which is same as {@link LinearShardSpec}. The difference between
+ * This shardSpec has only a partitionId, the same as {@link LinearShardSpec}. The difference between
* them is this shardSpec should never be published and so never be used in other places such as Broker timeline.
*
* @see NumberedShardSpec
*/
-public class BuildingNumberedShardSpec implements ShardSpec
+public class BuildingNumberedShardSpec implements BuildingShardSpec<NumberedShardSpec>
{
public static final String TYPE = "building_numbered";
@@ -71,7 +46,15 @@ public class BuildingNumberedShardSpec implements ShardSpec
this.partitionId = partitionId;
}
- public NumberedShardSpec toNumberedShardSpec(int numTotalPartitions)
+ @Override
+ public int getBucketId()
+ {
+ // This method is currently never called when the shardSpec is of this type.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public NumberedShardSpec convert(int numTotalPartitions)
{
return new NumberedShardSpec(partitionId, numTotalPartitions);
}
@@ -92,39 +75,6 @@ public class BuildingNumberedShardSpec implements ShardSpec
}
@Override
- public ShardSpecLookup getLookup(List<ShardSpec> shardSpecs)
- {
- return NumberedShardSpec.createLookup(shardSpecs);
- }
-
- // The below methods are used on the query side, and so must not be called for this shardSpec.
-
- @Override
- public boolean isInChunk(long timestamp, InputRow inputRow)
- {
- throw new UnsupportedOperationException();
- }
-
- @JsonIgnore
- @Override
- public List<String> getDomainDimensions()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isCompatible(Class<? extends ShardSpec> other)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public boolean equals(Object o)
{
if (this == o) {
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BuildingShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingShardSpec.java
new file mode 100644
index 0000000..973fdf4
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingShardSpec.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.RangeSet;
+import org.apache.druid.data.input.InputRow;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is one of the special shardSpecs that are temporarily used during batch ingestion. In Druid, there is a
+ * concept of a core partition set, which is a set of segments that atomically become queryable together in Brokers. The core
+ * partition set is represented as a range of partitionIds, i.e., [0, {@link ShardSpec#getNumCorePartitions()}).
+ *
+ * In streaming ingestion, the core partition set size cannot be determined since it's impossible to know how many
+ * segments will be created per time chunk upfront. However, in batch ingestion with time chunk locking, the core
+ * partition set is the set of segments created by an initial task or an overwriting task. Since the core partition
+ * set is determined when the task publishes segments at the end, the task postpones creating proper {@link ShardSpec}
+ * until the end.
+ *
+ * This BuildingShardSpec is used for such a use case. A non-appending batch task can use this shardSpec until it
+ * publishes segments at the end. When it publishes segments, it should convert the buildingShardSpec of those segments
+ * to a proper shardSpec type {@link T}. See {@code SegmentPublisherHelper#annotateShardSpec} for converting shardSpec.
+ * Note that, when the segment lock is used, the Overlord coordinates the segment allocation and this class is never
+ * used. Instead, the task sends {@link PartialShardSpec} to the Overlord to allocate a new segment. The result segment
+ * could have either a {@link ShardSpec} (for root generation segments) or an {@link OverwriteShardSpec} (for non-root
+ * generation segments).
+ *
+ * This class should be Jackson-serializable as the subtasks can send it to the parallel task in parallel ingestion.
+ *
+ * This interface doesn't really have to extend {@link ShardSpec}. The only reason it does is that ShardSpec is used
+ * in many places such as {@link org.apache.druid.timeline.DataSegment}, and modifying all those places to allow
+ * types other than ShardSpec seems pretty invasive. Maybe we could clean up this mess someday in the future.
+ *
+ * @see BucketNumberedShardSpec
+ */
+public interface BuildingShardSpec<T extends ShardSpec> extends ShardSpec
+{
+ int getBucketId();
+
+ T convert(int numCorePartitions);
+
+ @Override
+ default int getNumCorePartitions()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@link BucketNumberedShardSpec} should be used for shard spec lookup.
+ */
+ @Override
+ default ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // The below methods are used on the query side, and so must not be called for this shardSpec.
+
+ @Override
+ default boolean isInChunk(long timestamp, InputRow inputRow)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @JsonIgnore
+ @Override
+ default List<String> getDomainDimensions()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default boolean possibleInDomain(Map<String, RangeSet<String>> domain)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default boolean isCompatible(Class<? extends ShardSpec> other)
+ {
+ throw new UnsupportedOperationException();
+ }
+}
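A hedged sketch of the publish-time conversion for dynamic partitioning, roughly what the {@code SegmentPublisherHelper#annotateShardSpec} path referenced above boils down to:

// A non-appending batch task created three segments in a time chunk; at
// publish time, the core partition set size equals the number of segments.
List<BuildingNumberedShardSpec> building = ImmutableList.of(
    new BuildingNumberedShardSpec(0),
    new BuildingNumberedShardSpec(1),
    new BuildingNumberedShardSpec(2)
);
int numCorePartitions = building.size();
List<NumberedShardSpec> published = building.stream()
                                            .map(spec -> spec.convert(numCorePartitions))
                                            .collect(Collectors.toList());
// Every published spec now reports getNumCorePartitions() == 3.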
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingSingleDimensionShardSpec.java
similarity index 53%
copy from core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpec.java
copy to core/src/main/java/org/apache/druid/timeline/partition/BuildingSingleDimensionShardSpec.java
index 22ca97d..6dd0992 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingSingleDimensionShardSpec.java
@@ -21,99 +21,87 @@ package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;
import java.util.Objects;
-public class SingleDimensionPartialShardSpec implements PartialShardSpec
+/**
+ * See {@link BuildingShardSpec} for how this class is used.
+ *
+ * @see SingleDimensionShardSpec
+ */
+public class BuildingSingleDimensionShardSpec implements BuildingShardSpec<SingleDimensionShardSpec>
{
- private final String partitionDimension;
+ public static final String TYPE = "building_single_dim";
+
private final int bucketId;
+ private final String dimension;
@Nullable
private final String start;
@Nullable
private final String end;
- private final int numBuckets;
+ private final int partitionId;
@JsonCreator
- public SingleDimensionPartialShardSpec(
- @JsonProperty("partitionDimension") String partitionDimension,
+ public BuildingSingleDimensionShardSpec(
@JsonProperty("bucketId") int bucketId,
+ @JsonProperty("dimension") String dimension,
@JsonProperty("start") @Nullable String start,
@JsonProperty("end") @Nullable String end,
- @JsonProperty("numBuckets") int numBuckets
+ @JsonProperty("partitionNum") int partitionNum
)
{
- this.partitionDimension = partitionDimension;
this.bucketId = bucketId;
+ this.dimension = dimension;
this.start = start;
this.end = end;
- this.numBuckets = numBuckets;
- }
-
- @JsonProperty
- public String getPartitionDimension()
- {
- return partitionDimension;
+ this.partitionId = partitionNum;
}
- @JsonProperty
- public int getBucketId()
+ @JsonProperty("dimension")
+ public String getDimension()
{
- return bucketId;
+ return dimension;
}
- @JsonProperty
@Nullable
+ @JsonProperty("start")
public String getStart()
{
return start;
}
- @JsonProperty
@Nullable
+ @JsonProperty("end")
public String getEnd()
{
return end;
}
- @JsonProperty
- public int getNumBuckets()
+ @Override
+ @JsonProperty("partitionNum")
+ public int getPartitionNum()
{
- return numBuckets;
+ return partitionId;
}
@Override
- public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
+ @JsonProperty("bucketId")
+ public int getBucketId()
{
- final int partitionId;
- if (specOfPreviousMaxPartitionId != null) {
- assert specOfPreviousMaxPartitionId instanceof SingleDimensionShardSpec;
- final SingleDimensionShardSpec prevSpec = (SingleDimensionShardSpec) specOfPreviousMaxPartitionId;
- partitionId = prevSpec.getPartitionNum() + 1;
- } else {
- partitionId = 0;
- }
- return complete(objectMapper, partitionId);
+ return bucketId;
}
@Override
- public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
+ public SingleDimensionShardSpec convert(int numCorePartitions)
{
- // TODO: bucketId and numBuckets should be added to SingleDimensionShardSpec in a follow-up PR.
- return new SingleDimensionShardSpec(
- partitionDimension,
- start,
- end,
- partitionId
- );
+ return new SingleDimensionShardSpec(dimension, start, end, partitionId, numCorePartitions);
}
@Override
- public Class<? extends ShardSpec> getShardSpecClass()
+ public <T> PartitionChunk<T> createChunk(T obj)
{
- return SingleDimensionShardSpec.class;
+ return new NumberedPartitionChunk<>(partitionId, 0, obj);
}
@Override
@@ -125,10 +113,10 @@ public class SingleDimensionPartialShardSpec implements PartialShardSpec
if (o == null || getClass() != o.getClass()) {
return false;
}
- SingleDimensionPartialShardSpec that = (SingleDimensionPartialShardSpec) o;
+ BuildingSingleDimensionShardSpec that = (BuildingSingleDimensionShardSpec) o;
return bucketId == that.bucketId &&
- numBuckets == that.numBuckets &&
- Objects.equals(partitionDimension, that.partitionDimension) &&
+ partitionId == that.partitionId &&
+ Objects.equals(dimension, that.dimension) &&
Objects.equals(start, that.start) &&
Objects.equals(end, that.end);
}
@@ -136,6 +124,18 @@ public class SingleDimensionPartialShardSpec implements PartialShardSpec
@Override
public int hashCode()
{
- return Objects.hash(partitionDimension, bucketId, start, end, numBuckets);
+ return Objects.hash(bucketId, dimension, start, end, partitionId);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "BuildingSingleDimensionShardSpec{" +
+ "bucketId=" + bucketId +
+ ", dimension='" + dimension + '\'' +
+ ", start='" + start + '\'' +
+ ", end='" + end + '\'' +
+ ", partitionNum=" + partitionId +
+ '}';
}
}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java
index 495a852..0e32ee0 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java
@@ -33,15 +33,18 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
@Nullable
private final List<String> partitionDimensions;
+ private final int bucketId;
private final int numBuckets;
@JsonCreator
public HashBasedNumberedPartialShardSpec(
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
+ @JsonProperty("bucketId") int bucketId,
@JsonProperty("numPartitions") int numBuckets
)
{
this.partitionDimensions = partitionDimensions;
+ this.bucketId = bucketId;
this.numBuckets = numBuckets;
}
@@ -52,6 +55,12 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
return partitionDimensions;
}
+ @JsonProperty
+ public int getBucketId()
+ {
+ return bucketId;
+ }
+
@JsonProperty("numPartitions")
public int getNumBuckets()
{
@@ -61,9 +70,16 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
{
- final HashBasedNumberedShardSpec prevSpec = (HashBasedNumberedShardSpec) specOfPreviousMaxPartitionId;
+ // The shardSpec is created by the Overlord.
+ // For batch tasks, this code can be executed only with segment locking (forceTimeChunkLock = false).
+ // In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of
+ // the same datasource. Since there is no restriction for those tasks in segment allocation, the
+ // allocated IDs for each task can interleave. As a result, the core partition set cannot be
+ // represented as a range. We always set 0 for the core partition set size if this is an initial segment.
return new HashBasedNumberedShardSpec(
- prevSpec == null ? 0 : prevSpec.getPartitionNum() + 1,
+ specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getPartitionNum() + 1,
+ specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getNumCorePartitions(),
+ bucketId,
numBuckets,
partitionDimensions,
objectMapper
@@ -73,7 +89,7 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
{
- return new HashBasedNumberedShardSpec(partitionId, numBuckets, partitionDimensions, objectMapper);
+ return new HashBasedNumberedShardSpec(partitionId, 0, bucketId, numBuckets, partitionDimensions, objectMapper);
}
@Override
@@ -92,13 +108,14 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
return false;
}
HashBasedNumberedPartialShardSpec that = (HashBasedNumberedPartialShardSpec) o;
- return numBuckets == that.numBuckets &&
+ return bucketId == that.bucketId &&
+ numBuckets == that.numBuckets &&
Objects.equals(partitionDimensions, that.partitionDimensions);
}
@Override
public int hashCode()
{
- return Objects.hash(partitionDimensions, numBuckets);
+ return Objects.hash(partitionDimensions, bucketId, numBuckets);
}
}
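To make the comment in complete() concrete, here is a sketch of two successive allocations in the same time chunk under segment locking; objectMapper is assumed to be available, and the bucket numbers are made up:

PartialShardSpec partial = new HashBasedNumberedPartialShardSpec(
    ImmutableList.of("dim"), // partitionDimensions
    4,                       // bucketId
    8                        // numBuckets
);
// First segment in the time chunk: partitionNum = 0, numCorePartitions = 0.
ShardSpec first = partial.complete(objectMapper, null);
// Later allocations continue from the previous max partitionId but inherit
// the 0-sized core partition set, so the IDs need not form a packed range.
ShardSpec second = partial.complete(objectMapper, first); // partitionNum == 1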
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
index 2d8f525..23cdb4e 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
@@ -35,29 +35,55 @@ import org.apache.druid.data.input.Rows;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Objects;
public class HashBasedNumberedShardSpec extends NumberedShardSpec
{
+ static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();
+
private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
- private static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();
+ private final int bucketId;
+ /**
+ * Number of hash buckets
+ */
+ private final int numBuckets;
private final ObjectMapper jsonMapper;
@JsonIgnore
private final List<String> partitionDimensions;
@JsonCreator
public HashBasedNumberedShardSpec(
- @JsonProperty("partitionNum") int partitionNum, // partitionId
- @JsonProperty("partitions") int partitions, // # of partitions
+ @JsonProperty("partitionNum") int partitionNum, // partitionId, hash bucketId
+ @JsonProperty("partitions") int partitions, // core partition set size
+ @JsonProperty("bucketId") @Nullable Integer bucketId, // nullable for backward compatibility
+ @JsonProperty("numBuckets") @Nullable Integer numBuckets, // nullable for backward compatibility
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
@JacksonInject ObjectMapper jsonMapper
)
{
super(partitionNum, partitions);
+ // Use partitionId as bucketId if it's missing.
+ this.bucketId = bucketId == null ? partitionNum : bucketId;
+ // If numBuckets is missing, assume that no hash bucket is empty.
+ // Use the core partition set size as the number of buckets.
+ this.numBuckets = numBuckets == null ? partitions : numBuckets;
this.jsonMapper = jsonMapper;
this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions;
}
+ @JsonProperty
+ public int getBucketId()
+ {
+ return bucketId;
+ }
+
+ @JsonProperty
+ public int getNumBuckets()
+ {
+ return numBuckets;
+ }
+
@JsonProperty("partitionDimensions")
public List<String> getPartitionDimensions()
{
@@ -73,7 +99,7 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
- return (((long) hash(timestamp, inputRow)) - getPartitionNum()) % getPartitions() == 0;
+ return (((long) hash(timestamp, inputRow)) - bucketId) % numBuckets == 0;
}
/**
@@ -88,7 +114,12 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
*/
protected int hash(long timestamp, InputRow inputRow)
{
- final List<Object> groupKey = getGroupKey(timestamp, inputRow);
+ return hash(jsonMapper, partitionDimensions, timestamp, inputRow);
+ }
+
+ public static int hash(ObjectMapper jsonMapper, List<String> partitionDimensions, long timestamp, InputRow inputRow)
+ {
+ final List<Object> groupKey = getGroupKey(partitionDimensions, timestamp, inputRow);
try {
return hash(jsonMapper, groupKey);
}
@@ -98,7 +129,7 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
}
@VisibleForTesting
- List<Object> getGroupKey(final long timestamp, final InputRow inputRow)
+ static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)
{
if (partitionDimensions.isEmpty()) {
return Rows.toGroupKey(timestamp, inputRow);
@@ -114,21 +145,57 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
}
@Override
- public String toString()
+ public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
{
- return "HashBasedNumberedShardSpec{" +
- "partitionNum=" + getPartitionNum() +
- ", partitions=" + getPartitions() +
- ", partitionDimensions=" + getPartitionDimensions() +
- '}';
+ return createHashLookup(jsonMapper, partitionDimensions, shardSpecs, numBuckets);
}
- @Override
- public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+ static ShardSpecLookup createHashLookup(
+ ObjectMapper jsonMapper,
+ List<String> partitionDimensions,
+ List<? extends ShardSpec> shardSpecs,
+ int numBuckets
+ )
{
return (long timestamp, InputRow row) -> {
- int index = Math.abs(hash(timestamp, row) % getPartitions());
+ int index = Math.abs(hash(jsonMapper, partitionDimensions, timestamp, row) % numBuckets);
return shardSpecs.get(index);
};
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ HashBasedNumberedShardSpec that = (HashBasedNumberedShardSpec) o;
+ return bucketId == that.bucketId &&
+ numBuckets == that.numBuckets &&
+ Objects.equals(partitionDimensions, that.partitionDimensions);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), bucketId, numBuckets, partitionDimensions);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "HashBasedNumberedShardSpec{" +
+ "partitionNum=" + getPartitionNum() +
+ ", partitions=" + getNumCorePartitions() +
+ ", bucketId=" + bucketId +
+ ", numBuckets=" + numBuckets +
+ ", partitionDimensions=" + partitionDimensions +
+ '}';
+ }
}
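The bucket arithmetic behind isInChunk and createHashLookup above can be checked by hand; the hash value below is made up and kept non-negative for simplicity:

int numBuckets = 8;
int hash = 35;                              // pretend murmur3_32 output for a row
int bucketId = Math.abs(hash % numBuckets); // 3: the bucket the lookup picks
// isInChunk accepts the row for the spec whose bucketId satisfies the congruence:
boolean inChunk = (((long) hash) - bucketId) % numBuckets == 0; // (35 - 3) % 8 == 0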
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/HashBucketShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/HashBucketShardSpec.java
new file mode 100644
index 0000000..324c020
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/HashBucketShardSpec.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputRow;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * See {@link BucketNumberedShardSpec} for how this class is used.
+ *
+ * @see BuildingHashBasedNumberedShardSpec
+ */
+public class HashBucketShardSpec implements BucketNumberedShardSpec<BuildingHashBasedNumberedShardSpec>
+{
+ public static final String TYPE = "bucket_hash";
+
+ private final int bucketId;
+ private final int numBuckets;
+ private final List<String> partitionDimensions;
+ private final ObjectMapper jsonMapper;
+
+ @JsonCreator
+ public HashBucketShardSpec(
+ @JsonProperty("bucketId") int bucketId,
+ @JsonProperty("numBuckets") int numBuckets,
+ @JsonProperty("partitionDimensions") List<String> partitionDimensions,
+ @JacksonInject ObjectMapper jsonMapper
+ )
+ {
+ this.bucketId = bucketId;
+ this.numBuckets = numBuckets;
+ this.partitionDimensions = partitionDimensions == null
+ ? HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS
+ : partitionDimensions;
+ this.jsonMapper = jsonMapper;
+ }
+
+ @Override
+ @JsonProperty
+ public int getBucketId()
+ {
+ return bucketId;
+ }
+
+ @JsonProperty
+ public int getNumBuckets()
+ {
+ return numBuckets;
+ }
+
+ @JsonProperty
+ public List<String> getPartitionDimensions()
+ {
+ return partitionDimensions;
+ }
+
+ @Override
+ public BuildingHashBasedNumberedShardSpec convert(int partitionId)
+ {
+ return new BuildingHashBasedNumberedShardSpec(partitionId, bucketId, numBuckets, partitionDimensions, jsonMapper);
+ }
+
+ @Override
+ public boolean isInChunk(long timestamp, InputRow inputRow)
+ {
+ // not in use
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
+ {
+ return HashBasedNumberedShardSpec.createHashLookup(jsonMapper, partitionDimensions, shardSpecs, numBuckets);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HashBucketShardSpec that = (HashBucketShardSpec) o;
+ return bucketId == that.bucketId &&
+ numBuckets == that.numBuckets &&
+ Objects.equals(partitionDimensions, that.partitionDimensions);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(bucketId, numBuckets, partitionDimensions);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "HashBucket{" +
+ ", bucketId=" + bucketId +
+ ", numBuckets=" + numBuckets +
+ ", partitionDimensions=" + partitionDimensions +
+ '}';
+ }
+}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java
index 1ebb24e..95b0bd8 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java
@@ -50,7 +50,13 @@ public final class LinearShardSpec implements ShardSpec
}
@Override
- public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+ public int getNumCorePartitions()
+ {
+ return 0;
+ }
+
+ @Override
+ public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> shardSpecs.get(0);
}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
index dde9216..d530118 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
@@ -66,7 +66,13 @@ public class NoneShardSpec implements ShardSpec
}
@Override
- public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+ public int getNumCorePartitions()
+ {
+ return 0;
+ }
+
+ @Override
+ public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> shardSpecs.get(0);
}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java
index dbbb8f6..adb0d28 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java
@@ -188,7 +188,7 @@ public class NumberedOverwriteShardSpec implements OverwriteShardSpec
}
@Override
- public ShardSpecLookup getLookup(List<ShardSpec> shardSpecs)
+ public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> shardSpecs.get(0);
}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java
index 7c7b975..7305028 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java
@@ -50,7 +50,7 @@ public class NumberedPartialShardSpec implements PartialShardSpec
return new NumberedShardSpec(0, 0);
} else {
final NumberedShardSpec prevSpec = (NumberedShardSpec) specOfPreviousMaxPartitionId;
- return new NumberedShardSpec(prevSpec.getPartitionNum() + 1, prevSpec.getPartitions());
+ return new NumberedShardSpec(prevSpec.getPartitionNum() + 1, prevSpec.getNumCorePartitions());
}
}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java
index 6f8898e..5db6bf6 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java
@@ -66,12 +66,17 @@ public class NumberedShardSpec implements ShardSpec
}
@Override
- public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+ public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
+ {
+ return createNumberedLookup(shardSpecs);
+ }
+
+ static ShardSpecLookup createNumberedLookup(List<? extends ShardSpec> shardSpecs)
{
return createLookup(shardSpecs);
}
- static ShardSpecLookup createLookup(List<ShardSpec> shardSpecs)
+ static ShardSpecLookup createLookup(List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> shardSpecs.get(0);
}
@@ -94,8 +99,9 @@ public class NumberedShardSpec implements ShardSpec
return other == NumberedShardSpec.class || other == NumberedOverwriteShardSpec.class;
}
+ @Override
@JsonProperty("partitions")
- public int getPartitions()
+ public int getNumCorePartitions()
{
return partitions;
}
@@ -127,16 +133,12 @@ public class NumberedShardSpec implements ShardSpec
if (this == o) {
return true;
}
-
- if (!(o instanceof NumberedShardSpec)) {
- return false;
- }
-
- final NumberedShardSpec that = (NumberedShardSpec) o;
- if (partitionNum != that.partitionNum) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
- return partitions == that.partitions;
+ NumberedShardSpec that = (NumberedShardSpec) o;
+ return partitionNum == that.partitionNum &&
+ partitions == that.partitions;
}
@Override
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java
index 0fea564..6a77ea5 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java
@@ -28,6 +28,17 @@ package org.apache.druid.timeline.partition;
*/
public interface OverwriteShardSpec extends ShardSpec
{
+ /**
+ * The core partition concept is not used with segment locking. Instead, the {@link AtomicUpdateGroup} is used
+ * to atomically overshadow segments. Here, we always return 0 so that the {@link PartitionHolder} skips checking
+ * the completeness of the core partitions.
+ */
+ @Override
+ default int getNumCorePartitions()
+ {
+ return 0;
+ }
+
default OverwriteShardSpec withAtomicUpdateGroupSize(int atomicUpdateGroupSize)
{
return withAtomicUpdateGroupSize((short) atomicUpdateGroupSize);
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java
new file mode 100644
index 0000000..a329131
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputRow;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * See {@link BucketNumberedShardSpec} for how this class is used.
+ *
+ * @see BuildingSingleDimensionShardSpec
+ */
+public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSingleDimensionShardSpec>
+{
+ public static final String TYPE = "bucket_single_dim";
+
+ private final int bucketId;
+ private final String dimension;
+ @Nullable
+ private final String start;
+ @Nullable
+ private final String end;
+
+ @JsonCreator
+ public RangeBucketShardSpec(
+ @JsonProperty("bucketId") int bucketId,
+ @JsonProperty("dimension") String dimension,
+ @JsonProperty("start") @Nullable String start,
+ @JsonProperty("end") @Nullable String end
+ )
+ {
+ this.bucketId = bucketId;
+ this.dimension = dimension;
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ @JsonProperty
+ public int getBucketId()
+ {
+ return bucketId;
+ }
+
+ @JsonProperty
+ public String getDimension()
+ {
+ return dimension;
+ }
+
+ @Nullable
+ @JsonProperty
+ public String getStart()
+ {
+ return start;
+ }
+
+ @Nullable
+ @JsonProperty
+ public String getEnd()
+ {
+ return end;
+ }
+
+ @Override
+ public BuildingSingleDimensionShardSpec convert(int partitionId)
+ {
+ return new BuildingSingleDimensionShardSpec(bucketId, dimension, start, end, partitionId);
+ }
+
+ @Override
+ public boolean isInChunk(long timestamp, InputRow inputRow)
+ {
+ return SingleDimensionShardSpec.isInChunk(dimension, start, end, inputRow);
+ }
+
+ @Override
+ public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
+ {
+ return SingleDimensionShardSpec.createLookup(shardSpecs);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RangeBucketShardSpec bucket = (RangeBucketShardSpec) o;
+ return bucketId == bucket.bucketId &&
+ Objects.equals(dimension, bucket.dimension) &&
+ Objects.equals(start, bucket.start) &&
+ Objects.equals(end, bucket.end);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(bucketId, dimension, start, end);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "RangeBucket{" +
+ ", bucketId=" + bucketId +
+ ", dimension='" + dimension + '\'' +
+ ", start='" + start + '\'' +
+ ", end='" + end + '\'' +
+ '}';
+ }
+}
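A sketch of how contiguous range buckets tile one dimension; the boundary semantics (start inclusive, end exclusive, null meaning unbounded) are assumed from SingleDimensionShardSpec:

RangeBucketShardSpec first = new RangeBucketShardSpec(0, "country", null, "France");
RangeBucketShardSpec middle = new RangeBucketShardSpec(1, "country", "France", "Peru");
RangeBucketShardSpec last = new RangeBucketShardSpec(2, "country", "Peru", null);
// A row with country = "Japan" satisfies middle.isInChunk(...). Once the task
// knows bucket 1 is non-empty, middle.convert(partitionId) turns it into a
// BuildingSingleDimensionShardSpec carrying a packed partitionId.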
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
index 06ff8dd..8f59d39 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
@@ -40,7 +40,16 @@ import java.util.Map;
@JsonSubTypes.Type(name = "numbered", value = NumberedShardSpec.class),
@JsonSubTypes.Type(name = "hashed", value = HashBasedNumberedShardSpec.class),
@JsonSubTypes.Type(name = NumberedOverwriteShardSpec.TYPE, value = NumberedOverwriteShardSpec.class),
- @JsonSubTypes.Type(name = BuildingNumberedShardSpec.TYPE, value = BuildingNumberedShardSpec.class)
+ // BuildingShardSpecs are shardSpecs with a missing numCorePartitions, and thus must not be published.
+ // See BuildingShardSpec for more details.
+ @JsonSubTypes.Type(name = BuildingNumberedShardSpec.TYPE, value = BuildingNumberedShardSpec.class),
+ @JsonSubTypes.Type(name = BuildingHashBasedNumberedShardSpec.TYPE, value = BuildingHashBasedNumberedShardSpec.class),
+ @JsonSubTypes.Type(name = BuildingSingleDimensionShardSpec.TYPE, value = BuildingSingleDimensionShardSpec.class),
+ // BucketShardSpecs are shardSpecs with missing partitionId and numCorePartitions.
+ // These shardSpecs must not be used in segment push.
+ // See BucketNumberedShardSpec for more details.
+ @JsonSubTypes.Type(name = HashBucketShardSpec.TYPE, value = HashBucketShardSpec.class),
+ @JsonSubTypes.Type(name = RangeBucketShardSpec.TYPE, value = RangeBucketShardSpec.class)
})
public interface ShardSpec
{
@@ -55,6 +64,8 @@ public interface ShardSpec
*/
int getPartitionNum();
+ int getNumCorePartitions();
+
/**
* Returns the start root partition ID of the atomic update group which this segment belongs to.
*
@@ -96,7 +107,7 @@ public interface ShardSpec
}
@JsonIgnore
- ShardSpecLookup getLookup(List<ShardSpec> shardSpecs);
+ ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs);
/**
* Get dimensions who have possible range for the rows this shard contains.
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpec.java
index 22ca97d..e2ebd29 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpec.java
@@ -87,15 +87,19 @@ public class SingleDimensionPartialShardSpec implements PartialShardSpec
@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
{
- final int partitionId;
- if (specOfPreviousMaxPartitionId != null) {
- assert specOfPreviousMaxPartitionId instanceof SingleDimensionShardSpec;
- final SingleDimensionShardSpec prevSpec = (SingleDimensionShardSpec) specOfPreviousMaxPartitionId;
- partitionId = prevSpec.getPartitionNum() + 1;
- } else {
- partitionId = 0;
- }
- return complete(objectMapper, partitionId);
+ // The shardSpec is created by the Overlord.
+ // For batch tasks, this code can be executed only with segment locking (forceTimeChunkLock = false).
+ // In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of
+ // the same datasource. Since segment allocation across those tasks is unrestricted, the
+ // partition IDs allocated to each task can interleave. As a result, the core partition set
+ // cannot be represented as a range. We always set the core partition set size to 0 if this
+ // is an initial segment.
+ return new SingleDimensionShardSpec(
+ partitionDimension,
+ start,
+ end,
+ specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getPartitionNum() + 1,
+ specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getNumCorePartitions()
+ );
}
@Override
@@ -106,7 +110,8 @@ public class SingleDimensionPartialShardSpec implements PartialShardSpec
partitionDimension,
start,
end,
- partitionId
+ partitionId,
+ 0
);
}
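
A hedged worked example of the interleaving the comment above describes (task names and allocation order invented): suppose tasks T1 and T2 ingest into the same time chunk under segment locking, and the Overlord hands out partition IDs in the order T1 -> 0, T2 -> 1, T1 -> 2. T1 ends up publishing segments {0, 2} and T2 publishes {1}, so neither task's IDs form a contiguous range that could serve as a core partition set. Setting the size to 0 gives these segments the same semantics as appended NumberedShardSpec segments:

    // What T1's first segment would carry under this scheme (invented values):
    // new SingleDimensionShardSpec(partitionDimension, start, end,
    //                              /* partitionNum = */ 0,
    //                              /* numCorePartitions = */ 0)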
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
index 9db390c..1a00534 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
@@ -38,12 +38,15 @@ import java.util.Objects;
*/
public class SingleDimensionShardSpec implements ShardSpec
{
+ public static final int UNKNOWN_NUM_CORE_PARTITIONS = -1;
+
private final String dimension;
@Nullable
private final String start;
@Nullable
private final String end;
private final int partitionNum;
+ private final int numCorePartitions;
/**
* @param dimension partition dimension
@@ -56,7 +59,8 @@ public class SingleDimensionShardSpec implements ShardSpec
@JsonProperty("dimension") String dimension,
@JsonProperty("start") @Nullable String start,
@JsonProperty("end") @Nullable String end,
- @JsonProperty("partitionNum") int partitionNum
+ @JsonProperty("partitionNum") int partitionNum,
+ @JsonProperty("numCorePartitions") @Nullable Integer numCorePartitions // nullable for backward compatibility
)
{
Preconditions.checkArgument(partitionNum >= 0, "partitionNum >= 0");
@@ -64,6 +68,18 @@ public class SingleDimensionShardSpec implements ShardSpec
this.start = start;
this.end = end;
this.partitionNum = partitionNum;
+ this.numCorePartitions = numCorePartitions == null ? UNKNOWN_NUM_CORE_PARTITIONS : numCorePartitions;
+ }
+
+ public SingleDimensionShardSpec withNumCorePartitions(int numCorePartitions)
+ {
+ return new SingleDimensionShardSpec(
+ dimension,
+ start,
+ end,
+ partitionNum,
+ numCorePartitions
+ );
}
@JsonProperty("dimension")
@@ -94,7 +110,19 @@ public class SingleDimensionShardSpec implements ShardSpec
}
@Override
- public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+ @JsonProperty
+ public int getNumCorePartitions()
+ {
+ return numCorePartitions;
+ }
+
+ @Override
+ public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
+ {
+ return createLookup(shardSpecs);
+ }
+
+ static ShardSpecLookup createLookup(List<? extends ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> {
for (ShardSpec spec : shardSpecs) {
@@ -146,22 +174,20 @@ public class SingleDimensionShardSpec implements ShardSpec
@Override
public <T> PartitionChunk<T> createChunk(T obj)
{
- return new StringPartitionChunk<T>(start, end, partitionNum, obj);
+ if (numCorePartitions == UNKNOWN_NUM_CORE_PARTITIONS) {
+ return new StringPartitionChunk<>(start, end, partitionNum, obj);
+ } else {
+ return new NumberedPartitionChunk<>(partitionNum, numCorePartitions, obj);
+ }
}
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
- final List<String> values = inputRow.getDimension(dimension);
-
- if (values == null || values.size() != 1) {
- return checkValue(null);
- } else {
- return checkValue(values.get(0));
- }
+ return isInChunk(dimension, start, end, inputRow);
}
- private boolean checkValue(String value)
+ private static boolean checkValue(@Nullable String start, @Nullable String end, String value)
{
if (value == null) {
return start == null;
@@ -175,15 +201,20 @@ public class SingleDimensionShardSpec implements ShardSpec
(end == null || value.compareTo(end) < 0);
}
- @Override
- public String toString()
+ public static boolean isInChunk(
+ String dimension,
+ @Nullable String start,
+ @Nullable String end,
+ InputRow inputRow
+ )
{
- return "SingleDimensionShardSpec{" +
- "dimension='" + dimension + '\'' +
- ", start='" + start + '\'' +
- ", end='" + end + '\'' +
- ", partitionNum=" + partitionNum +
- '}';
+ final List<String> values = inputRow.getDimension(dimension);
+
+ if (values == null || values.size() != 1) {
+ return checkValue(start, end, null);
+ } else {
+ return checkValue(start, end, values.get(0));
+ }
}
@Override
@@ -195,16 +226,29 @@ public class SingleDimensionShardSpec implements ShardSpec
if (o == null || getClass() != o.getClass()) {
return false;
}
- SingleDimensionShardSpec that = (SingleDimensionShardSpec) o;
- return partitionNum == that.partitionNum &&
- Objects.equals(dimension, that.dimension) &&
- Objects.equals(start, that.start) &&
- Objects.equals(end, that.end);
+ SingleDimensionShardSpec shardSpec = (SingleDimensionShardSpec) o;
+ return partitionNum == shardSpec.partitionNum &&
+ numCorePartitions == shardSpec.numCorePartitions &&
+ Objects.equals(dimension, shardSpec.dimension) &&
+ Objects.equals(start, shardSpec.start) &&
+ Objects.equals(end, shardSpec.end);
}
@Override
public int hashCode()
{
- return Objects.hash(dimension, start, end, partitionNum);
+ return Objects.hash(dimension, start, end, partitionNum, numCorePartitions);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SingleDimensionShardSpec{" +
+ "dimension='" + dimension + '\'' +
+ ", start='" + start + '\'' +
+ ", end='" + end + '\'' +
+ ", partitionNum=" + partitionNum +
+ ", numCorePartitions=" + numCorePartitions +
+ '}';
}
}
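
The backward-compatibility split in createChunk() above, as a small sketch (values invented; a null numCorePartitions deserializes to UNKNOWN_NUM_CORE_PARTITIONS):

    // Old-format segment: numCorePartitions missing from JSON
    new SingleDimensionShardSpec("dim", "a", "m", 0, null)
        .createChunk("payload");   // StringPartitionChunk: range-chaining completeness check

    // New-format segment written by this patch
    new SingleDimensionShardSpec("dim", "a", "m", 0, 3)
        .createChunk("payload");   // NumberedPartitionChunk(0, 3, "payload")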
diff --git a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
index 6d0af3f..c2b0b76 100644
--- a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
@@ -75,7 +75,13 @@ public class DataSegmentTest
}
@Override
- public ShardSpecLookup getLookup(List<ShardSpec> shardSpecs)
+ public int getNumCorePartitions()
+ {
+ return 0;
+ }
+
+ @Override
+ public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
{
return null;
}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpecTest.java
new file mode 100644
index 0000000..2c052d5
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpecTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BuildingHashBasedNumberedShardSpecTest
+{
+ private final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
+
+ @Test
+ public void testConvert()
+ {
+ Assert.assertEquals(
+ new HashBasedNumberedShardSpec(5, 10, 5, 12, ImmutableList.of("dim"), mapper),
+ new BuildingHashBasedNumberedShardSpec(5, 5, 12, ImmutableList.of("dim"), mapper).convert(10)
+ );
+ }
+
+ @Test
+ public void testCreateChunk()
+ {
+ Assert.assertEquals(
+ new NumberedPartitionChunk<>(5, 0, "test"),
+ new BuildingHashBasedNumberedShardSpec(5, 5, 12, ImmutableList.of("dim"), mapper)
+ .createChunk("test")
+ );
+ }
+
+ @Test
+ public void testSerde() throws JsonProcessingException
+ {
+ mapper.registerSubtypes(
+ new NamedType(BuildingHashBasedNumberedShardSpec.class, BuildingHashBasedNumberedShardSpec.TYPE)
+ );
+ mapper.setInjectableValues(new Std().addValue(ObjectMapper.class, mapper));
+ final BuildingHashBasedNumberedShardSpec original = new BuildingHashBasedNumberedShardSpec(
+ 3,
+ 5,
+ 12,
+ ImmutableList.of("dim"),
+ mapper
+ );
+ final String json = mapper.writeValueAsString(original);
+ final BuildingHashBasedNumberedShardSpec fromJson = (BuildingHashBasedNumberedShardSpec) mapper.readValue(
+ json,
+ ShardSpec.class
+ );
+ Assert.assertEquals(original, fromJson);
+ }
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(BuildingHashBasedNumberedShardSpec.class)
+ .withIgnoredFields("jsonMapper")
+ .withPrefabValues(ObjectMapper.class, new ObjectMapper(), new ObjectMapper())
+ .usingGetClass()
+ .verify();
+ }
+}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java
index 21c5a03..b608d4c 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java
@@ -23,22 +23,16 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
-import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;
-import java.util.List;
-
public class BuildingNumberedShardSpecTest
{
@Test
- public void testToNumberedShardSpec()
+ public void testConvert()
{
- Assert.assertEquals(new NumberedShardSpec(5, 10), new BuildingNumberedShardSpec(5).toNumberedShardSpec(10));
+ Assert.assertEquals(new NumberedShardSpec(5, 10), new BuildingNumberedShardSpec(5).convert(10));
}
@Test
@@ -51,32 +45,9 @@ public class BuildingNumberedShardSpecTest
}
@Test
- public void testShardSpecLookup()
- {
- final List<ShardSpec> shardSpecs = ImmutableList.of(
- new BuildingNumberedShardSpec(1),
- new BuildingNumberedShardSpec(2),
- new BuildingNumberedShardSpec(3)
- );
- final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs);
- // Timestamp doesn't matter. It always returns the first shardSpec.
- final long currentTime = DateTimes.nowUtc().getMillis();
- Assert.assertEquals(
- shardSpecs.get(0),
- lookup.getShardSpec(
- currentTime,
- new MapBasedInputRow(
- currentTime,
- ImmutableList.of("dim"), ImmutableMap.of("dim", "val", "time", currentTime)
- )
- )
- );
- }
-
- @Test
public void testSerde() throws JsonProcessingException
{
- final ObjectMapper mapper = new ObjectMapper();
+ final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.registerSubtypes(new NamedType(BuildingNumberedShardSpec.class, BuildingNumberedShardSpec.TYPE));
final BuildingNumberedShardSpec original = new BuildingNumberedShardSpec(5);
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/BuildingSingleDimensionShardSpecTest.java
similarity index 52%
copy from core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java
copy to core/src/test/java/org/apache/druid/timeline/partition/BuildingSingleDimensionShardSpecTest.java
index d605105..d70a42f 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/BuildingSingleDimensionShardSpecTest.java
@@ -20,34 +20,53 @@
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Assert;
import org.junit.Test;
-public class NumberedOverwriteShardSpecTest
+public class BuildingSingleDimensionShardSpecTest
{
@Test
- public void testEquals()
+ public void testConvert()
{
- EqualsVerifier.forClass(NumberedOverwriteShardSpec.class).usingGetClass().verify();
+ Assert.assertEquals(
+ new SingleDimensionShardSpec("dim", "start", "end", 5, 10),
+ new BuildingSingleDimensionShardSpec(1, "dim", "start", "end", 5).convert(10)
+ );
+ }
+
+ @Test
+ public void testCreateChunk()
+ {
+ Assert.assertEquals(
+ new NumberedPartitionChunk<>(5, 0, "test"),
+ new BuildingSingleDimensionShardSpec(1, "dim", "start", "end", 5).createChunk("test")
+ );
}
@Test
public void testSerde() throws JsonProcessingException
{
- final ObjectMapper mapper = new ObjectMapper();
- mapper.registerSubtypes(new NamedType(NumberedOverwriteShardSpec.class, NumberedOverwriteShardSpec.TYPE));
- final NumberedOverwriteShardSpec original = new NumberedOverwriteShardSpec(
- PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 2,
- 0,
- 10,
- (short) 1,
- (short) 3
+ final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
+ mapper.registerSubtypes(
+ new NamedType(BuildingSingleDimensionShardSpec.class, BuildingSingleDimensionShardSpec.TYPE)
);
+ mapper.setInjectableValues(new Std().addValue(ObjectMapper.class, mapper));
+ final BuildingSingleDimensionShardSpec original = new BuildingSingleDimensionShardSpec(1, "dim", "start", "end", 5);
final String json = mapper.writeValueAsString(original);
- final NumberedOverwriteShardSpec fromJson = (NumberedOverwriteShardSpec) mapper.readValue(json, ShardSpec.class);
+ final BuildingSingleDimensionShardSpec fromJson = (BuildingSingleDimensionShardSpec) mapper.readValue(
+ json,
+ ShardSpec.class
+ );
Assert.assertEquals(original, fromJson);
}
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(BuildingSingleDimensionShardSpec.class).usingGetClass().verify();
+ }
}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java
index 9b2c664..551992b 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java
@@ -46,6 +46,7 @@ public class HashBasedNumberedPartialShardSpecTest
{
final HashBasedNumberedPartialShardSpec expected = new HashBasedNumberedPartialShardSpec(
ImmutableList.of("dim1", "dim2"),
+ 1,
3
);
final byte[] json = MAPPER.writeValueAsBytes(expected);
@@ -61,14 +62,16 @@ public class HashBasedNumberedPartialShardSpecTest
{
final HashBasedNumberedPartialShardSpec expected = new HashBasedNumberedPartialShardSpec(
ImmutableList.of("dim1", "dim2"),
+ 1,
3
);
final byte[] json = MAPPER.writeValueAsBytes(expected);
//noinspection unchecked
final Map<String, Object> map = MAPPER.readValue(json, Map.class);
- Assert.assertEquals(3, map.size());
+ Assert.assertEquals(4, map.size());
Assert.assertEquals(HashBasedNumberedPartialShardSpec.TYPE, map.get("type"));
Assert.assertEquals(expected.getPartitionDimensions(), map.get("partitionDimensions"));
+ Assert.assertEquals(expected.getBucketId(), map.get("bucketId"));
Assert.assertEquals(expected.getNumBuckets(), map.get("numPartitions"));
}
}
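
For reference, the JSON shape asserted above should look roughly like the following; the exact value of HashBasedNumberedPartialShardSpec.TYPE is not shown in this hunk, so "hashed" is an assumption, and field order is not guaranteed:

    {
      "type": "hashed",
      "partitionDimensions": ["dim1", "dim2"],
      "bucketId": 1,
      "numPartitions": 3
    }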
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/HashBucketShardSpecTest.java
similarity index 53%
copy from core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java
copy to core/src/test/java/org/apache/druid/timeline/partition/HashBucketShardSpecTest.java
index 21c5a03..df2207b 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/HashBucketShardSpecTest.java
@@ -20,7 +20,7 @@
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
@@ -33,12 +33,17 @@ import org.junit.Test;
import java.util.List;
-public class BuildingNumberedShardSpecTest
+public class HashBucketShardSpecTest
{
+ private final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
+
@Test
- public void testToNumberedShardSpec()
+ public void testConvert()
{
- Assert.assertEquals(new NumberedShardSpec(5, 10), new BuildingNumberedShardSpec(5).toNumberedShardSpec(10));
+ Assert.assertEquals(
+ new BuildingHashBasedNumberedShardSpec(3, 5, 12, ImmutableList.of("dim"), mapper),
+ new HashBucketShardSpec(5, 12, ImmutableList.of("dim"), mapper).convert(3)
+ );
}
@Test
@@ -46,7 +51,7 @@ public class BuildingNumberedShardSpecTest
{
Assert.assertEquals(
new NumberedPartitionChunk<>(5, 0, "test"),
- new BuildingNumberedShardSpec(5).createChunk("test")
+ new HashBucketShardSpec(5, 12, ImmutableList.of("dim"), mapper).createChunk("test")
);
}
@@ -54,20 +59,39 @@ public class BuildingNumberedShardSpecTest
public void testShardSpecLookup()
{
final List<ShardSpec> shardSpecs = ImmutableList.of(
- new BuildingNumberedShardSpec(1),
- new BuildingNumberedShardSpec(2),
- new BuildingNumberedShardSpec(3)
+ new HashBucketShardSpec(0, 3, ImmutableList.of("dim"), mapper),
+ new HashBucketShardSpec(1, 3, ImmutableList.of("dim"), mapper),
+ new HashBucketShardSpec(2, 3, ImmutableList.of("dim"), mapper)
);
final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs);
- // Timestamp doesn't matter. It always returns the first shardSpec.
final long currentTime = DateTimes.nowUtc().getMillis();
Assert.assertEquals(
+ shardSpecs.get(1),
+ lookup.getShardSpec(
+ currentTime,
+ new MapBasedInputRow(
+ currentTime,
+ ImmutableList.of("dim"), ImmutableMap.of("dim", "1", "time", currentTime)
+ )
+ )
+ );
+ Assert.assertEquals(
+ shardSpecs.get(2),
+ lookup.getShardSpec(
+ currentTime,
+ new MapBasedInputRow(
+ currentTime,
+ ImmutableList.of("dim"), ImmutableMap.of("dim", "2", "time", currentTime)
+ )
+ )
+ );
+ Assert.assertEquals(
shardSpecs.get(0),
lookup.getShardSpec(
currentTime,
new MapBasedInputRow(
currentTime,
- ImmutableList.of("dim"), ImmutableMap.of("dim", "val", "time", currentTime)
+ ImmutableList.of("dim"), ImmutableMap.of("dim", "3", "time", currentTime)
)
)
);
@@ -76,18 +100,22 @@ public class BuildingNumberedShardSpecTest
@Test
public void testSerde() throws JsonProcessingException
{
- final ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- mapper.registerSubtypes(new NamedType(BuildingNumberedShardSpec.class, BuildingNumberedShardSpec.TYPE));
- final BuildingNumberedShardSpec original = new BuildingNumberedShardSpec(5);
+ mapper.registerSubtypes(new NamedType(HashBucketShardSpec.class, HashBucketShardSpec.TYPE));
+ mapper.setInjectableValues(new Std().addValue(ObjectMapper.class, mapper));
+
+ final HashBucketShardSpec original = new HashBucketShardSpec(5, 12, ImmutableList.of("dim"), mapper);
final String json = mapper.writeValueAsString(original);
- final BuildingNumberedShardSpec fromJson = (BuildingNumberedShardSpec) mapper.readValue(json, ShardSpec.class);
+ final HashBucketShardSpec fromJson = (HashBucketShardSpec) mapper.readValue(json, ShardSpec.class);
Assert.assertEquals(original, fromJson);
}
@Test
public void testEquals()
{
- EqualsVerifier.forClass(BuildingNumberedShardSpec.class).usingGetClass().verify();
+ EqualsVerifier.forClass(HashBucketShardSpec.class)
+ .withIgnoredFields("jsonMapper")
+ .withPrefabValues(ObjectMapper.class, new ObjectMapper(), new ObjectMapper())
+ .usingGetClass()
+ .verify();
}
}
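
A hedged note on why the lookup assertions above hold: HashBucketShardSpec delegates to the hash-based lookup in HashBasedNumberedShardSpec, which (as an assumption about that implementation) hashes the jsonMapper-serialized group key of the row and reduces it modulo numBuckets:

    // Conceptual only, not the exact implementation:
    // int bucketId = Math.abs(hash(timestamp, row) % numBuckets); // 3 buckets here
    // The lookup returns the spec whose getBucketId() matches; the test values
    // "1", "2", "3" just happen to land in buckets 1, 2, and 0 respectively.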
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java
index d605105..c6d7935 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java
@@ -37,7 +37,7 @@ public class NumberedOverwriteShardSpecTest
@Test
public void testSerde() throws JsonProcessingException
{
- final ObjectMapper mapper = new ObjectMapper();
+ final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
mapper.registerSubtypes(new NamedType(NumberedOverwriteShardSpec.class, NumberedOverwriteShardSpec.TYPE));
final NumberedOverwriteShardSpec original = new NumberedOverwriteShardSpec(
PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 2,
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java b/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java
new file mode 100644
index 0000000..38b9a47
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class PartitionHolderCompletenessTest
+{
+ @Parameterized.Parameters(name = "{1}")
+ public static Iterable<Object[]> constructorFeeder()
+ {
+ return ImmutableList.of(
+ new Object[]{
+ ImmutableList.of(
+ new NumberedShardSpec(0, 3),
+ new NumberedShardSpec(1, 3),
+ new NumberedShardSpec(2, 3)
+ ),
+ NumberedShardSpec.class.getSimpleName()
+ },
+ new Object[]{
+ // Simulate empty hash buckets
+ ImmutableList.of(
+ new HashBasedNumberedShardSpec(0, 3, 0, 5, null, new ObjectMapper()),
+ new HashBasedNumberedShardSpec(1, 3, 2, 5, null, new ObjectMapper()),
+ new HashBasedNumberedShardSpec(2, 3, 3, 5, null, new ObjectMapper())
+ ),
+ HashBasedNumberedShardSpec.class.getSimpleName()
+ },
+ new Object[]{
+ // Simulate empty range buckets
+ ImmutableList.of(
+ new SingleDimensionShardSpec("dim", null, "aaa", 0, 3),
+ new SingleDimensionShardSpec("dim", "bbb", "fff", 1, 3),
+ new SingleDimensionShardSpec("dim", "ttt", "zzz", 2, 3)
+ ),
+ StringUtils.format(
+ "%s with empty buckets",
+ SingleDimensionShardSpec.class.getSimpleName()
+ )
+ },
+ new Object[]{
+ // Simulate old format segments with missing numCorePartitions
+ ImmutableList.of(
+ new SingleDimensionShardSpec("dim", null, "bbb", 0, null),
+ new SingleDimensionShardSpec("dim", "bbb", "fff", 1, null),
+ new SingleDimensionShardSpec("dim", "fff", null, 2, null)
+ ),
+ StringUtils.format(
+ "%s with missing numCorePartitions",
+ SingleDimensionShardSpec.class.getSimpleName()
+ )
+ }
+ );
+ }
+
+ private final List<ShardSpec> shardSpecs;
+
+ public PartitionHolderCompletenessTest(List<ShardSpec> shardSpecs, String paramName)
+ {
+ this.shardSpecs = shardSpecs;
+ }
+
+ @Test
+ public void testIsComplete()
+ {
+ final PartitionHolder<OvershadowableInteger> holder = new PartitionHolder<>(
+ shardSpecs.get(0).createChunk(new OvershadowableInteger("version", shardSpecs.get(0).getPartitionNum(), 0))
+ );
+ for (int i = 0; i < shardSpecs.size() - 1; i++) {
+ Assert.assertFalse(holder.isComplete());
+ final ShardSpec shardSpec = shardSpecs.get(i + 1);
+ holder.add(shardSpec.createChunk(new OvershadowableInteger("version", shardSpec.getPartitionNum(), 0)));
+ }
+ Assert.assertTrue(holder.isComplete());
+ }
+}
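
An informal summary of what each parameterized case above verifies:

    // - numCorePartitions = 3 (numbered/hashed/range): the holder is complete
    //   exactly when partitionNums {0, 1, 2} are all present, even though some
    //   hash/range bucketIds were empty and never produced a segment.
    // - numCorePartitions = null (old-format single-dim): falls back to
    //   StringPartitionChunk, so completeness means the start/end ranges chain
    //   from an open start (null) to an open end (null).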
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/RangeBucketShardSpecTest.java
similarity index 55%
copy from core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java
copy to core/src/test/java/org/apache/druid/timeline/partition/RangeBucketShardSpecTest.java
index 21c5a03..d2c06e0 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/RangeBucketShardSpecTest.java
@@ -20,7 +20,7 @@
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
@@ -33,20 +33,23 @@ import org.junit.Test;
import java.util.List;
-public class BuildingNumberedShardSpecTest
+public class RangeBucketShardSpecTest
{
@Test
- public void testToNumberedShardSpec()
+ public void testConvert()
{
- Assert.assertEquals(new NumberedShardSpec(5, 10), new BuildingNumberedShardSpec(5).toNumberedShardSpec(10));
+ Assert.assertEquals(
+ new BuildingSingleDimensionShardSpec(1, "dim", "start", "end", 5),
+ new RangeBucketShardSpec(1, "dim", "start", "end").convert(5)
+ );
}
@Test
public void testCreateChunk()
{
Assert.assertEquals(
- new NumberedPartitionChunk<>(5, 0, "test"),
- new BuildingNumberedShardSpec(5).createChunk("test")
+ new NumberedPartitionChunk<>(1, 0, "test"),
+ new RangeBucketShardSpec(1, "dim", "start", "end").createChunk("test")
);
}
@@ -54,12 +57,11 @@ public class BuildingNumberedShardSpecTest
public void testShardSpecLookup()
{
final List<ShardSpec> shardSpecs = ImmutableList.of(
- new BuildingNumberedShardSpec(1),
- new BuildingNumberedShardSpec(2),
- new BuildingNumberedShardSpec(3)
+ new RangeBucketShardSpec(0, "dim", null, "c"),
+ new RangeBucketShardSpec(1, "dim", "f", "i"),
+ new RangeBucketShardSpec(2, "dim", "i", null)
);
final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs);
- // Timestamp doesn't matter. It always returns the first shardSpec.
final long currentTime = DateTimes.nowUtc().getMillis();
Assert.assertEquals(
shardSpecs.get(0),
@@ -67,7 +69,27 @@ public class BuildingNumberedShardSpecTest
currentTime,
new MapBasedInputRow(
currentTime,
- ImmutableList.of("dim"), ImmutableMap.of("dim", "val", "time", currentTime)
+ ImmutableList.of("dim"), ImmutableMap.of("dim", "a", "time", currentTime)
+ )
+ )
+ );
+ Assert.assertEquals(
+ shardSpecs.get(1),
+ lookup.getShardSpec(
+ currentTime,
+ new MapBasedInputRow(
+ currentTime,
+ ImmutableList.of("dim"), ImmutableMap.of("dim", "g", "time", currentTime)
+ )
+ )
+ );
+ Assert.assertEquals(
+ shardSpecs.get(2),
+ lookup.getShardSpec(
+ currentTime,
+ new MapBasedInputRow(
+ currentTime,
+ ImmutableList.of("dim"), ImmutableMap.of("dim", "k", "time", currentTime)
)
)
);
@@ -76,18 +98,18 @@ public class BuildingNumberedShardSpecTest
@Test
public void testSerde() throws JsonProcessingException
{
- final ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- mapper.registerSubtypes(new NamedType(BuildingNumberedShardSpec.class, BuildingNumberedShardSpec.TYPE));
- final BuildingNumberedShardSpec original = new BuildingNumberedShardSpec(5);
+ final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
+ mapper.registerSubtypes(new NamedType(RangeBucketShardSpec.class, RangeBucketShardSpec.TYPE));
+ mapper.setInjectableValues(new Std().addValue(ObjectMapper.class, mapper));
+ final RangeBucketShardSpec original = new RangeBucketShardSpec(1, "dim", "start", "end");
final String json = mapper.writeValueAsString(original);
- final BuildingNumberedShardSpec fromJson = (BuildingNumberedShardSpec) mapper.readValue(json, ShardSpec.class);
+ final RangeBucketShardSpec fromJson = (RangeBucketShardSpec) mapper.readValue(json, ShardSpec.class);
Assert.assertEquals(original, fromJson);
}
@Test
public void testEquals()
{
- EqualsVerifier.forClass(BuildingNumberedShardSpec.class).usingGetClass().verify();
+ EqualsVerifier.forClass(RangeBucketShardSpec.class).usingGetClass().verify();
}
}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/ShardSpecTestUtils.java b/core/src/test/java/org/apache/druid/timeline/partition/ShardSpecTestUtils.java
new file mode 100644
index 0000000..2f15365
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/ShardSpecTestUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+public class ShardSpecTestUtils
+{
+ public static ObjectMapper initObjectMapper()
+ {
+ // Copied configurations from org.apache.druid.jackson.DefaultObjectMapper
+ final ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+ // See https://github.com/FasterXML/jackson-databind/issues/170
+ // configure(MapperFeature.AUTO_DETECT_CREATORS, false);
+ mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+ mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+ mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+ mapper.configure(MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS, false);
+ mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
+ mapper.configure(SerializationFeature.FLUSH_AFTER_WRITE_VALUE, false);
+ return mapper;
+ }
+
+ private ShardSpecTestUtils()
+ {
+ }
+}
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 981f9f7..799b32e 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -302,7 +302,7 @@ and then by the hash value of `partitionDimensions` (secondary partition key) in
The partitioned data is stored in local storage of
the [middleManager](../design/middlemanager.md) or the [indexer](../design/indexer.md).
- The `partial segment merge` phase is similar to the Reduce phase in MapReduce.
-The Parallel task spawns a new set of worker tasks (type `partial_index_merge`) to merge the partitioned data
+The Parallel task spawns a new set of worker tasks (type `partial_index_generic_merge`) to merge the partitioned data
created in the previous phase. Here, the partitioned data is shuffled based on
the time chunk and the hash value of `partitionDimensions` to be merged; each worker task reads the data
falling in the same time chunk and the same hash value from multiple MiddleManager/Indexer processes and merges
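
For context while reading this doc change, the phases described here are driven by hash-based secondary partitioning in the tuningConfig, along these lines (a sketch only; see the rest of native-batch.md for the authoritative field list):

    "partitionsSpec": {
      "type": "hashed",
      "numShards": 4,
      "partitionDimensions": ["dim1", "dim2"]
    }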
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index ec27679..766f510 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -140,7 +140,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
- new HashBasedNumberedShardSpec(0, 1, null, null),
+ new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
9,
1024
),
@@ -151,7 +151,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
- new HashBasedNumberedShardSpec(0, 1, null, null),
+ new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
9,
1024
),
@@ -162,7 +162,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
- new HashBasedNumberedShardSpec(0, 1, null, null),
+ new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
9,
1024
)
@@ -175,7 +175,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
- new HashBasedNumberedShardSpec(0, 1, null, null),
+ new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
9,
1024
),
@@ -186,7 +186,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
- new HashBasedNumberedShardSpec(0, 1, null, null),
+ new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
9,
1024
)
@@ -209,7 +209,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
- new HashBasedNumberedShardSpec(0, 1, null, null),
+ new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
9,
1024
)
@@ -225,7 +225,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
- new HashBasedNumberedShardSpec(0, 1, null, null),
+ new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
9,
1024
)
@@ -246,7 +246,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
- new HashBasedNumberedShardSpec(0, 1, null, null),
+ new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
9,
1024
)
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
index 9f20f17..62a13d3 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
@@ -196,6 +196,8 @@ public class DetermineHashedPartitionsJob implements Jobby
new HashBasedNumberedShardSpec(
i,
numberOfShards,
+ i,
+ numberOfShards,
null,
HadoopDruidIndexerConfig.JSON_MAPPER
),
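
The widened constructor calls in the Hadoop jobs read more easily with the argument roles spelled out; the order below is inferred from BuildingHashBasedNumberedShardSpecTest earlier in this patch, so treat it as an assumption:

    new HashBasedNumberedShardSpec(
        i,                 // partitionNum
        numberOfShards,    // numCorePartitions
        i,                 // bucketId: one bucket per core partition here
        numberOfShards,    // numBuckets
        null,              // partitionDimensions (null -> default)
        HadoopDruidIndexerConfig.JSON_MAPPER
    )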
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
index d9e6a14..0b246be 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
@@ -666,7 +666,10 @@ public class DeterminePartitionsJob implements Jobby
currentDimPartitions.dim,
currentDimPartitionStart,
dvc.value,
- currentDimPartitions.partitions.size()
+ currentDimPartitions.partitions.size(),
+ // Set an unknown core partition set size so that the legacy code path is used when checking
+ // partitionHolder completeness. See SingleDimensionShardSpec.createChunk().
+ SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
);
log.info(
@@ -706,7 +709,10 @@ public class DeterminePartitionsJob implements Jobby
currentDimPartitions.dim,
previousShardSpec.getStart(),
null,
- previousShardSpec.getPartitionNum()
+ previousShardSpec.getPartitionNum(),
+ // Set an unknown core partition set size so that the legacy code path is used when checking
+ // partitionHolder completeness. See SingleDimensionShardSpec.createChunk().
+ SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
);
log.info("Removing possible shard: %s", previousShardSpec);
@@ -719,7 +725,10 @@ public class DeterminePartitionsJob implements Jobby
currentDimPartitions.dim,
currentDimPartitionStart,
null,
- currentDimPartitions.partitions.size()
+ currentDimPartitions.partitions.size(),
+ // Set an unknown core partition set size so that the legacy code path is used when checking
+ // partitionHolder completeness. See SingleDimensionShardSpec.createChunk().
+ SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
);
}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index a25f274..defb45f 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -81,6 +81,8 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
new HashBasedNumberedShardSpec(
i,
shardsPerInterval,
+ i,
+ shardsPerInterval,
config.getPartitionsSpec().getPartitionDimensions(),
HadoopDruidIndexerConfig.JSON_MAPPER
),
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
index 6ab20d2..4ac4f68 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
@@ -489,7 +489,7 @@ public class BatchDeltaIngestionTest
INTERVAL_FULL.getStartMillis(),
ImmutableList.of(
new HadoopyShardSpec(
- new HashBasedNumberedShardSpec(0, 1, null, HadoopDruidIndexerConfig.JSON_MAPPER),
+ new HashBasedNumberedShardSpec(0, 1, 0, 1, null, HadoopDruidIndexerConfig.JSON_MAPPER),
0
)
)
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
index e3e357c..375151e 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -63,7 +63,7 @@ public class HadoopDruidIndexerConfigTest
final int partitionCount = 10;
for (int i = 0; i < partitionCount; i++) {
shardSpecs.add(new HadoopyShardSpec(
- new HashBasedNumberedShardSpec(i, partitionCount, null, new DefaultObjectMapper()),
+ new HashBasedNumberedShardSpec(i, partitionCount, i, partitionCount, null, new DefaultObjectMapper()),
i
));
}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
index a667c75..b804db3 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
@@ -562,6 +562,8 @@ public class IndexGeneratorJobTest
specs.add(new HashBasedNumberedShardSpec(
shardInfo[0],
shardInfo[1],
+ shardInfo[0],
+ shardInfo[1],
null,
HadoopDruidIndexerConfig.JSON_MAPPER
));
@@ -573,7 +575,8 @@ public class IndexGeneratorJobTest
"host",
shardInfo[0],
shardInfo[1],
- partitionNum++
+ partitionNum++,
+ shardInfoForEachShard.length
));
}
} else {
@@ -693,12 +696,12 @@ public class IndexGeneratorJobTest
if (forceExtendableShardSpecs) {
NumberedShardSpec spec = (NumberedShardSpec) dataSegment.getShardSpec();
Assert.assertEquals(i, spec.getPartitionNum());
- Assert.assertEquals(shardInfo.length, spec.getPartitions());
+ Assert.assertEquals(shardInfo.length, spec.getNumCorePartitions());
} else if ("hashed".equals(partitionType)) {
Integer[] hashShardInfo = (Integer[]) shardInfo[i];
HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();
Assert.assertEquals((int) hashShardInfo[0], spec.getPartitionNum());
- Assert.assertEquals((int) hashShardInfo[1], spec.getPartitions());
+ Assert.assertEquals((int) hashShardInfo[1], spec.getNumCorePartitions());
} else if ("single".equals(partitionType)) {
String[] singleDimensionShardInfo = (String[]) shardInfo[i];
SingleDimensionShardSpec spec = (SingleDimensionShardSpec) dataSegment.getShardSpec();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocator.java
index 9040c9a..0ad2e99 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingLocalSegmentAllocator.java
@@ -20,6 +20,8 @@
package org.apache.druid.indexing.common.task;
import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -27,15 +29,17 @@ import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.batch.parallel.SupervisorTaskAccess;
+import org.apache.druid.indexing.common.task.batch.partition.CompletePartitionAnalysis;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -46,74 +50,70 @@ import java.util.stream.Collectors;
/**
* Allocates all necessary segments locally at the beginning and reuses them.
*/
-public class CachingLocalSegmentAllocator implements CachingSegmentAllocator
+public class CachingLocalSegmentAllocator implements SegmentAllocatorForBatch
{
- private final String taskId;
- private final Map<String, SegmentIdWithShardSpec> sequenceNameToSegmentId;
- private final ShardSpecs shardSpecs;
+ private final String dataSource;
+ private final Map<String, Pair<Interval, BucketNumberedShardSpec>> sequenceNameToBucket;
+ private final Function<Interval, String> versionFinder;
+ private final NonLinearlyPartitionedSequenceNameFunction sequenceNameFunction;
+ private final boolean isParallel;
- @FunctionalInterface
- interface IntervalToSegmentIdsCreator
- {
- /**
- * @param versionFinder Returns the version for the specified interval
- *
- * @return Information for segment preallocation
- */
- Map<Interval, List<SegmentIdWithShardSpec>> create(
- TaskToolbox toolbox,
- String dataSource,
- Function<Interval, String> versionFinder
- );
- }
+ private final Map<String, SegmentIdWithShardSpec> sequenceNameToSegmentId = new HashMap<>();
+ private final Object2IntMap<Interval> intervalToNextPartitionId = new Object2IntOpenHashMap<>();
CachingLocalSegmentAllocator(
TaskToolbox toolbox,
String dataSource,
String taskId,
- Granularity queryGranularity,
+ GranularitySpec granularitySpec,
@Nullable SupervisorTaskAccess supervisorTaskAccess,
- IntervalToSegmentIdsCreator intervalToSegmentIdsCreator
+ CompletePartitionAnalysis<?, ?> partitionAnalysis
) throws IOException
{
- this.taskId = taskId;
- this.sequenceNameToSegmentId = new HashMap<>();
+ this.dataSource = dataSource;
+ this.sequenceNameToBucket = new HashMap<>();
final TaskAction<List<TaskLock>> action;
if (supervisorTaskAccess == null) {
action = new LockListAction();
+ isParallel = false;
} else {
action = new SurrogateAction<>(supervisorTaskAccess.getSupervisorTaskId(), new LockListAction());
+ isParallel = true;
}
- final Map<Interval, String> intervalToVersion =
- toolbox.getTaskActionClient()
- .submit(action)
- .stream()
- .collect(Collectors.toMap(
- TaskLock::getInterval,
- TaskLock::getVersion
- ));
- Function<Interval, String> versionFinder = interval -> findVersion(intervalToVersion, interval);
-
- final Map<Interval, List<SegmentIdWithShardSpec>> intervalToIds = intervalToSegmentIdsCreator.create(
- toolbox,
- dataSource,
- versionFinder
+ this.versionFinder = createVersionFinder(toolbox, action);
+ final Map<Interval, List<BucketNumberedShardSpec<?>>> intervalToShardSpecs = partitionAnalysis.createBuckets(
+ toolbox
+ );
+
+ sequenceNameFunction = new NonLinearlyPartitionedSequenceNameFunction(
+ taskId,
+ new ShardSpecs(intervalToShardSpecs, granularitySpec.getQueryGranularity())
);
- final Map<Interval, List<ShardSpec>> shardSpecMap = new HashMap<>();
- for (Entry<Interval, List<SegmentIdWithShardSpec>> entry : intervalToIds.entrySet()) {
+ for (Entry<Interval, List<BucketNumberedShardSpec<?>>> entry : intervalToShardSpecs.entrySet()) {
final Interval interval = entry.getKey();
- final List<SegmentIdWithShardSpec> idsPerInterval = intervalToIds.get(interval);
+ final List<BucketNumberedShardSpec<?>> buckets = entry.getValue();
- for (SegmentIdWithShardSpec segmentIdentifier : idsPerInterval) {
- shardSpecMap.computeIfAbsent(interval, k -> new ArrayList<>()).add(segmentIdentifier.getShardSpec());
- // The shardSpecs for partitioning and publishing can be different if isExtendableShardSpecs = true.
- sequenceNameToSegmentId.put(getSequenceName(interval, segmentIdentifier.getShardSpec()), segmentIdentifier);
- }
+ buckets.forEach(bucket -> {
+ sequenceNameToBucket.put(sequenceNameFunction.getSequenceName(interval, bucket), Pair.of(interval, bucket));
+ });
}
- shardSpecs = new ShardSpecs(shardSpecMap, queryGranularity);
+ }
+
+ static Function<Interval, String> createVersionFinder(
+ TaskToolbox toolbox,
+ TaskAction<List<TaskLock>> lockListAction
+ ) throws IOException
+ {
+ final Map<Interval, String> intervalToVersion =
+ toolbox.getTaskActionClient()
+ .submit(lockListAction)
+ .stream()
+ .collect(Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion));
+
+ return interval -> findVersion(intervalToVersion, interval);
}
private static String findVersion(Map<Interval, String> intervalToVersion, Interval interval)
@@ -133,28 +133,36 @@ public class CachingLocalSegmentAllocator implements CachingSegmentAllocator
boolean skipSegmentLineageCheck
)
{
- return Preconditions.checkNotNull(
- sequenceNameToSegmentId.get(sequenceName),
- "Missing segmentId for the sequence[%s]",
- sequenceName
+ return sequenceNameToSegmentId.computeIfAbsent(
+ sequenceName,
+ k -> {
+ final Pair<Interval, BucketNumberedShardSpec> pair = Preconditions.checkNotNull(
+ sequenceNameToBucket.get(sequenceName),
+ "Missing bucket for sequence[%s]",
+ sequenceName
+ );
+ final Interval interval = pair.lhs;
+ // Determines the partitionId if this segment allocator is used by the single-threaded task.
+ // In parallel ingestion, the partitionId is determined in the supervisor task.
+ // See ParallelIndexSupervisorTask.groupGenericPartitionLocationsPerPartition().
+ // This code... isn't pretty, but should be simple enough to understand.
+ final ShardSpec shardSpec = isParallel
+ ? pair.rhs
+ : pair.rhs.convert(
+ intervalToNextPartitionId.computeInt(
+ interval,
+ (i, nextPartitionId) -> nextPartitionId == null ? 0 : nextPartitionId + 1
+ )
+ );
+ final String version = versionFinder.apply(interval);
+ return new SegmentIdWithShardSpec(dataSource, interval, version, shardSpec);
+ }
);
}
- /**
- * Create a sequence name from the given shardSpec and interval.
- *
- * See {@link org.apache.druid.timeline.partition.HashBasedNumberedShardSpec} as an example of partitioning.
- */
- private String getSequenceName(Interval interval, ShardSpec shardSpec)
- {
- // Note: We do not use String format here since this can be called in a tight loop
- // and it's faster to add strings together than it is to use String#format
- return taskId + "_" + interval + "_" + shardSpec.getPartitionNum();
- }
-
@Override
- public ShardSpecs getShardSpecs()
+ public SequenceNameFunction getSequenceNameFunction()
{
- return shardSpecs;
+ return sequenceNameFunction;
}
}
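
A distillation of the allocation branch above for the single-threaded case (names simplified): the first bucket seen for an interval gets partitionId 0, the next gets 1, and so on, and the bucket is converted so the allocated segment carries a BuildingShardSpec with that ID:

    // Sketch of the non-parallel path only; in parallel ingestion the bucket
    // is kept as-is and the supervisor task assigns partition IDs later.
    final int partitionId = intervalToNextPartitionId.computeInt(
        interval,
        (iv, prev) -> prev == null ? 0 : prev + 1
    );
    final ShardSpec shardSpec = bucket.convert(partitionId);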
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 14d806b..89d6960 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -97,7 +97,6 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
-import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
@@ -877,35 +876,33 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
final long pushTimeout = tuningConfig.getPushTimeout();
- final SegmentAllocator segmentAllocator;
+ final SegmentAllocatorForBatch segmentAllocator;
final SequenceNameFunction sequenceNameFunction;
switch (partitionsSpec.getType()) {
case HASH:
case RANGE:
- final CachingSegmentAllocator localSegmentAllocator = SegmentAllocators.forNonLinearPartitioning(
+ final SegmentAllocatorForBatch localSegmentAllocator = SegmentAllocators.forNonLinearPartitioning(
toolbox,
getDataSource(),
getId(),
- dataSchema.getGranularitySpec().getQueryGranularity(),
+ dataSchema.getGranularitySpec(),
null,
(CompletePartitionAnalysis) partitionAnalysis
);
- sequenceNameFunction = new NonLinearlyPartitionedSequenceNameFunction(
- getId(),
- localSegmentAllocator.getShardSpecs()
- );
+ sequenceNameFunction = localSegmentAllocator.getSequenceNameFunction();
segmentAllocator = localSegmentAllocator;
break;
case LINEAR:
segmentAllocator = SegmentAllocators.forLinearPartitioning(
toolbox,
+ getId(),
null,
dataSchema,
getTaskLockHelper(),
ingestionSchema.getIOConfig().isAppendToExisting(),
partitionAnalysis.getPartitionsSpec()
);
- sequenceNameFunction = new LinearlyPartitionedSequenceNameFunction(getId());
+ sequenceNameFunction = segmentAllocator.getSequenceNameFunction();
break;
default:
throw new UOE("[%s] secondary partition type is not supported", partitionsSpec.getType());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/LocalSegmentAllocator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/LocalSegmentAllocator.java
index 2cb4db5..c2488e6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/LocalSegmentAllocator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/LocalSegmentAllocator.java
@@ -43,11 +43,12 @@ import java.util.stream.Collectors;
/**
* Segment allocator which allocates new segments locally per request.
*/
-class LocalSegmentAllocator implements SegmentAllocator
+class LocalSegmentAllocator implements SegmentAllocatorForBatch
{
private final SegmentAllocator internalAllocator;
+ private final SequenceNameFunction sequenceNameFunction;
- LocalSegmentAllocator(TaskToolbox toolbox, String dataSource, GranularitySpec granularitySpec) throws IOException
+ LocalSegmentAllocator(TaskToolbox toolbox, String taskId, String dataSource, GranularitySpec granularitySpec) throws IOException
{
final Map<Interval, String> intervalToVersion = toolbox
.getTaskActionClient()
@@ -80,6 +81,7 @@ class LocalSegmentAllocator implements SegmentAllocator
new BuildingNumberedShardSpec(partitionId)
);
};
+ sequenceNameFunction = new LinearlyPartitionedSequenceNameFunction(taskId);
}
@Nullable
@@ -93,4 +95,10 @@ class LocalSegmentAllocator implements SegmentAllocator
{
return internalAllocator.allocate(row, sequenceName, previousSegmentId, skipSegmentLineageCheck);
}
+
+ @Override
+ public SequenceNameFunction getSequenceNameFunction()
+ {
+ return sequenceNameFunction;
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NonLinearlyPartitionedSequenceNameFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NonLinearlyPartitionedSequenceNameFunction.java
index c34aa29..44fd520 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NonLinearlyPartitionedSequenceNameFunction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NonLinearlyPartitionedSequenceNameFunction.java
@@ -20,7 +20,7 @@
package org.apache.druid.indexing.common.task;
import org.apache.druid.data.input.InputRow;
-import org.apache.druid.timeline.partition.ShardSpec;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.joda.time.Interval;
/**
@@ -30,7 +30,6 @@ import org.joda.time.Interval;
* Note that all segment IDs should be allocated upfront to use this function.
*
* @see org.apache.druid.indexer.partitions.SecondaryPartitionType
- * @see CachingSegmentAllocator
*/
public class NonLinearlyPartitionedSequenceNameFunction implements SequenceNameFunction
{
@@ -55,10 +54,10 @@ public class NonLinearlyPartitionedSequenceNameFunction implements SequenceNameF
*
* See {@link org.apache.druid.timeline.partition.HashBasedNumberedShardSpec} as an example of partitioning.
*/
- private String getSequenceName(Interval interval, ShardSpec shardSpec)
+ public String getSequenceName(Interval interval, BucketNumberedShardSpec<?> bucket)
{
// Note: We do not use String format here since this can be called in a tight loop
// and it's faster to add strings together than it is to use String#format
- return taskId + "_" + interval + "_" + shardSpec.getPartitionNum();
+ return taskId + "_" + interval + "_" + bucket.getBucketId();
}
}
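
A tiny, self-contained sketch of the concatenation above (names mirror the method; the task id and interval are made up):

public class SequenceNameSketch
{
  // Equivalent to String.format("%s_%s_%d", taskId, interval, bucketId) but
  // cheaper per call, which matters since this runs for every input row.
  static String sequenceName(String taskId, String interval, int bucketId)
  {
    return taskId + "_" + interval + "_" + bucketId;
  }

  public static void main(String[] args)
  {
    System.out.println(sequenceName("index_abc", "2020-01-01T00:00:00.000Z/2020-01-02T00:00:00.000Z", 3));
    // -> index_abc_2020-01-01T00:00:00.000Z/2020-01-02T00:00:00.000Z_3
  }
}
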
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
index 1598dee..87daaa8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
@@ -31,7 +31,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.SupervisorTaskAccess
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
-import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.NumberedOverwritePartialShardSpec;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
@@ -44,12 +43,14 @@ import java.io.IOException;
/**
* Segment allocator which allocates new segments using the overlord per request.
*/
-public class OverlordCoordinatingSegmentAllocator implements SegmentAllocator
+public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorForBatch
{
private final ActionBasedSegmentAllocator internalAllocator;
+ private final LinearlyPartitionedSequenceNameFunction sequenceNameFunction;
OverlordCoordinatingSegmentAllocator(
final TaskToolbox toolbox,
+ final String taskId,
final @Nullable SupervisorTaskAccess supervisorTaskAccess,
final DataSchema dataSchema,
final TaskLockHelper taskLockHelper,
@@ -101,6 +102,7 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocator
}
}
);
+ this.sequenceNameFunction = new LinearlyPartitionedSequenceNameFunction(taskId);
}
@Nullable
@@ -146,4 +148,10 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocator
);
}
}
+
+ @Override
+ public SequenceNameFunction getSequenceNameFunction()
+ {
+ return sequenceNameFunction;
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingSegmentAllocator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocatorForBatch.java
similarity index 57%
rename from indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingSegmentAllocator.java
rename to indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocatorForBatch.java
index 176d45e..f2bf503 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CachingSegmentAllocator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocatorForBatch.java
@@ -22,17 +22,13 @@ package org.apache.druid.indexing.common.task;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
/**
- * SegmentAllocator that allocates all necessary segments upfront. This allocator should be used for the hash or range
- * secondary partitioning.
+ * SegmentAllocator for batch ingestion. Implementations also provide the
+ * {@link SequenceNameFunction} appropriate to their secondary partitioning type.
*
- * In the hash or range secondary partitioning, the information about all partition buckets should be known before
- * the task starts to allocate segments. For example, for the hash partitioning, the task should know how many hash
- * buckets it will create, what is the hash value allocated for each bucket, etc. Similar for the range partitioning.
*/
-public interface CachingSegmentAllocator extends SegmentAllocator
+public interface SegmentAllocatorForBatch extends SegmentAllocator
{
/**
- * Returns the {@link org.apache.druid.timeline.partition.ShardSpec}s of all segments allocated upfront.
+ * Returns the {@link SequenceNameFunction} used to create sequence names
+ * when allocating segments with this allocator.
*/
- ShardSpecs getShardSpecs();
+ SequenceNameFunction getSequenceNameFunction();
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java
index de88c68..47df3f2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SegmentAllocators.java
@@ -23,8 +23,8 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.batch.parallel.SupervisorTaskAccess;
import org.apache.druid.indexing.common.task.batch.partition.CompletePartitionAnalysis;
-import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import javax.annotation.Nullable;
@@ -36,8 +36,9 @@ public final class SegmentAllocators
* Creates a new {@link SegmentAllocator} for the linear partitioning.
* supervisorTaskAccess can be null if this method is called by the {@link IndexTask}.
*/
- public static SegmentAllocator forLinearPartitioning(
+ public static SegmentAllocatorForBatch forLinearPartitioning(
final TaskToolbox toolbox,
+ final String taskId,
final @Nullable SupervisorTaskAccess supervisorTaskAccess,
final DataSchema dataSchema,
final TaskLockHelper taskLockHelper,
@@ -48,6 +49,7 @@ public final class SegmentAllocators
if (appendToExisting || taskLockHelper.isUseSegmentLock()) {
return new OverlordCoordinatingSegmentAllocator(
toolbox,
+ taskId,
supervisorTaskAccess,
dataSchema,
taskLockHelper,
@@ -58,12 +60,14 @@ public final class SegmentAllocators
if (supervisorTaskAccess == null) {
return new LocalSegmentAllocator(
toolbox,
+ taskId,
dataSchema.getDataSource(),
dataSchema.getGranularitySpec()
);
} else {
return new SupervisorTaskCoordinatingSegmentAllocator(
supervisorTaskAccess.getSupervisorTaskId(),
+ taskId,
supervisorTaskAccess.getTaskClient()
);
}
@@ -74,11 +78,11 @@ public final class SegmentAllocators
* Creates a new {@link SegmentAllocator} for the hash and range partitioning.
* supervisorTaskAccess can be null if this method is called by the {@link IndexTask}.
*/
- public static CachingSegmentAllocator forNonLinearPartitioning(
+ public static SegmentAllocatorForBatch forNonLinearPartitioning(
final TaskToolbox toolbox,
final String dataSource,
final String taskId,
- final Granularity queryGranularity,
+ final GranularitySpec granularitySpec,
final @Nullable SupervisorTaskAccess supervisorTaskAccess,
final CompletePartitionAnalysis partitionAnalysis
) throws IOException
@@ -87,9 +91,9 @@ public final class SegmentAllocators
toolbox,
dataSource,
taskId,
- queryGranularity,
+ granularitySpec,
supervisorTaskAccess,
- partitionAnalysis::convertToIntervalToSegmentIds
+ partitionAnalysis
);
}
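
The factory's routing above reduces to a small decision; a sketch with plain booleans standing in for the real appendToExisting, segment-lock, and supervisor-access checks:

public class AllocatorFactorySketch
{
  enum Kind { OVERLORD, LOCAL, SUPERVISOR }

  static Kind forLinearPartitioning(boolean appendToExisting, boolean useSegmentLock, boolean hasSupervisorAccess)
  {
    if (appendToExisting || useSegmentLock) {
      // Appending or segment locking requires overlord-coordinated allocation.
      return Kind.OVERLORD;
    }
    // Otherwise allocate locally, unless a supervisor task mediates allocation.
    return hasSupervisorAccess ? Kind.SUPERVISOR : Kind.LOCAL;
  }

  public static void main(String[] args)
  {
    System.out.println(forLinearPartitioning(false, false, false)); // LOCAL
    System.out.println(forLinearPartitioning(true, false, false));  // OVERLORD
    System.out.println(forLinearPartitioning(false, false, true));  // SUPERVISOR
  }
}
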
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ShardSpecs.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ShardSpecs.java
index 42f7ce1..3db4beb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ShardSpecs.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ShardSpecs.java
@@ -22,7 +22,7 @@ package org.apache.druid.indexing.common.task;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.timeline.partition.ShardSpec;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.joda.time.Interval;
import java.util.List;
@@ -33,10 +33,10 @@ import java.util.Map;
*/
public class ShardSpecs
{
- private final Map<Interval, List<ShardSpec>> map;
- private Granularity queryGranularity;
+ private final Map<Interval, List<BucketNumberedShardSpec<?>>> map;
+ private final Granularity queryGranularity;
- ShardSpecs(final Map<Interval, List<ShardSpec>> map, Granularity queryGranularity)
+ ShardSpecs(final Map<Interval, List<BucketNumberedShardSpec<?>>> map, Granularity queryGranularity)
{
this.map = map;
this.queryGranularity = queryGranularity;
@@ -50,13 +50,13 @@ public class ShardSpecs
*
* @return a shardSpec
*/
- ShardSpec getShardSpec(Interval interval, InputRow row)
+ BucketNumberedShardSpec<?> getShardSpec(Interval interval, InputRow row)
{
- final List<ShardSpec> shardSpecs = map.get(interval);
+ final List<BucketNumberedShardSpec<?>> shardSpecs = map.get(interval);
if (shardSpecs == null || shardSpecs.isEmpty()) {
throw new ISE("Failed to get shardSpec for interval[%s]", interval);
}
final long truncatedTimestamp = queryGranularity.bucketStart(row.getTimestamp()).getMillis();
- return shardSpecs.get(0).getLookup(shardSpecs).getShardSpec(truncatedTimestamp, row);
+ return (BucketNumberedShardSpec<?>) shardSpecs.get(0).getLookup(shardSpecs).getShardSpec(truncatedTimestamp, row);
}
}
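
A simplified sketch of the lookup above, with hypothetical stand-ins for granularity and buckets (the real code delegates to the shard specs' own lookup): truncate the row timestamp to its granularity bucket start, then pick the bucket for that row within the interval.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class BucketLookupSketch
{
  static int bucketFor(
      Map<String, List<Integer>> bucketIdsByInterval,
      String interval,
      long timestampMillis,
      long granularityMillis
  )
  {
    final List<Integer> bucketIds = bucketIdsByInterval.get(interval);
    if (bucketIds == null || bucketIds.isEmpty()) {
      throw new IllegalStateException("Failed to get buckets for interval[" + interval + "]");
    }
    // Stand-in for queryGranularity.bucketStart(timestamp).getMillis().
    final long truncated = timestampMillis - Math.floorMod(timestampMillis, granularityMillis);
    // Stand-in for the shard-spec lookup: hash the truncated timestamp to a bucket.
    return bucketIds.get(Math.floorMod(Long.hashCode(truncated), bucketIds.size()));
  }

  public static void main(String[] args)
  {
    Map<String, List<Integer>> buckets =
        Collections.singletonMap("2020-01-01/2020-01-02", Arrays.asList(0, 1, 2));
    System.out.println(bucketFor("2020-01-01/2020-01-02", 1577840461000L, 3_600_000L));
  }
}
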
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SupervisorTaskCoordinatingSegmentAllocator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SupervisorTaskCoordinatingSegmentAllocator.java
index 7fde4b8..d0956aa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SupervisorTaskCoordinatingSegmentAllocator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SupervisorTaskCoordinatingSegmentAllocator.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.common.task;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
-import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import java.io.IOException;
@@ -29,18 +28,21 @@ import java.io.IOException;
/**
* Segment allocator that allocates new segments using the supervisor task per request.
*/
-public class SupervisorTaskCoordinatingSegmentAllocator implements SegmentAllocator
+public class SupervisorTaskCoordinatingSegmentAllocator implements SegmentAllocatorForBatch
{
private final String supervisorTaskId;
private final ParallelIndexSupervisorTaskClient taskClient;
+ private final SequenceNameFunction sequenceNameFunction;
SupervisorTaskCoordinatingSegmentAllocator(
String supervisorTaskId,
+ String taskId,
ParallelIndexSupervisorTaskClient taskClient
)
{
this.supervisorTaskId = supervisorTaskId;
this.taskClient = taskClient;
+ this.sequenceNameFunction = new LinearlyPartitionedSequenceNameFunction(taskId);
}
@Override
@@ -53,4 +55,10 @@ public class SupervisorTaskCoordinatingSegmentAllocator implements SegmentAlloca
{
return taskClient.allocateSegment(supervisorTaskId, row.getTimestamp());
}
+
+ @Override
+ public SequenceNameFunction getSequenceNameFunction()
+ {
+ return sequenceNameFunction;
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 4f18c81..20a7da6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -31,7 +31,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervi
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
-import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentMergeTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialRangeSegmentGenerateTask;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask;
import org.apache.druid.query.Query;
@@ -63,7 +62,6 @@ import java.util.Map;
// for backward compatibility
@Type(name = SinglePhaseSubTask.OLD_TYPE_NAME, value = LegacySinglePhaseSubTask.class),
@Type(name = PartialHashSegmentGenerateTask.TYPE, value = PartialHashSegmentGenerateTask.class),
- @Type(name = PartialHashSegmentMergeTask.TYPE, value = PartialHashSegmentMergeTask.class),
@Type(name = PartialRangeSegmentGenerateTask.TYPE, value = PartialRangeSegmentGenerateTask.class),
@Type(name = PartialDimensionDistributionTask.TYPE, value = PartialDimensionDistributionTask.class),
@Type(name = PartialGenericSegmentMergeTask.TYPE, value = PartialGenericSegmentMergeTask.class),
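
For reference, the @JsonSubTypes registration pattern edited above, shown on minimal stand-in types: dropping a @Type entry means JSON carrying that type name no longer deserializes.

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SubtypeSketch
{
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  @JsonSubTypes({
      @Type(name = "partial_index_generate", value = GenerateTask.class)
      // "partial_index_merge" is no longer registered above, so JSON carrying
      // that type name now fails polymorphic deserialization.
  })
  public interface TaskLike {}

  public static class GenerateTask implements TaskLike
  {
    public String id = "g1";
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final String json = mapper.writeValueAsString(new GenerateTask());
    System.out.println(json); // {"type":"partial_index_generate","id":"g1"}
    System.out.println(mapper.readValue(json, TaskLike.class).getClass().getSimpleName()); // GenerateTask
  }
}
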
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedHashPartitionsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedHashPartitionsReport.java
deleted file mode 100644
index 85574b7..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedHashPartitionsReport.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * Report containing the {@link HashPartitionStat}s created by a {@link PartialHashSegmentGenerateTask}.
- * This report is collected by {@link ParallelIndexSupervisorTask} and
- * used to generate {@link PartialHashSegmentMergeIOConfig}.
- */
-class GeneratedHashPartitionsReport extends GeneratedPartitionsReport<HashPartitionStat> implements SubTaskReport
-{
- public static final String TYPE = "generated_partitions";
-
- @JsonCreator
- GeneratedHashPartitionsReport(
- @JsonProperty("taskId") String taskId,
- @JsonProperty("partitionStats") List<HashPartitionStat> partitionStats
- )
- {
- super(taskId, partitionStats);
- }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java
index bbfd1e2..74c4c17 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java
@@ -22,14 +22,14 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.timeline.partition.ShardSpec;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.joda.time.Interval;
/**
* This class represents the intermediary data server where the partition of {@code interval} and {@code shardSpec}
* is stored.
*/
-public class GenericPartitionLocation extends PartitionLocation<ShardSpec>
+public class GenericPartitionLocation extends PartitionLocation<BuildingShardSpec>
{
@JsonCreator
public GenericPartitionLocation(
@@ -38,7 +38,7 @@ public class GenericPartitionLocation extends PartitionLocation<ShardSpec>
@JsonProperty("useHttps") boolean useHttps,
@JsonProperty("subTaskId") String subTaskId,
@JsonProperty("interval") Interval interval,
- @JsonProperty("shardSpec") ShardSpec shardSpec
+ @JsonProperty("shardSpec") BuildingShardSpec shardSpec
)
{
super(host, port, useHttps, subTaskId, interval, shardSpec);
@@ -46,13 +46,13 @@ public class GenericPartitionLocation extends PartitionLocation<ShardSpec>
@JsonIgnore
@Override
- public int getPartitionId()
+ public int getBucketId()
{
- return getSecondaryPartition().getPartitionNum();
+ return getSecondaryPartition().getBucketId();
}
@JsonProperty
- ShardSpec getShardSpec()
+ BuildingShardSpec getShardSpec()
{
return getSecondaryPartition();
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java
index 5f4d16d..a4ac80b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
@@ -33,12 +34,12 @@ import java.util.Objects;
* partition key). The {@link ShardSpec} is later used by {@link PartialGenericSegmentMergeTask} to merge the partial
* segments.
*/
-public class GenericPartitionStat extends PartitionStat<ShardSpec>
+public class GenericPartitionStat extends PartitionStat<BucketNumberedShardSpec>
{
private static final String PROP_SHARD_SPEC = "shardSpec";
// Secondary partition key
- private final ShardSpec shardSpec;
+ private final BucketNumberedShardSpec shardSpec;
@JsonCreator
public GenericPartitionStat(
@@ -46,7 +47,7 @@ public class GenericPartitionStat extends PartitionStat<ShardSpec>
@JsonProperty("taskExecutorPort") int taskExecutorPort,
@JsonProperty("useHttps") boolean useHttps,
@JsonProperty("interval") Interval interval,
- @JsonProperty(PROP_SHARD_SPEC) ShardSpec shardSpec,
+ @JsonProperty(PROP_SHARD_SPEC) BucketNumberedShardSpec shardSpec,
@JsonProperty("numRows") @Nullable Integer numRows,
@JsonProperty("sizeBytes") @Nullable Long sizeBytes
)
@@ -56,14 +57,14 @@ public class GenericPartitionStat extends PartitionStat<ShardSpec>
}
@Override
- public int getPartitionId()
+ public int getBucketId()
{
- return shardSpec.getPartitionNum();
+ return shardSpec.getBucketId();
}
@JsonProperty(PROP_SHARD_SPEC)
@Override
- ShardSpec getSecondaryPartition()
+ BucketNumberedShardSpec getSecondaryPartition()
{
return shardSpec;
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionLocation.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionLocation.java
deleted file mode 100644
index 604eb7a..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionLocation.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.joda.time.Interval;
-
-/**
- * This class represents the intermediary data server where the partition of {@code interval} and {@code partitionId}
- * is stored.
- */
-public class HashPartitionLocation extends PartitionLocation<Integer>
-{
- @JsonCreator
- public HashPartitionLocation(
- @JsonProperty("host") String host,
- @JsonProperty("port") int port,
- @JsonProperty("useHttps") boolean useHttps,
- @JsonProperty("subTaskId") String subTaskId,
- @JsonProperty("interval") Interval interval,
- @JsonProperty("partitionId") int partitionId
- )
- {
- super(host, port, useHttps, subTaskId, interval, partitionId);
- }
-
- @JsonProperty
- @Override
- public int getPartitionId()
- {
- return getSecondaryPartition();
- }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionStat.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionStat.java
deleted file mode 100644
index 21019ab..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionStat.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.joda.time.Interval;
-
-import javax.annotation.Nullable;
-import java.util.Objects;
-
-/**
- * Statistics about a partition created by {@link PartialHashSegmentGenerateTask}. Each partition is a set of data
- * of the same time chunk (primary partition key) and the same partitionId (secondary partition key). This class
- * holds the statistics of a single partition created by a task.
- */
-public class HashPartitionStat extends PartitionStat<Integer>
-{
- // Secondary partition key
- private final int partitionId;
-
- @JsonCreator
- public HashPartitionStat(
- @JsonProperty("taskExecutorHost") String taskExecutorHost,
- @JsonProperty("taskExecutorPort") int taskExecutorPort,
- @JsonProperty("useHttps") boolean useHttps,
- @JsonProperty("interval") Interval interval,
- @JsonProperty("partitionId") int partitionId,
- @JsonProperty("numRows") @Nullable Integer numRows,
- @JsonProperty("sizeBytes") @Nullable Long sizeBytes
- )
- {
- super(taskExecutorHost, taskExecutorPort, useHttps, interval, numRows, sizeBytes);
- this.partitionId = partitionId;
- }
-
- @JsonProperty
- @Override
- public int getPartitionId()
- {
- return partitionId;
- }
-
- @JsonIgnore
- @Override
- Integer getSecondaryPartition()
- {
- return partitionId;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- HashPartitionStat that = (HashPartitionStat) o;
- return partitionId == that.partitionId;
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(super.hashCode(), partitionId);
- }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index cb1bc39..bed85de 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -29,6 +29,8 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputFormat;
@@ -79,6 +81,7 @@ import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.apache.druid.utils.CollectionUtils;
@@ -337,24 +340,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
}
@VisibleForTesting
- PartialHashSegmentMergeParallelIndexTaskRunner createPartialHashSegmentMergeRunner(
- TaskToolbox toolbox,
- List<PartialHashSegmentMergeIOConfig> ioConfigs
- )
- {
- return new PartialHashSegmentMergeParallelIndexTaskRunner(
- toolbox,
- getId(),
- getGroupId(),
- getIngestionSchema().getDataSchema(),
- ioConfigs,
- getIngestionSchema().getTuningConfig(),
- getContext(),
- indexingServiceClient
- );
- }
-
- @VisibleForTesting
PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner(
TaskToolbox toolbox,
List<PartialGenericSegmentMergeIOConfig> ioConfigs
@@ -544,10 +529,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
// 1. Partial segment generation phase
- ParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedHashPartitionsReport> indexingRunner = createRunner(
- toolbox,
- this::createPartialHashSegmentGenerateRunner
- );
+ ParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner
+ = createRunner(toolbox, this::createPartialHashSegmentGenerateRunner);
TaskState state = runNextPhase(indexingRunner);
if (state.isFailure()) {
@@ -557,16 +540,16 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
// 2. Partial segment merge phase
// partition (interval, partitionId) -> partition locations
- Map<Pair<Interval, Integer>, List<HashPartitionLocation>> partitionToLocations =
- groupHashPartitionLocationsPerPartition(indexingRunner.getReports());
- final List<PartialHashSegmentMergeIOConfig> ioConfigs = createHashMergeIOConfigs(
+ Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> partitionToLocations =
+ groupGenericPartitionLocationsPerPartition(indexingRunner.getReports());
+ final List<PartialGenericSegmentMergeIOConfig> ioConfigs = createGenericMergeIOConfigs(
ingestionSchema.getTuningConfig().getTotalNumMergeTasks(),
partitionToLocations
);
- final ParallelIndexTaskRunner<PartialHashSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
+ final ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
toolbox,
- tb -> createPartialHashSegmentMergeRunner(tb, ioConfigs)
+ tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
);
state = runNextPhase(mergeRunner);
if (state.isSuccess()) {
@@ -659,38 +642,35 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
return partitions;
}
- private static Map<Pair<Interval, Integer>, List<HashPartitionLocation>> groupHashPartitionLocationsPerPartition(
- Map<String, GeneratedHashPartitionsReport> subTaskIdToReport
- )
- {
- BiFunction<String, HashPartitionStat, HashPartitionLocation> createPartitionLocationFunction =
- (subtaskId, partitionStat) ->
- new HashPartitionLocation(
- partitionStat.getTaskExecutorHost(),
- partitionStat.getTaskExecutorPort(),
- partitionStat.isUseHttps(),
- subtaskId,
- partitionStat.getInterval(),
- partitionStat.getSecondaryPartition()
- );
-
- return groupPartitionLocationsPerPartition(subTaskIdToReport, createPartitionLocationFunction);
- }
-
private static Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> groupGenericPartitionLocationsPerPartition(
Map<String, GeneratedPartitionsReport<GenericPartitionStat>> subTaskIdToReport
)
{
- BiFunction<String, GenericPartitionStat, GenericPartitionLocation> createPartitionLocationFunction =
- (subtaskId, partitionStat) ->
- new GenericPartitionLocation(
- partitionStat.getTaskExecutorHost(),
- partitionStat.getTaskExecutorPort(),
- partitionStat.isUseHttps(),
- subtaskId,
- partitionStat.getInterval(),
- partitionStat.getSecondaryPartition()
- );
+ final Map<Pair<Interval, Integer>, BuildingShardSpec<?>> intervalAndIntegerToShardSpec = new HashMap<>();
+ final Object2IntMap<Interval> intervalToNextPartitionId = new Object2IntOpenHashMap<>();
+ final BiFunction<String, GenericPartitionStat, GenericPartitionLocation> createPartitionLocationFunction =
+ (subtaskId, partitionStat) -> {
+ final BuildingShardSpec<?> shardSpec = intervalAndIntegerToShardSpec.computeIfAbsent(
+ Pair.of(partitionStat.getInterval(), partitionStat.getBucketId()),
+ key -> {
+ // Lazily determine the partitionId to create packed partitionIds for the core partitions.
+ // See the Javadoc of BucketNumberedShardSpec for details.
+ final int partitionId = intervalToNextPartitionId.computeInt(
+ partitionStat.getInterval(),
+ ((interval, nextPartitionId) -> nextPartitionId == null ? 0 : nextPartitionId + 1)
+ );
+ return partitionStat.getSecondaryPartition().convert(partitionId);
+ }
+ );
+ return new GenericPartitionLocation(
+ partitionStat.getTaskExecutorHost(),
+ partitionStat.getTaskExecutorPort(),
+ partitionStat.isUseHttps(),
+ subtaskId,
+ partitionStat.getInterval(),
+ shardSpec
+ );
+ };
return groupPartitionLocationsPerPartition(subTaskIdToReport, createPartitionLocationFunction);
}
@@ -708,7 +688,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
final GeneratedPartitionsReport<S> report = entry.getValue();
for (S partitionStat : report.getPartitionStats()) {
final List<L> locationsOfSamePartition = partitionToLocations.computeIfAbsent(
- Pair.of(partitionStat.getInterval(), partitionStat.getPartitionId()),
+ Pair.of(partitionStat.getInterval(), partitionStat.getBucketId()),
k -> new ArrayList<>()
);
locationsOfSamePartition.add(createPartitionLocationFunction.apply(subTaskId, partitionStat));
@@ -718,18 +698,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
return partitionToLocations;
}
- private static List<PartialHashSegmentMergeIOConfig> createHashMergeIOConfigs(
- int totalNumMergeTasks,
- Map<Pair<Interval, Integer>, List<HashPartitionLocation>> partitionToLocations
- )
- {
- return createMergeIOConfigs(
- totalNumMergeTasks,
- partitionToLocations,
- PartialHashSegmentMergeIOConfig::new
- );
- }
-
private static List<PartialGenericSegmentMergeIOConfig> createGenericMergeIOConfigs(
int totalNumMergeTasks,
Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> partitionToLocations
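
A self-contained sketch of the packed-partitionId assignment above, using plain java.util maps instead of fastutil: the first time an (interval, bucketId) pair is seen, it takes the interval's next consecutive partitionId, so core partitions are numbered 0..N-1 with no gaps even when some hash/range buckets received no rows.

import java.util.HashMap;
import java.util.Map;

public class PackedPartitionIdSketch
{
  private final Map<String, Map<Integer, Integer>> idByIntervalAndBucket = new HashMap<>();
  private final Map<String, Integer> nextIdByInterval = new HashMap<>();

  int partitionIdFor(String interval, int bucketId)
  {
    return idByIntervalAndBucket
        .computeIfAbsent(interval, k -> new HashMap<>())
        .computeIfAbsent(bucketId, k ->
            // First time this bucket is seen: take the interval's next consecutive id.
            nextIdByInterval.merge(interval, 0, (prev, zero) -> prev + 1)
        );
  }

  public static void main(String[] args)
  {
    PackedPartitionIdSketch ids = new PackedPartitionIdSketch();
    // Only buckets 2 and 7 received rows for this interval; ids still pack to 0 and 1.
    System.out.println(ids.partitionIdFor("2020-01-01/2020-01-02", 2)); // 0
    System.out.println(ids.partitionIdFor("2020-01-01/2020-01-02", 7)); // 1
    System.out.println(ids.partitionIdFor("2020-01-01/2020-01-02", 2)); // 0 (memoized)
  }
}
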
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java
index 49b2b48..05103e8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java
@@ -38,8 +38,9 @@ import java.util.Set;
* uses {@link SinglePhaseParallelIndexTaskRunner} for it.
*
* For perfect rollup, parallel indexing is executed in multiple phases. The supervisor task currently uses
- * {@link PartialHashSegmentGenerateParallelIndexTaskRunner} and {@link PartialHashSegmentMergeParallelIndexTaskRunner},
- * and can use more runners in the future.
+ * {@link PartialHashSegmentGenerateParallelIndexTaskRunner}, {@link PartialRangeSegmentGenerateParallelIndexTaskRunner},
+ * and {@link PartialGenericSegmentMergeParallelIndexTaskRunner}.
+ * More runners can be added in the future.
*/
public interface ParallelIndexTaskRunner<SubTaskType extends Task, SubTaskReportType extends SubTaskReport>
{
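
A sketch of the phase sequencing the javadoc above describes, with Supplier<State> standing in for the runners: phases run in order, and a failure aborts the rest.

import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class PhasedRunnerSketch
{
  enum State { SUCCESS, FAILED }

  static State runPhases(List<Supplier<State>> phases)
  {
    for (Supplier<State> phase : phases) {
      final State state = phase.get();
      if (state == State.FAILED) {
        // Abort the remaining phases, mirroring the failure check between
        // the generate and merge phases.
        return state;
      }
    }
    return State.SUCCESS;
  }

  public static void main(String[] args)
  {
    System.out.println(runPhases(Arrays.asList(() -> State.SUCCESS, () -> State.FAILED, () -> State.SUCCESS)));
    // -> FAILED (third phase never runs)
  }
}
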
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index 73e2ac7..935eeb4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -57,14 +57,14 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
/**
* Max number of segments to merge at the same time.
- * Used only by {@link PartialHashSegmentMergeTask}.
+ * Used only by {@link PartialGenericSegmentMergeTask}.
* This configuration was temporarily added to avoid using too much memory while merging segments,
* and will be removed once {@link org.apache.druid.segment.IndexMerger} is improved to not use much memory.
*/
private final int maxNumSegmentsToMerge;
/**
- * Total number of tasks for partial segment merge (that is, number of {@link PartialHashSegmentMergeTask}s).
+ * Total number of tasks for partial segment merge (that is, number of {@link PartialGenericSegmentMergeTask}s).
* Used only when this task runs with shuffle.
*/
private final int totalNumMergeTasks;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
index 858eff4..fed80d9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
@@ -29,6 +29,8 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
@@ -39,12 +41,12 @@ import java.util.Map;
/**
* The worker task of {@link PartialGenericSegmentMergeParallelIndexTaskRunner}; merges generic partitioned segments in multi-phase parallel indexing.
*/
-public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<ShardSpec, GenericPartitionLocation>
+public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<BuildingShardSpec, GenericPartitionLocation>
{
public static final String TYPE = "partial_index_generic_merge";
private final PartialGenericSegmentMergeIngestionSpec ingestionSchema;
- private final Table<Interval, Integer, ShardSpec> intervalAndIntegerToShardSpec;
+ private final Table<Interval, Integer, BuildingShardSpec<?>> intervalAndIntegerToShardSpec;
@JsonCreator
public PartialGenericSegmentMergeTask(
@@ -82,24 +84,28 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Shar
);
}
- private static Table<Interval, Integer, ShardSpec> createIntervalAndIntegerToShardSpec(
+ private static Table<Interval, Integer, BuildingShardSpec<?>> createIntervalAndIntegerToShardSpec(
List<GenericPartitionLocation> partitionLocations
)
{
- Table<Interval, Integer, ShardSpec> intervalAndIntegerToShardSpec = HashBasedTable.create();
+ final Table<Interval, Integer, BuildingShardSpec<?>> intervalAndIntegerToShardSpec = HashBasedTable.create();
partitionLocations.forEach(
p -> {
- ShardSpec currShardSpec = intervalAndIntegerToShardSpec.get(p.getInterval(), p.getPartitionId());
- Preconditions.checkArgument(
- currShardSpec == null || p.getShardSpec().equals(currShardSpec),
- "interval %s, partitionId %s mismatched shard specs: %s",
- p.getInterval(),
- p.getPartitionId(),
- partitionLocations
- );
-
- intervalAndIntegerToShardSpec.put(p.getInterval(), p.getPartitionId(), p.getShardSpec());
+ final ShardSpec currShardSpec = intervalAndIntegerToShardSpec.get(p.getInterval(), p.getBucketId());
+ if (currShardSpec == null) {
+ intervalAndIntegerToShardSpec.put(p.getInterval(), p.getBucketId(), p.getShardSpec());
+ } else {
+ if (!p.getShardSpec().equals(currShardSpec)) {
+ throw new ISE(
+ "interval %s, bucketId %s mismatched shard specs: %s and %s",
+ p.getInterval(),
+ p.getBucketId(),
+ currShardSpec,
+ p.getShardSpec()
+ );
+ }
+ }
}
);
@@ -119,7 +125,7 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Shar
}
@Override
- ShardSpec createShardSpec(TaskToolbox toolbox, Interval interval, int partitionId)
+ BuildingShardSpec<?> createShardSpec(TaskToolbox toolbox, Interval interval, int partitionId)
{
return Preconditions.checkNotNull(
intervalAndIntegerToShardSpec.get(interval, partitionId),
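
A sketch of the consistency check above on simplified types (shard specs as strings), keeping the Guava Table shape the patch uses: register each (interval, bucketId) shard spec once and fail fast if two sub-task reports disagree.

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

public class ShardSpecRegistrySketch
{
  // interval -> bucketId -> shard spec (simplified to a String here)
  private final Table<String, Integer, String> shardSpecs = HashBasedTable.create();

  void register(String interval, int bucketId, String shardSpec)
  {
    final String current = shardSpecs.get(interval, bucketId);
    if (current == null) {
      shardSpecs.put(interval, bucketId, shardSpec);
    } else if (!current.equals(shardSpec)) {
      throw new IllegalStateException(
          "interval " + interval + ", bucketId " + bucketId
          + " mismatched shard specs: " + current + " and " + shardSpec
      );
    }
  }

  public static void main(String[] args)
  {
    ShardSpecRegistrySketch registry = new ShardSpecRegistrySketch();
    registry.register("2020-01-01/2020-01-02", 0, "hash-bucket-0");
    registry.register("2020-01-01/2020-01-02", 0, "hash-bucket-0"); // identical: fine
    registry.register("2020-01-01/2020-01-02", 0, "hash-bucket-1"); // mismatched: throws
  }
}
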
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
index ef8869c..e067eb9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
@@ -27,11 +27,9 @@ import java.util.Map;
/**
* {@link ParallelIndexTaskRunner} for the phase to create hash partitioned segments in multi-phase parallel indexing.
- *
- * @see PartialHashSegmentMergeParallelIndexTaskRunner
*/
class PartialHashSegmentGenerateParallelIndexTaskRunner
- extends InputSourceSplitParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedHashPartitionsReport>
+ extends InputSourceSplitParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>>
{
private static final String PHASE_NAME = "partial segment generation";
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index bf33fae..1bfda30 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -26,8 +26,8 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.task.CachingSegmentAllocator;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
+import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalys
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.Interval;
@@ -51,7 +52,7 @@ import java.util.stream.Collectors;
* hashing the segment granularity and partition dimensions in {@link HashedPartitionsSpec}. Partitioned segments are
* stored in local storage using {@link org.apache.druid.indexing.worker.ShuffleDataSegmentPusher}.
*/
-public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<GeneratedHashPartitionsReport>
+public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<GeneratedPartitionsMetadataReport>
{
public static final String TYPE = "partial_index_generate";
private static final String PROP_SPEC = "spec";
@@ -127,7 +128,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
}
@Override
- CachingSegmentAllocator createSegmentAllocator(TaskToolbox toolbox, ParallelIndexSupervisorTaskClient taskClient)
+ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelIndexSupervisorTaskClient taskClient)
throws IOException
{
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
@@ -137,29 +138,29 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
toolbox,
getDataSource(),
getId(),
- granularitySpec.getQueryGranularity(),
+ granularitySpec,
new SupervisorTaskAccess(supervisorTaskId, taskClient),
createHashPartitionAnalysisFromPartitionsSpec(granularitySpec, partitionsSpec)
);
}
@Override
- GeneratedHashPartitionsReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments)
+ GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments)
{
- List<HashPartitionStat> partitionStats = segments.stream()
- .map(segment -> createPartitionStat(toolbox, segment))
- .collect(Collectors.toList());
- return new GeneratedHashPartitionsReport(getId(), partitionStats);
+ List<GenericPartitionStat> partitionStats = segments.stream()
+ .map(segment -> createPartitionStat(toolbox, segment))
+ .collect(Collectors.toList());
+ return new GeneratedPartitionsMetadataReport(getId(), partitionStats);
}
- private HashPartitionStat createPartitionStat(TaskToolbox toolbox, DataSegment segment)
+ private GenericPartitionStat createPartitionStat(TaskToolbox toolbox, DataSegment segment)
{
- return new HashPartitionStat(
+ return new GenericPartitionStat(
toolbox.getTaskExecutorNode().getHost(),
toolbox.getTaskExecutorNode().getPortToUse(),
toolbox.getTaskExecutorNode().isEnableTlsPort(),
segment.getInterval(),
- segment.getShardSpec().getPartitionNum(),
+ (BucketNumberedShardSpec) segment.getShardSpec(),
null, // numRows is not supported yet
null // sizeBytes is not supported yet
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIOConfig.java
deleted file mode 100644
index 2bc00ce..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIOConfig.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.druid.segment.indexing.IOConfig;
-
-import java.util.List;
-
-@JsonTypeName(PartialHashSegmentMergeTask.TYPE)
-class PartialHashSegmentMergeIOConfig extends PartialSegmentMergeIOConfig<HashPartitionLocation>
- implements IOConfig
-{
- @JsonCreator
- PartialHashSegmentMergeIOConfig(
- @JsonProperty("partitionLocations") List<HashPartitionLocation> partitionLocations
- )
- {
- super(partitionLocations);
- }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIngestionSpec.java
deleted file mode 100644
index abfef76..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIngestionSpec.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.segment.indexing.DataSchema;
-
-class PartialHashSegmentMergeIngestionSpec extends PartialSegmentMergeIngestionSpec<PartialHashSegmentMergeIOConfig>
-{
- @JsonCreator
- PartialHashSegmentMergeIngestionSpec(
- @JsonProperty("dataSchema") DataSchema dataSchema,
- @JsonProperty("ioConfig") PartialHashSegmentMergeIOConfig ioConfig,
- @JsonProperty("tuningConfig") ParallelIndexTuningConfig tuningConfig
- )
- {
- super(dataSchema, ioConfig, tuningConfig);
- }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeParallelIndexTaskRunner.java
deleted file mode 100644
index c693513..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeParallelIndexTaskRunner.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.druid.client.indexing.IndexingServiceClient;
-import org.apache.druid.data.input.InputSplit;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.segment.indexing.DataSchema;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link ParallelIndexTaskRunner} for the phase to merge hash partitioned segments in multi-phase parallel indexing.
- *
- * @see PartialHashSegmentGenerateParallelIndexTaskRunner
- */
-class PartialHashSegmentMergeParallelIndexTaskRunner
- extends ParallelIndexPhaseRunner<PartialHashSegmentMergeTask, PushedSegmentsReport>
-{
- private static final String PHASE_NAME = "partial segment merge";
-
- private final DataSchema dataSchema;
- private final List<PartialHashSegmentMergeIOConfig> mergeIOConfigs;
-
- PartialHashSegmentMergeParallelIndexTaskRunner(
- TaskToolbox toolbox,
- String taskId,
- String groupId,
- DataSchema dataSchema,
- List<PartialHashSegmentMergeIOConfig> mergeIOConfigs,
- ParallelIndexTuningConfig tuningConfig,
- Map<String, Object> context,
- IndexingServiceClient indexingServiceClient
- )
- {
- super(toolbox, taskId, groupId, tuningConfig, context, indexingServiceClient);
-
- this.dataSchema = dataSchema;
- this.mergeIOConfigs = mergeIOConfigs;
- }
-
- @Override
- public String getName()
- {
- return PHASE_NAME;
- }
-
- @Override
- Iterator<SubTaskSpec<PartialHashSegmentMergeTask>> subTaskSpecIterator()
- {
- return mergeIOConfigs.stream().map(this::newTaskSpec).iterator();
- }
-
- @Override
- int estimateTotalNumSubTasks()
- {
- return mergeIOConfigs.size();
- }
-
- @VisibleForTesting
- SubTaskSpec<PartialHashSegmentMergeTask> newTaskSpec(PartialHashSegmentMergeIOConfig ioConfig)
- {
- final PartialHashSegmentMergeIngestionSpec ingestionSpec =
- new PartialHashSegmentMergeIngestionSpec(
- dataSchema,
- ioConfig,
- getTuningConfig()
- );
- return new SubTaskSpec<PartialHashSegmentMergeTask>(
- getTaskId() + "_" + getAndIncrementNextSpecId(),
- getGroupId(),
- getTaskId(),
- getContext(),
- new InputSplit<>(ioConfig.getPartitionLocations())
- )
- {
- @Override
- public PartialHashSegmentMergeTask newSubTask(int numAttempts)
- {
- return new PartialHashSegmentMergeTask(
- null,
- getGroupId(),
- null,
- getSupervisorTaskId(),
- numAttempts,
- ingestionSpec,
- getContext(),
- null,
- null,
- null
- );
- }
- };
- }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeTask.java
deleted file mode 100644
index cf5aaea..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeTask.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
-import org.apache.druid.client.indexing.IndexingServiceClient;
-import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
-import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
-import org.apache.druid.indexing.common.task.TaskResource;
-import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
-import org.joda.time.Interval;
-
-import javax.annotation.Nullable;
-import java.util.Map;
-
-/**
- * The worker task of {@link PartialHashSegmentMergeParallelIndexTaskRunner}. This task reads partitioned segments
- * created by {@link PartialHashSegmentGenerateTask}s, merges them, and pushes to deep storage. The pushed segments are
- * reported to {@link PartialHashSegmentMergeParallelIndexTaskRunner}.
- */
-
-public class PartialHashSegmentMergeTask
- extends PartialSegmentMergeTask<HashBasedNumberedShardSpec, HashPartitionLocation>
-{
- public static final String TYPE = "partial_index_merge";
-
- private final HashedPartitionsSpec partitionsSpec;
- private final PartialHashSegmentMergeIngestionSpec ingestionSchema;
-
- @JsonCreator
- public PartialHashSegmentMergeTask(
- // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
- @JsonProperty("id") @Nullable String id,
- @JsonProperty("groupId") final String groupId,
- @JsonProperty("resource") final TaskResource taskResource,
- @JsonProperty("supervisorTaskId") final String supervisorTaskId,
- @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
- @JsonProperty("spec") final PartialHashSegmentMergeIngestionSpec ingestionSchema,
- @JsonProperty("context") final Map<String, Object> context,
- @JacksonInject IndexingServiceClient indexingServiceClient,
- @JacksonInject IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
- @JacksonInject ShuffleClient shuffleClient
- )
- {
- super(
- getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
- groupId,
- taskResource,
- supervisorTaskId,
- ingestionSchema.getDataSchema(),
- ingestionSchema.getIOConfig(),
- ingestionSchema.getTuningConfig(),
- numAttempts,
- context,
- indexingServiceClient,
- taskClientFactory,
- shuffleClient
- );
-
- this.ingestionSchema = ingestionSchema;
-
- PartitionsSpec inputPartitionsSpec = ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec();
- Preconditions.checkArgument(inputPartitionsSpec instanceof HashedPartitionsSpec, "hashed partitionsSpec required");
- partitionsSpec = (HashedPartitionsSpec) inputPartitionsSpec;
- Preconditions.checkNotNull(partitionsSpec.getNumShards(), "hashed partitionsSpec numShards required");
- }
-
- @JsonProperty("spec")
- private PartialHashSegmentMergeIngestionSpec getIngestionSchema()
- {
- return ingestionSchema;
- }
-
- @Override
- public String getType()
- {
- return TYPE;
- }
-
- @Override
- HashBasedNumberedShardSpec createShardSpec(TaskToolbox toolbox, Interval interval, int partitionId)
- {
- return new HashBasedNumberedShardSpec(
- partitionId,
- Preconditions.checkNotNull(partitionsSpec.getNumShards(), "numShards"),
- partitionsSpec.getPartitionDimensions(),
- toolbox.getJsonMapper()
- );
- }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java
index e0f9461..39a7e65 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java
@@ -29,8 +29,6 @@ import java.util.Map;
/**
* {@link ParallelIndexTaskRunner} for the phase to create range partitioned segments in multi-phase parallel indexing.
- *
- * @see PartialHashSegmentMergeParallelIndexTaskRunner
*/
class PartialRangeSegmentGenerateParallelIndexTaskRunner
extends InputSourceSplitParallelIndexTaskRunner<PartialRangeSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 60bbb7a..949c374 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -28,8 +28,8 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.task.CachingSegmentAllocator;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
+import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
@@ -37,6 +37,7 @@ import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnaly
import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.joda.time.Interval;
@@ -150,7 +151,7 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
}
@Override
- CachingSegmentAllocator createSegmentAllocator(TaskToolbox toolbox, ParallelIndexSupervisorTaskClient taskClient)
+ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelIndexSupervisorTaskClient taskClient)
throws IOException
{
final RangePartitionAnalysis partitionAnalysis = new RangePartitionAnalysis(
@@ -161,7 +162,7 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
toolbox,
getDataSource(),
getId(),
- ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity(),
+ ingestionSchema.getDataSchema().getGranularitySpec(),
new SupervisorTaskAccess(supervisorTaskId, taskClient),
partitionAnalysis
);
@@ -183,7 +184,7 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
toolbox.getTaskExecutorNode().getPortToUse(),
toolbox.getTaskExecutorNode().isEnableTlsPort(),
segment.getInterval(),
- segment.getShardSpec(),
+ (BucketNumberedShardSpec) segment.getShardSpec(),
null, // numRows is not supported yet
null // sizeBytes is not supported yet
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index 65e570a..cbde928 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -28,11 +28,10 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.task.BatchAppenderators;
-import org.apache.druid.indexing.common.task.CachingSegmentAllocator;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.InputSourceProcessor;
-import org.apache.druid.indexing.common.task.NonLinearlyPartitionedSequenceNameFunction;
+import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SequenceNameFunction;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
@@ -129,7 +128,7 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
/**
* @return {@link SegmentAllocator} suitable for the desired segment partitioning strategy.
*/
- abstract CachingSegmentAllocator createSegmentAllocator(
+ abstract SegmentAllocatorForBatch createSegmentAllocator(
TaskToolbox toolbox,
ParallelIndexSupervisorTaskClient taskClient
) throws IOException;
@@ -171,11 +170,8 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
final PartitionsSpec partitionsSpec = tuningConfig.getGivenOrDefaultPartitionsSpec();
final long pushTimeout = tuningConfig.getPushTimeout();
- final CachingSegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, taskClient);
- final SequenceNameFunction sequenceNameFunction = new NonLinearlyPartitionedSequenceNameFunction(
- getId(),
- segmentAllocator.getShardSpecs()
- );
+ final SegmentAllocatorForBatch segmentAllocator = createSegmentAllocator(toolbox, taskClient);
+ final SequenceNameFunction sequenceNameFunction = segmentAllocator.getSequenceNameFunction();
final Appenderator appenderator = BatchAppenderators.newAppenderator(
getId(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index 96596be..b105d5e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -139,10 +139,10 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
// Group partitionLocations by interval and bucketId
- final Map<Interval, Int2ObjectMap<List<P>>> intervalToPartitions = new HashMap<>();
+ final Map<Interval, Int2ObjectMap<List<P>>> intervalToBuckets = new HashMap<>();
for (P location : ioConfig.getPartitionLocations()) {
- intervalToPartitions.computeIfAbsent(location.getInterval(), k -> new Int2ObjectOpenHashMap<>())
- .computeIfAbsent(location.getPartitionId(), k -> new ArrayList<>())
+ intervalToBuckets.computeIfAbsent(location.getInterval(), k -> new Int2ObjectOpenHashMap<>())
+ .computeIfAbsent(location.getBucketId(), k -> new ArrayList<>())
.add(location);
}
@@ -168,7 +168,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
final Stopwatch fetchStopwatch = Stopwatch.createStarted();
final Map<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles = fetchSegmentFiles(
toolbox,
- intervalToPartitions
+ intervalToBuckets
);
final long fetchTime = fetchStopwatch.elapsed(TimeUnit.SECONDS);
fetchStopwatch.stop();
@@ -202,7 +202,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
private Map<Interval, Int2ObjectMap<List<File>>> fetchSegmentFiles(
TaskToolbox toolbox,
- Map<Interval, Int2ObjectMap<List<P>>> intervalToPartitions
+ Map<Interval, Int2ObjectMap<List<P>>> intervalToBuckets
) throws IOException
{
final File tempDir = toolbox.getIndexingTmpDir();
@@ -211,26 +211,26 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
final Map<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles = new HashMap<>();
// Fetch partition files
- for (Entry<Interval, Int2ObjectMap<List<P>>> entryPerInterval : intervalToPartitions.entrySet()) {
+ for (Entry<Interval, Int2ObjectMap<List<P>>> entryPerInterval : intervalToBuckets.entrySet()) {
final Interval interval = entryPerInterval.getKey();
- for (Int2ObjectMap.Entry<List<P>> entryPerPartitionId :
+ for (Int2ObjectMap.Entry<List<P>> entryPerBucketId :
entryPerInterval.getValue().int2ObjectEntrySet()) {
- final int partitionId = entryPerPartitionId.getIntKey();
+ final int bucketId = entryPerBucketId.getIntKey();
final File partitionDir = FileUtils.getFile(
tempDir,
interval.getStart().toString(),
interval.getEnd().toString(),
- Integer.toString(partitionId)
+ Integer.toString(bucketId)
);
FileUtils.forceMkdir(partitionDir);
- for (P location : entryPerPartitionId.getValue()) {
+ for (P location : entryPerBucketId.getValue()) {
final File zippedFile = shuffleClient.fetchSegmentFile(partitionDir, supervisorTaskId, location);
try {
final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId()));
FileUtils.forceMkdir(unzippedDir);
CompressionUtils.unzip(zippedFile, unzippedDir);
intervalToUnzippedFiles.computeIfAbsent(interval, k -> new Int2ObjectOpenHashMap<>())
- .computeIfAbsent(partitionId, k -> new ArrayList<>())
+ .computeIfAbsent(bucketId, k -> new ArrayList<>())
.add(unzippedDir);
}
finally {
@@ -247,7 +247,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
/**
* Create a {@link ShardSpec} suitable for the desired secondary partitioning strategy.
*/
- abstract S createShardSpec(TaskToolbox toolbox, Interval interval, int partitionId);
+ abstract S createShardSpec(TaskToolbox toolbox, Interval interval, int bucketId);
private Set<DataSegment> mergeAndPushSegments(
TaskToolbox toolbox,
@@ -262,9 +262,9 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
final Set<DataSegment> pushedSegments = new HashSet<>();
for (Entry<Interval, Int2ObjectMap<List<File>>> entryPerInterval : intervalToUnzippedFiles.entrySet()) {
final Interval interval = entryPerInterval.getKey();
- for (Int2ObjectMap.Entry<List<File>> entryPerPartitionId : entryPerInterval.getValue().int2ObjectEntrySet()) {
- final int partitionId = entryPerPartitionId.getIntKey();
- final List<File> segmentFilesToMerge = entryPerPartitionId.getValue();
+ for (Int2ObjectMap.Entry<List<File>> entryPerBucketId : entryPerInterval.getValue().int2ObjectEntrySet()) {
+ final int bucketId = entryPerBucketId.getIntKey();
+ final List<File> segmentFilesToMerge = entryPerBucketId.getValue();
final Pair<File, List<String>> mergedFileAndDimensionNames = mergeSegmentsInSamePartition(
dataSchema,
tuningConfig,
@@ -290,7 +290,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
null, // will be filled in the segmentPusher
mergedFileAndDimensionNames.rhs,
metricNames,
- createShardSpec(toolbox, interval, partitionId),
+ createShardSpec(toolbox, interval, bucketId),
null, // will be filled in the segmentPusher
0 // will be filled in the segmentPusher
),
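
Note: the interval -> bucketId -> locations grouping at the top of runTask is the heart of this phase; every partial segment that falls into the same bucket of the same time chunk must be fetched and merged by one task. A minimal, self-contained sketch of the same two-level computeIfAbsent idiom, using a hypothetical generic location type (the real PartitionLocation additionally carries server host/port and fetch details):

    import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
    import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
    import org.joda.time.Interval;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class BucketGroupingSketch
    {
      static <P> Map<Interval, Int2ObjectMap<List<P>>> group(
          Iterable<P> locations,
          java.util.function.Function<P, Interval> getInterval,
          java.util.function.ToIntFunction<P> getBucketId
      )
      {
        final Map<Interval, Int2ObjectMap<List<P>>> intervalToBuckets = new HashMap<>();
        for (P location : locations) {
          // Outer level: time chunk. Inner level: bucketId. All partial segments
          // belonging to one bucket end up in a single list, to be merged together.
          intervalToBuckets
              .computeIfAbsent(getInterval.apply(location), k -> new Int2ObjectOpenHashMap<>())
              .computeIfAbsent(getBucketId.applyAsInt(location), k -> new ArrayList<>())
              .add(location);
        }
        return intervalToBuckets;
      }
    }
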
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java
index e6578c5..da382ce 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java
@@ -29,7 +29,7 @@ import java.util.Objects;
/**
* This class represents the intermediary data server where the partition of {@link #interval} and
- * {@link #getPartitionId()} is stored.
+ * {@link #getBucketId()} is stored.
*/
abstract class PartitionLocation<T>
{
@@ -93,13 +93,13 @@ abstract class PartitionLocation<T>
return secondaryPartition;
}
- abstract int getPartitionId();
+ abstract int getBucketId();
final URI toIntermediaryDataServerURI(String supervisorTaskId)
{
return URI.create(
StringUtils.format(
- "%s://%s:%d/druid/worker/v1/shuffle/task/%s/%s/partition?startTime=%s&endTime=%s&partitionId=%d",
+ "%s://%s:%d/druid/worker/v1/shuffle/task/%s/%s/partition?startTime=%s&endTime=%s&bucketId=%d",
useHttps ? "https" : "http",
host,
port,
@@ -107,7 +107,7 @@ abstract class PartitionLocation<T>
StringUtils.urlEncode(subTaskId),
interval.getStart(),
interval.getEnd(),
- getPartitionId()
+ getBucketId()
)
);
}
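
For illustration, with hypothetical values host "worker1", port 8091, plain HTTP, supervisorTaskId "index_parallel_wiki", subTaskId "sub_task_0", interval 2020-01-01/2020-01-02, and bucketId 3, the format string above produces

    http://worker1:8091/druid/worker/v1/shuffle/task/index_parallel_wiki/sub_task_0/partition?startTime=2020-01-01T00:00:00.000Z&endTime=2020-01-02T00:00:00.000Z&bucketId=3

which is the endpoint served by ShuffleResource below, with the final query parameter renamed from partitionId to bucketId in this commit.
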
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java
index 66974c2..c7f1a55 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java
@@ -104,7 +104,7 @@ abstract class PartitionStat<T>
/**
* @return Uniquely identifying index from 0..N-1 of the N buckets
*/
- abstract int getPartitionId();
+ abstract int getBucketId();
/**
* @return Definition of secondary partition. For example, for range partitioning, this should include the start/end.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 9f1dc52..61908a8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -316,6 +316,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent();
final SegmentAllocator segmentAllocator = SegmentAllocators.forLinearPartitioning(
toolbox,
+ getId(),
new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
getIngestionSchema().getDataSchema(),
getTaskLockHelper(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskReport.java
index 564b3af..26f20f6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskReport.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskReport.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
@JsonTypeInfo(use = Id.NAME, property = "type", defaultImpl = PushedSegmentsReport.class)
@JsonSubTypes(value = {
@Type(name = PushedSegmentsReport.TYPE, value = PushedSegmentsReport.class),
- @Type(name = GeneratedHashPartitionsReport.TYPE, value = GeneratedHashPartitionsReport.class),
@Type(name = DimensionDistributionReport.TYPE, value = DimensionDistributionReport.class),
@Type(name = GeneratedPartitionsMetadataReport.TYPE, value = GeneratedPartitionsMetadataReport.class)
})
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/CompletePartitionAnalysis.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/CompletePartitionAnalysis.java
index 2e43280..efecdda 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/CompletePartitionAnalysis.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/CompletePartitionAnalysis.java
@@ -21,12 +21,11 @@ package org.apache.druid.indexing.common.task.batch.partition;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
-import java.util.function.Function;
/**
* This interface represents the PartitionAnalysis that has the complete picture of secondary partitions to create.
@@ -35,9 +34,5 @@ import java.util.function.Function;
*/
public interface CompletePartitionAnalysis<T, P extends PartitionsSpec> extends PartitionAnalysis<T, P>
{
- Map<Interval, List<SegmentIdWithShardSpec>> convertToIntervalToSegmentIds(
- TaskToolbox toolbox,
- String dataSource,
- Function<Interval, String> versionFinder
- );
+ Map<Interval, List<BucketNumberedShardSpec<?>>> createBuckets(TaskToolbox toolbox);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/HashPartitionAnalysis.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/HashPartitionAnalysis.java
index a4b3a86..5773f09 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/HashPartitionAnalysis.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/HashPartitionAnalysis.java
@@ -23,8 +23,8 @@ import com.google.common.collect.Maps;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.timeline.partition.HashBucketShardSpec;
import org.joda.time.Interval;
import java.util.Collections;
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -92,37 +91,23 @@ public class HashPartitionAnalysis implements CompletePartitionAnalysis<Integer,
}
@Override
- public Map<Interval, List<SegmentIdWithShardSpec>> convertToIntervalToSegmentIds(
- TaskToolbox toolbox,
- String dataSource,
- Function<Interval, String> versionFinder
- )
+ public Map<Interval, List<BucketNumberedShardSpec<?>>> createBuckets(TaskToolbox toolbox)
{
- final Map<Interval, List<SegmentIdWithShardSpec>> intervalToSegmentIds =
- Maps.newHashMapWithExpectedSize(getNumTimePartitions());
-
+ final Map<Interval, List<BucketNumberedShardSpec<?>>> intervalToLookup = Maps.newHashMapWithExpectedSize(
+ intervalToNumBuckets.size()
+ );
forEach((interval, numBuckets) -> {
- intervalToSegmentIds.put(
- interval,
- IntStream.range(0, numBuckets)
- .mapToObj(i -> {
- final HashBasedNumberedShardSpec shardSpec = new HashBasedNumberedShardSpec(
- i,
- numBuckets,
- partitionsSpec.getPartitionDimensions(),
- toolbox.getJsonMapper()
- );
- return new SegmentIdWithShardSpec(
- dataSource,
- interval,
- versionFinder.apply(interval),
- shardSpec
- );
- })
- .collect(Collectors.toList())
- );
+ final List<BucketNumberedShardSpec<?>> buckets = IntStream
+ .range(0, numBuckets)
+ .mapToObj(i -> new HashBucketShardSpec(
+ i,
+ numBuckets,
+ partitionsSpec.getPartitionDimensions(),
+ toolbox.getJsonMapper()
+ ))
+ .collect(Collectors.toList());
+ intervalToLookup.put(interval, buckets);
});
-
- return intervalToSegmentIds;
+ return intervalToLookup;
}
}
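
A worked example of createBuckets, with hypothetical inputs: if the analysis recorded numBuckets = 3 for interval 2020-01-01/P1D, the loop above produces, for that interval,

    new HashBucketShardSpec(0, 3, partitionDimensions, jsonMapper)
    new HashBucketShardSpec(1, 3, partitionDimensions, jsonMapper)
    new HashBucketShardSpec(2, 3, partitionDimensions, jsonMapper)

These are pure bucket definitions; per this commit's design, the actual partition ids and numCorePartitions are assigned later, when segments are allocated and published.
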
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/RangePartitionAnalysis.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/RangePartitionAnalysis.java
index b275393..c8a2b88 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/RangePartitionAnalysis.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/RangePartitionAnalysis.java
@@ -23,19 +23,17 @@ import com.google.common.collect.Maps;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
-import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
+import org.apache.druid.timeline.partition.RangeBucketShardSpec;
import org.joda.time.Interval;
-import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -90,44 +88,13 @@ public class RangePartitionAnalysis
return intervalToPartitionBoundaries.size();
}
- @Override
- public Map<Interval, List<SegmentIdWithShardSpec>> convertToIntervalToSegmentIds(
- TaskToolbox toolbox,
- String dataSource,
- Function<Interval, String> versionFinder
- )
- {
- final String partitionDimension = partitionsSpec.getPartitionDimension();
- final Map<Interval, List<SegmentIdWithShardSpec>> intervalToSegmentIds = Maps.newHashMapWithExpectedSize(
- getNumTimePartitions()
- );
-
- forEach((interval, partitionBoundaries) ->
- intervalToSegmentIds.put(
- interval,
- translatePartitionBoundaries(
- dataSource,
- interval,
- partitionDimension,
- partitionBoundaries,
- versionFinder
- )
- )
- );
-
- return intervalToSegmentIds;
- }
-
/**
* Translate {@link PartitionBoundaries} into the corresponding range bucket
* shard specs, one {@link RangeBucketShardSpec} per pair of adjacent boundaries.
*/
- private static List<SegmentIdWithShardSpec> translatePartitionBoundaries(
- String dataSource,
- Interval interval,
+ private static List<BucketNumberedShardSpec<?>> translatePartitionBoundaries(
String partitionDimension,
- PartitionBoundaries partitionBoundaries,
- Function<Interval, String> versionFinder
+ PartitionBoundaries partitionBoundaries
)
{
if (partitionBoundaries.isEmpty()) {
@@ -135,40 +102,30 @@ public class RangePartitionAnalysis
}
return IntStream.range(0, partitionBoundaries.size() - 1)
- .mapToObj(i -> createSegmentIdWithShardSpec(
- dataSource,
- interval,
- versionFinder.apply(interval),
+ .mapToObj(i -> new RangeBucketShardSpec(
+ i,
partitionDimension,
partitionBoundaries.get(i),
- partitionBoundaries.get(i + 1),
- i
+ partitionBoundaries.get(i + 1)
))
.collect(Collectors.toList());
}
- private static SegmentIdWithShardSpec createSegmentIdWithShardSpec(
- String dataSource,
- Interval interval,
- String version,
- String partitionDimension,
- String partitionStart,
- @Nullable String partitionEnd,
- int partitionNum
- )
+ @Override
+ public Map<Interval, List<BucketNumberedShardSpec<?>>> createBuckets(TaskToolbox toolbox)
{
- // The shardSpec created here will be reused in PartialGenericSegmentMergeTask. This is ok because
- // all PartialSegmentGenerateTasks create the same set of segmentIds (and thus shardSpecs).
- return new SegmentIdWithShardSpec(
- dataSource,
- interval,
- version,
- new SingleDimensionShardSpec(
- partitionDimension,
- partitionStart,
- partitionEnd,
- partitionNum
- )
+ final String partitionDimension = partitionsSpec.getPartitionDimension();
+ final Map<Interval, List<BucketNumberedShardSpec<?>>> intervalToSegmentIds = Maps.newHashMapWithExpectedSize(
+ getNumTimePartitions()
);
+
+ forEach((interval, partitionBoundaries) ->
+ intervalToSegmentIds.put(
+ interval,
+ translatePartitionBoundaries(partitionDimension, partitionBoundaries)
+ )
+ );
+
+ return intervalToSegmentIds;
}
}
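
Similarly, a worked range example with hypothetical boundaries: for partitionDimension "dim" and PartitionBoundaries [null, "b", "f", null] (size 4, hence size - 1 = 3 buckets), translatePartitionBoundaries yields

    new RangeBucketShardSpec(0, "dim", null, "b")   // everything below "b"
    new RangeBucketShardSpec(1, "dim", "b", "f")    // "b" inclusive up to "f" exclusive
    new RangeBucketShardSpec(2, "dim", "f", null)   // "f" and above

where a null start or end marks the open-ended first or last range.
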
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index 78090ca..6df598c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -70,7 +70,7 @@ import java.util.stream.IntStream;
* and phase 2 tasks read those files via HTTP.
*
* The directory where segment files are placed is structured as
- * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/partitionIdOfSegment.
+ * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment.
*
* This class provides interfaces to store, find, and remove segment files.
* It also has a self-cleanup mechanism to clean up stale segment files. It periodically checks the last access time
@@ -335,11 +335,11 @@ public class IntermediaryDataManager
}
@Nullable
- public File findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int partitionId)
+ public File findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId)
{
TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
for (StorageLocation location : shuffleDataLocations) {
- final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, partitionId));
+ final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, bucketId));
if (partitionDir.exists()) {
supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
final File[] segmentFiles = partitionDir.listFiles();
@@ -384,23 +384,23 @@ public class IntermediaryDataManager
String supervisorTaskId,
String subTaskId,
Interval interval,
- int partitionId
+ int bucketId
)
{
- return Paths.get(getPartitionDir(supervisorTaskId, interval, partitionId), subTaskId).toString();
+ return Paths.get(getPartitionDir(supervisorTaskId, interval, bucketId), subTaskId).toString();
}
private static String getPartitionDir(
String supervisorTaskId,
Interval interval,
- int partitionId
+ int bucketId
)
{
return Paths.get(
supervisorTaskId,
interval.getStart().toString(),
interval.getEnd().toString(),
- String.valueOf(partitionId)
+ String.valueOf(bucketId)
).toString();
}
}
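
For illustration, with hypothetical ids, the two path helpers above compose a layout like

    <storageLocationPath>/index_parallel_wiki/2020-01-01T00:00:00.000Z/2020-01-02T00:00:00.000Z/3/sub_task_0

i.e. supervisorTaskId/startTime/endTime/bucketId/subTaskId, matching the directory structure described in the class javadoc.
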
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
index d1adcb9..0e0e936 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
@@ -75,7 +75,7 @@ public class ShuffleResource
@PathParam("subTaskId") String subTaskId,
@QueryParam("startTime") String startTime,
@QueryParam("endTime") String endTime,
- @QueryParam("partitionId") int partitionId
+ @QueryParam("bucketId") int bucketId
)
{
final Interval interval = new Interval(DateTimes.of(startTime), DateTimes.of(endTime));
@@ -83,16 +83,16 @@ public class ShuffleResource
supervisorTaskId,
subTaskId,
interval,
- partitionId
+ bucketId
);
if (partitionFile == null) {
final String errorMessage = StringUtils.format(
- "Can't find the partition for supervisorTask[%s], subTask[%s], interval[%s], and partitionId[%s]",
+ "Can't find the partition for supervisorTask[%s], subTask[%s], interval[%s], and bucketId[%s]",
supervisorTaskId,
subTaskId,
interval,
- partitionId
+ bucketId
);
return Response.status(Status.NOT_FOUND).entity(errorMessage).build();
} else {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index c5301bf..ce1e1fc 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -880,14 +880,14 @@ public class SegmentAllocateActionTest
.dataSource(DATA_SOURCE)
.interval(Granularities.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
- .shardSpec(new SingleDimensionShardSpec("foo", null, "bar", 0))
+ .shardSpec(new SingleDimensionShardSpec("foo", null, "bar", 0, 2))
.size(0)
.build(),
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularities.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
- .shardSpec(new SingleDimensionShardSpec("foo", "bar", null, 1))
+ .shardSpec(new SingleDimensionShardSpec("foo", "bar", null, 1, 2))
.size(0)
.build()
)
@@ -914,14 +914,14 @@ public class SegmentAllocateActionTest
.dataSource(DATA_SOURCE)
.interval(Granularities.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
- .shardSpec(new HashBasedNumberedShardSpec(0, 2, ImmutableList.of("dim1"), objectMapper))
+ .shardSpec(new HashBasedNumberedShardSpec(0, 2, 0, 2, ImmutableList.of("dim1"), objectMapper))
.size(0)
.build(),
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularities.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
- .shardSpec(new HashBasedNumberedShardSpec(1, 2, ImmutableList.of("dim1"), objectMapper))
+ .shardSpec(new HashBasedNumberedShardSpec(1, 2, 1, 2, ImmutableList.of("dim1"), objectMapper))
.size(0)
.build()
)
@@ -935,7 +935,7 @@ public class SegmentAllocateActionTest
"seq",
null,
true,
- new HashBasedNumberedPartialShardSpec(ImmutableList.of("dim1"), 2),
+ new HashBasedNumberedPartialShardSpec(ImmutableList.of("dim1"), 1, 2),
lockGranularity
);
final SegmentIdWithShardSpec segmentIdentifier = action.perform(task, taskActionTestKit.getTaskActionToolbox());
@@ -946,7 +946,7 @@ public class SegmentAllocateActionTest
Assert.assertTrue(shardSpec instanceof HashBasedNumberedShardSpec);
final HashBasedNumberedShardSpec hashBasedNumberedShardSpec = (HashBasedNumberedShardSpec) shardSpec;
- Assert.assertEquals(2, hashBasedNumberedShardSpec.getPartitions());
+ Assert.assertEquals(2, hashBasedNumberedShardSpec.getNumCorePartitions());
Assert.assertEquals(ImmutableList.of("dim1"), hashBasedNumberedShardSpec.getPartitionDimensions());
}
@@ -1029,10 +1029,7 @@ public class SegmentAllocateActionTest
if (expected.getShardSpec().getClass() == NumberedShardSpec.class
&& actual.getShardSpec().getClass() == NumberedShardSpec.class) {
- Assert.assertEquals(
- ((NumberedShardSpec) expected.getShardSpec()).getPartitions(),
- ((NumberedShardSpec) actual.getShardSpec()).getPartitions()
- );
+ Assert.assertEquals(expected.getShardSpec().getNumCorePartitions(), actual.getShardSpec().getNumCorePartitions());
} else if (expected.getShardSpec().getClass() == LinearShardSpec.class
&& actual.getShardSpec().getClass() == LinearShardSpec.class) {
// do nothing
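
The widened argument lists in this test follow the new shard spec constructors; reading off the calls above (parameter names here are descriptive only):

    new SingleDimensionShardSpec("foo", null, "bar", /* partitionNum */ 0, /* numCorePartitions */ 2);
    new HashBasedNumberedShardSpec(/* partitionNum */ 0, /* numCorePartitions */ 2,
                                   /* bucketId */ 0, /* numBuckets */ 2,
                                   ImmutableList.of("dim1"), objectMapper);
    new HashBasedNumberedPartialShardSpec(ImmutableList.of("dim1"), /* bucketId */ 1, /* numBuckets */ 2);
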
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 6e3fcf5..513c13c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -239,13 +239,13 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());
Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
- Assert.assertEquals(2, ((NumberedShardSpec) segments.get(0).getShardSpec()).getPartitions());
+ Assert.assertEquals(2, ((NumberedShardSpec) segments.get(0).getShardSpec()).getNumCorePartitions());
Assert.assertEquals("test", segments.get(1).getDataSource());
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval());
Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(1).getShardSpec().getClass());
Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
- Assert.assertEquals(2, ((NumberedShardSpec) segments.get(1).getShardSpec()).getPartitions());
+ Assert.assertEquals(2, ((NumberedShardSpec) segments.get(1).getShardSpec()).getNumCorePartitions());
}
@Test
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java
index 12a1615..e841fba 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
@@ -30,11 +31,13 @@ import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnaly
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.NoneGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.PartitionBoundaries;
-import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
+import org.apache.druid.timeline.partition.RangeBucketShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -82,7 +85,7 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
INTERVAL_NORMAL, NORMAL_PARTITIONS
);
- private CachingSegmentAllocator target;
+ private SegmentAllocator target;
private SequenceNameFunction sequenceNameFunction;
@Rule
@@ -105,11 +108,11 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
toolbox,
DATASOURCE,
TASKID,
- new NoneGranularity(),
+ new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, ImmutableList.of()),
new SupervisorTaskAccessWithNullClient(SUPERVISOR_TASKID),
partitionAnalysis
);
- sequenceNameFunction = new NonLinearlyPartitionedSequenceNameFunction(TASKID, target.getShardSpecs());
+ sequenceNameFunction = ((CachingLocalSegmentAllocator) target).getSequenceNameFunction();
}
@Test
@@ -163,37 +166,37 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
}
@SuppressWarnings("SameParameterValue")
- private void testAllocate(InputRow row, Interval interval, int partitionNum)
+ private void testAllocate(InputRow row, Interval interval, int bucketId)
{
- String partitionEnd = getPartitionEnd(interval, partitionNum);
- testAllocate(row, interval, partitionNum, partitionEnd);
+ String partitionEnd = getPartitionEnd(interval, bucketId);
+ testAllocate(row, interval, bucketId, partitionEnd);
}
@Nullable
- private static String getPartitionEnd(Interval interval, int partitionNum)
+ private static String getPartitionEnd(Interval interval, int bucketId)
{
PartitionBoundaries partitions = INTERVAL_TO_PARTITONS.get(interval);
- boolean isLastPartition = (partitionNum + 1) == partitions.size();
- return isLastPartition ? null : partitions.get(partitionNum + 1);
+ boolean isLastPartition = (bucketId + 1) == partitions.size();
+ return isLastPartition ? null : partitions.get(bucketId + 1);
}
- private void testAllocate(InputRow row, Interval interval, int partitionNum, @Nullable String partitionEnd)
+ private void testAllocate(InputRow row, Interval interval, int bucketId, @Nullable String partitionEnd)
{
- String partitionStart = getPartitionStart(interval, partitionNum);
- testAllocate(row, interval, partitionNum, partitionStart, partitionEnd);
+ String partitionStart = getPartitionStart(interval, bucketId);
+ testAllocate(row, interval, bucketId, partitionStart, partitionEnd);
}
@Nullable
- private static String getPartitionStart(Interval interval, int partitionNum)
+ private static String getPartitionStart(Interval interval, int bucketId)
{
- boolean isFirstPartition = partitionNum == 0;
- return isFirstPartition ? null : INTERVAL_TO_PARTITONS.get(interval).get(partitionNum);
+ boolean isFirstPartition = bucketId == 0;
+ return isFirstPartition ? null : INTERVAL_TO_PARTITONS.get(interval).get(bucketId);
}
private void testAllocate(
InputRow row,
Interval interval,
- int partitionNum,
+ int bucketId,
@Nullable String partitionStart,
@Nullable String partitionEnd
)
@@ -202,12 +205,12 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
SegmentIdWithShardSpec segmentIdWithShardSpec = allocate(row, sequenceName);
Assert.assertEquals(
- SegmentId.of(DATASOURCE, interval, INTERVAL_TO_VERSION.get(interval), partitionNum),
+ SegmentId.of(DATASOURCE, interval, INTERVAL_TO_VERSION.get(interval), bucketId),
segmentIdWithShardSpec.asSegmentId()
);
- SingleDimensionShardSpec shardSpec = (SingleDimensionShardSpec) segmentIdWithShardSpec.getShardSpec();
+ RangeBucketShardSpec shardSpec = (RangeBucketShardSpec) segmentIdWithShardSpec.getShardSpec();
Assert.assertEquals(PARTITION_DIMENSION, shardSpec.getDimension());
- Assert.assertEquals(partitionNum, shardSpec.getPartitionNum());
+ Assert.assertEquals(bucketId, shardSpec.getBucketId());
Assert.assertEquals(partitionStart, shardSpec.getStart());
Assert.assertEquals(partitionEnd, shardSpec.getEnd());
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ShardSpecsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ShardSpecsTest.java
index 64b0a0c..0818605 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ShardSpecsTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ShardSpecsTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
@@ -28,7 +29,8 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.timeline.partition.HashBucketShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -50,11 +52,11 @@ public class ShardSpecsTest extends IngestionTestBase
@Test
public void testShardSpecSelectionWithNullPartitionDimension()
{
- ShardSpec spec1 = new HashBasedNumberedShardSpec(0, 2, null, jsonMapper);
- ShardSpec spec2 = new HashBasedNumberedShardSpec(1, 2, null, jsonMapper);
+ HashBucketShardSpec spec1 = new HashBucketShardSpec(0, 2, null, jsonMapper);
+ HashBucketShardSpec spec2 = new HashBucketShardSpec(1, 2, null, jsonMapper);
- Map<Interval, List<ShardSpec>> shardSpecMap = new HashMap<>();
- shardSpecMap.put(Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"), Lists.newArrayList(spec1, spec2));
+ Map<Interval, List<BucketNumberedShardSpec<?>>> shardSpecMap = new HashMap<>();
+ shardSpecMap.put(Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"), ImmutableList.of(spec1, spec2));
ShardSpecs shardSpecs = new ShardSpecs(shardSpecMap, Granularities.HOUR);
String visitorId = "visitorId";
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index 7970e1d..64949de 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -117,31 +117,23 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
maxNumConcurrentSubTasks
);
+ return runTask(task, expectedTaskStatus);
+ }
+
+ Set<DataSegment> runTask(ParallelIndexSupervisorTask task, TaskState expectedTaskStatus)
+ {
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
TaskStatus taskStatus = getIndexingServiceClient().runAndWait(task);
Assert.assertEquals(expectedTaskStatus, taskStatus.getStatusCode());
return getIndexingServiceClient().getPublishedSegments(task);
}
- private ParallelIndexSupervisorTask newTask(
- @Nullable TimestampSpec timestampSpec,
- @Nullable DimensionsSpec dimensionsSpec,
- @Nullable InputFormat inputFormat,
- @Nullable ParseSpec parseSpec,
- Interval interval,
- File inputDir,
- String filter,
+ ParallelIndexTuningConfig newTuningConfig(
DimensionBasedPartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks
)
{
- GranularitySpec granularitySpec = new UniformGranularitySpec(
- Granularities.DAY,
- Granularities.MINUTE,
- interval == null ? null : Collections.singletonList(interval)
- );
-
- ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
+ return new ParallelIndexTuningConfig(
null,
null,
null,
@@ -169,6 +161,27 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
null,
null
);
+ }
+
+ private ParallelIndexSupervisorTask newTask(
+ @Nullable TimestampSpec timestampSpec,
+ @Nullable DimensionsSpec dimensionsSpec,
+ @Nullable InputFormat inputFormat,
+ @Nullable ParseSpec parseSpec,
+ Interval interval,
+ File inputDir,
+ String filter,
+ DimensionBasedPartitionsSpec partitionsSpec,
+ int maxNumConcurrentSubTasks
+ )
+ {
+ GranularitySpec granularitySpec = new UniformGranularitySpec(
+ Granularities.DAY,
+ Granularities.MINUTE,
+ interval == null ? null : Collections.singletonList(interval)
+ );
+
+ ParallelIndexTuningConfig tuningConfig = newTuningConfig(partitionsSpec, maxNumConcurrentSubTasks);
final ParallelIndexIngestionSpec ingestionSpec;
@@ -185,9 +198,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
"dataSource",
timestampSpec,
dimensionsSpec,
- new AggregatorFactory[]{
- new LongSumAggregatorFactory("val", "val")
- },
+ new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")},
granularitySpec,
null
),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index ca33815..470a003 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -521,7 +521,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
new NamedType(ParallelIndexSupervisorTask.class, ParallelIndexSupervisorTask.TYPE),
new NamedType(SinglePhaseSubTask.class, SinglePhaseSubTask.TYPE),
new NamedType(PartialHashSegmentGenerateTask.class, PartialHashSegmentGenerateTask.TYPE),
- new NamedType(PartialHashSegmentMergeTask.class, PartialHashSegmentMergeTask.TYPE),
new NamedType(PartialRangeSegmentGenerateTask.class, PartialRangeSegmentGenerateTask.TYPE),
new NamedType(PartialGenericSegmentMergeTask.class, PartialGenericSegmentMergeTask.TYPE),
new NamedType(PartialDimensionDistributionTask.class, PartialDimensionDistributionTask.TYPE)
@@ -646,7 +645,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
supervisorTaskId,
location.getSubTaskId(),
location.getInterval(),
- location.getPartitionId()
+ location.getBucketId()
);
if (zippedFile == null) {
throw new ISE("Can't find segment file for location[%s] at path[%s]", location);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedHashPartitionsReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedHashPartitionsReportTest.java
deleted file mode 100644
index 1343b94..0000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedHashPartitionsReportTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.segment.TestHelper;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collections;
-
-public class GeneratedHashPartitionsReportTest
-{
- private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
-
- private GeneratedHashPartitionsReport target;
-
- @Before
- public void setup()
- {
- target = new GeneratedHashPartitionsReport(
- "task-id",
- Collections.singletonList(
- new HashPartitionStat(
- ParallelIndexTestingFactory.TASK_EXECUTOR_HOST,
- ParallelIndexTestingFactory.TASK_EXECUTOR_PORT,
- ParallelIndexTestingFactory.USE_HTTPS,
- ParallelIndexTestingFactory.INTERVAL,
- ParallelIndexTestingFactory.PARTITION_ID,
- ParallelIndexTestingFactory.NUM_ROWS,
- ParallelIndexTestingFactory.SIZE_BYTES
- )
- )
- );
- }
-
- @Test
- public void serializesDeserializes()
- {
- TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
- }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java
index 956dbc8..4e46e38 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java
@@ -53,6 +53,6 @@ public class GenericPartitionLocationTest
@Test
public void hasPartitionIdThatMatchesShardSpec()
{
- Assert.assertEquals(ParallelIndexTestingFactory.PARTITION_ID, target.getPartitionId());
+ Assert.assertEquals(ParallelIndexTestingFactory.PARTITION_ID, target.getBucketId());
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java
index 2bcac8e..ffeab43 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java
@@ -21,10 +21,13 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.timeline.partition.HashBucketShardSpec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.Collections;
+
public class GenericPartitionStatTest
{
private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
@@ -39,7 +42,12 @@ public class GenericPartitionStatTest
ParallelIndexTestingFactory.TASK_EXECUTOR_PORT,
ParallelIndexTestingFactory.USE_HTTPS,
ParallelIndexTestingFactory.INTERVAL,
- ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC,
+ new HashBucketShardSpec(
+ ParallelIndexTestingFactory.PARTITION_ID,
+ ParallelIndexTestingFactory.PARTITION_ID + 1,
+ Collections.singletonList("dim"),
+ new ObjectMapper()
+ ),
ParallelIndexTestingFactory.NUM_ROWS,
ParallelIndexTestingFactory.SIZE_BYTES
);
@@ -54,6 +62,6 @@ public class GenericPartitionStatTest
@Test
public void hasPartitionIdThatMatchesSecondaryPartition()
{
- Assert.assertEquals(target.getSecondaryPartition().getPartitionNum(), target.getPartitionId());
+ Assert.assertEquals(target.getSecondaryPartition().getBucketId(), target.getBucketId());
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionAdjustingCorePartitionSizeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionAdjustingCorePartitionSizeTest.java
new file mode 100644
index 0000000..e19b208
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionAdjustingCorePartitionSizeTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+
+@RunWith(Parameterized.class)
+public class HashPartitionAdjustingCorePartitionSizeTest extends AbstractMultiPhaseParallelIndexingTest
+{
+ private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
+ private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
+ );
+ private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+ Arrays.asList("ts", "dim1", "dim2", "val"),
+ null,
+ false,
+ false,
+ 0
+ );
+ private static final Interval INTERVAL_TO_INDEX = Intervals.of("2020-01-01/P1M");
+
+ @Parameterized.Parameters(name = "{0}, maxNumConcurrentSubTasks={1}")
+ public static Iterable<Object[]> constructorFeeder()
+ {
+ return ImmutableList.of(
+ new Object[]{LockGranularity.TIME_CHUNK, 2},
+ new Object[]{LockGranularity.TIME_CHUNK, 1},
+ new Object[]{LockGranularity.SEGMENT, 2}
+ );
+ }
+
+ private final int maxNumConcurrentSubTasks;
+
+ public HashPartitionAdjustingCorePartitionSizeTest(LockGranularity lockGranularity, int maxNumConcurrentSubTasks)
+ {
+ super(lockGranularity, true);
+ this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
+ }
+
+ @Test
+ public void testLessPartitionsThanBuckets() throws IOException
+ {
+ final File inputDir = temporaryFolder.newFolder();
+ for (int i = 0; i < 3; i++) {
+ try (final Writer writer =
+ Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
+ writer.write(StringUtils.format("2020-01-01T00:00:00,%s,b1,%d\n", "a" + (i + 1), 10 * (i + 1)));
+ }
+ }
+ final DimensionBasedPartitionsSpec partitionsSpec = new HashedPartitionsSpec(
+ null,
+ 10,
+ ImmutableList.of("dim1")
+ );
+ final List<DataSegment> segments = new ArrayList<>(
+ runTestTask(
+ TIMESTAMP_SPEC,
+ DIMENSIONS_SPEC,
+ INPUT_FORMAT,
+ null,
+ INTERVAL_TO_INDEX,
+ inputDir,
+ "test_*",
+ partitionsSpec,
+ maxNumConcurrentSubTasks,
+ TaskState.SUCCESS
+ )
+ );
+ Assert.assertEquals(3, segments.size());
+ segments.sort(Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum()));
+ int prevPartitionId = -1;
+ for (DataSegment segment : segments) {
+ Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
+ final HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec();
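+ // 10 buckets were requested but only 3 turned out non-empty, so the published
+ // segments keep numBuckets = 10 while numCorePartitions is packed down to 3.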
+ Assert.assertEquals(3, shardSpec.getNumCorePartitions());
+ Assert.assertEquals(10, shardSpec.getNumBuckets());
+ Assert.assertEquals(ImmutableList.of("dim1"), shardSpec.getPartitionDimensions());
+ Assert.assertEquals(prevPartitionId + 1, shardSpec.getPartitionNum());
+ prevPartitionId = shardSpec.getPartitionNum();
+ }
+ }
+
+ @Test
+ public void testEqualNumberOfPartitionsToBuckets() throws IOException
+ {
+ final File inputDir = temporaryFolder.newFolder();
+ for (int i = 0; i < 10; i++) {
+ try (final Writer writer =
+ Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
+ writer.write(StringUtils.format("2020-01-01T00:00:00,%s,b1,%d\n", "aa" + (i + 10), 10 * (i + 1)));
+ }
+ }
+ final DimensionBasedPartitionsSpec partitionsSpec = new HashedPartitionsSpec(
+ null,
+ 5,
+ ImmutableList.of("dim1")
+ );
+ final Set<DataSegment> segments = runTestTask(
+ TIMESTAMP_SPEC,
+ DIMENSIONS_SPEC,
+ INPUT_FORMAT,
+ null,
+ INTERVAL_TO_INDEX,
+ inputDir,
+ "test_*",
+ partitionsSpec,
+ maxNumConcurrentSubTasks,
+ TaskState.SUCCESS
+ );
+ Assert.assertEquals(5, segments.size());
+ segments.forEach(segment -> {
+ Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
+ final HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec();
+ Assert.assertEquals(5, shardSpec.getNumCorePartitions());
+ Assert.assertEquals(5, shardSpec.getNumBuckets());
+ Assert.assertEquals(ImmutableList.of("dim1"), shardSpec.getPartitionDimensions());
+ });
+ }
+}
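
The test above pins down the adjusted-core-partition behavior for hash partitioning: with numShards = 10 but only three distinct dim1 values, only the three non-empty buckets become segments, so numCorePartitions shrinks to 3 while numBuckets keeps the requested 10. A minimal sketch of the resulting shard spec, using the six-argument constructor order (partitionNum, numCorePartitions, bucketId, numBuckets, partitionDimensions, jsonMapper) exercised in HashBasedNumberedShardSpecTest later in this diff; the bucketId value is hypothetical:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.ImmutableList;
    import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;

    final HashBasedNumberedShardSpec spec = new HashBasedNumberedShardSpec(
        0,                         // partitionNum: contiguous ids starting at 0
        3,                         // numCorePartitions: only 3 buckets received rows
        7,                         // bucketId: the original hash bucket (hypothetical value)
        10,                        // numBuckets: the numShards requested in the partitionsSpec
        ImmutableList.of("dim1"),  // partitionDimensions
        new ObjectMapper()
    );
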
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java
index 16c20b3..ef8f095 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionCachingLocalSegmentAllocatorTest.java
@@ -29,18 +29,19 @@ import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.task.CachingSegmentAllocator;
-import org.apache.druid.indexing.common.task.NonLinearlyPartitionedSequenceNameFunction;
+import org.apache.druid.indexing.common.task.CachingLocalSegmentAllocator;
import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.SequenceNameFunction;
import org.apache.druid.indexing.common.task.SupervisorTaskAccessWithNullClient;
import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.NoneGranularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.apache.druid.timeline.partition.HashBucketShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -70,7 +71,7 @@ public class HashPartitionCachingLocalSegmentAllocatorTest
Collections.singletonList(DIMENSION)
);
- private CachingSegmentAllocator target;
+ private SegmentAllocator target;
private SequenceNameFunction sequenceNameFunction;
@Before
@@ -83,11 +84,11 @@ public class HashPartitionCachingLocalSegmentAllocatorTest
toolbox,
DATASOURCE,
TASKID,
- new NoneGranularity(),
+ new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, ImmutableList.of()),
new SupervisorTaskAccessWithNullClient(SUPERVISOR_TASKID),
partitionAnalysis
);
- sequenceNameFunction = new NonLinearlyPartitionedSequenceNameFunction(TASKID, target.getShardSpecs());
+ sequenceNameFunction = ((CachingLocalSegmentAllocator) target).getSequenceNameFunction();
}
@Test
@@ -102,10 +103,10 @@ public class HashPartitionCachingLocalSegmentAllocatorTest
SegmentId.of(DATASOURCE, INTERVAL, VERSION, PARTITION_NUM),
segmentIdWithShardSpec.asSegmentId()
);
- HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) segmentIdWithShardSpec.getShardSpec();
+ HashBucketShardSpec shardSpec = (HashBucketShardSpec) segmentIdWithShardSpec.getShardSpec();
Assert.assertEquals(PARTITION_DIMENSIONS, shardSpec.getPartitionDimensions());
- Assert.assertEquals(NUM_PARTITONS, shardSpec.getPartitions());
- Assert.assertEquals(PARTITION_NUM, shardSpec.getPartitionNum());
+ Assert.assertEquals(NUM_PARTITONS, shardSpec.getNumBuckets());
+ Assert.assertEquals(PARTITION_NUM, shardSpec.getBucketId());
}
@Test
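
The switch to HashBucketShardSpec reflects the new allocation flow: the allocator hands out bucket-numbered specs, which route rows by bucketId during segment generation, and real partition numbers are only assigned later. A minimal sketch of such a spec, assuming the (bucketId, numBuckets, partitionDimensions, jsonMapper) order used in SegmentPublisherHelperTest later in this diff:

    import java.util.Collections;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.timeline.partition.HashBucketShardSpec;

    final HashBucketShardSpec bucket = new HashBucketShardSpec(
        0,                                 // bucketId: which hash bucket rows fall into
        2,                                 // numBuckets
        Collections.singletonList("dim"),  // partitionDimensions
        new ObjectMapper()
    );
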
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
index bcd3cbe..7fcabd0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
@@ -75,24 +75,31 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
false,
0
);
- private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2;
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
- new Object[]{LockGranularity.TIME_CHUNK, false},
- new Object[]{LockGranularity.TIME_CHUNK, true},
- new Object[]{LockGranularity.SEGMENT, true}
+ new Object[]{LockGranularity.TIME_CHUNK, false, 2},
+ new Object[]{LockGranularity.TIME_CHUNK, true, 2},
+ new Object[]{LockGranularity.TIME_CHUNK, true, 1},
+ new Object[]{LockGranularity.SEGMENT, true, 2}
);
}
+ private final int maxNumConcurrentSubTasks;
+
private File inputDir;
- public HashPartitionMultiPhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi)
+ public HashPartitionMultiPhaseParallelIndexingTest(
+ LockGranularity lockGranularity,
+ boolean useInputFormatApi,
+ int maxNumConcurrentSubTasks
+ )
{
super(lockGranularity, useInputFormatApi);
+ this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
}
@Before
@@ -132,7 +139,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
inputDir,
"test_*",
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
- MAX_NUM_CONCURRENT_SUB_TASKS,
+ maxNumConcurrentSubTasks,
TaskState.SUCCESS
);
} else {
@@ -145,7 +152,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
inputDir,
"test_*",
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
- MAX_NUM_CONCURRENT_SUB_TASKS,
+ maxNumConcurrentSubTasks,
TaskState.SUCCESS
);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionStatTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionStatTest.java
deleted file mode 100644
index 1eb6f86..0000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionStatTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.segment.TestHelper;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class HashPartitionStatTest
-{
- private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
-
- private HashPartitionStat target;
-
- @Before
- public void setup()
- {
- target = new HashPartitionStat(
- ParallelIndexTestingFactory.TASK_EXECUTOR_HOST,
- ParallelIndexTestingFactory.TASK_EXECUTOR_PORT,
- ParallelIndexTestingFactory.USE_HTTPS,
- ParallelIndexTestingFactory.INTERVAL,
- ParallelIndexTestingFactory.PARTITION_ID,
- ParallelIndexTestingFactory.NUM_ROWS,
- ParallelIndexTestingFactory.SIZE_BYTES
- );
- }
-
- @Test
- public void serializesDeserializes()
- {
- TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
- }
-
- @Test
- public void hasPartitionIdThatMatchesSecondaryPartition()
- {
- Assert.assertEquals(target.getSecondaryPartition().intValue(), target.getPartitionId());
- }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java
index 45df76d..3ddb63b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java
@@ -203,7 +203,7 @@ public class HttpShuffleClientTest
}
@Override
- int getPartitionId()
+ int getBucketId()
{
return getSecondaryPartition();
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 88dac06..9f8a07d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -677,6 +677,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
.getGivenOrDefaultPartitionsSpec();
final SegmentAllocator segmentAllocator = SegmentAllocators.forLinearPartitioning(
toolbox,
+ getId(),
new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
getIngestionSchema().getDataSchema(),
getTaskLockHelper(),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index b882aac..3ae79a3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -19,9 +19,11 @@
package org.apache.druid.indexing.common.task.batch.parallel;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.hamcrest.Matchers;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -45,8 +47,8 @@ public class ParallelIndexSupervisorTaskTest
public static class CreateMergeIoConfigsTest
{
private static final int TOTAL_NUM_MERGE_TASKS = 10;
- private static final Function<List<HashPartitionLocation>, PartialHashSegmentMergeIOConfig>
- CREATE_PARTIAL_SEGMENT_MERGE_IO_CONFIG = PartialHashSegmentMergeIOConfig::new;
+ private static final Function<List<GenericPartitionLocation>, PartialGenericSegmentMergeIOConfig>
+ CREATE_PARTIAL_SEGMENT_MERGE_IO_CONFIG = PartialGenericSegmentMergeIOConfig::new;
@Parameterized.Parameters(name = "count = {0}")
public static Iterable<? extends Object> data()
@@ -66,14 +68,14 @@ public class ParallelIndexSupervisorTaskTest
@Test
public void handlesLastPartitionCorrectly()
{
- List<PartialHashSegmentMergeIOConfig> assignedPartitionLocation = createMergeIOConfigs();
+ List<PartialGenericSegmentMergeIOConfig> assignedPartitionLocation = createMergeIOConfigs();
assertNoMissingPartitions(count, assignedPartitionLocation);
}
@Test
public void sizesPartitionsEvenly()
{
- List<PartialHashSegmentMergeIOConfig> assignedPartitionLocation = createMergeIOConfigs();
+ List<PartialGenericSegmentMergeIOConfig> assignedPartitionLocation = createMergeIOConfigs();
List<Integer> actualPartitionSizes = assignedPartitionLocation.stream()
.map(i -> i.getPartitionLocations().size())
.collect(Collectors.toList());
@@ -89,7 +91,7 @@ public class ParallelIndexSupervisorTaskTest
);
}
- private List<PartialHashSegmentMergeIOConfig> createMergeIOConfigs()
+ private List<PartialGenericSegmentMergeIOConfig> createMergeIOConfigs()
{
return ParallelIndexSupervisorTask.createMergeIOConfigs(
TOTAL_NUM_MERGE_TASKS,
@@ -98,7 +100,7 @@ public class ParallelIndexSupervisorTaskTest
);
}
- private static Map<Pair<Interval, Integer>, List<HashPartitionLocation>> createPartitionToLocations(int count)
+ private static Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> createPartitionToLocations(int count)
{
return IntStream.range(0, count).boxed().collect(
Collectors.toMap(
@@ -108,15 +110,15 @@ public class ParallelIndexSupervisorTaskTest
);
}
- private static HashPartitionLocation createPartitionLocation(int id)
+ private static GenericPartitionLocation createPartitionLocation(int id)
{
- return new HashPartitionLocation(
+ return new GenericPartitionLocation(
"host",
0,
false,
"subTaskId",
createInterval(id),
- id
+ new BuildingHashBasedNumberedShardSpec(id, id, id + 1, null, new ObjectMapper())
);
}
@@ -127,7 +129,7 @@ public class ParallelIndexSupervisorTaskTest
private static void assertNoMissingPartitions(
int count,
- List<PartialHashSegmentMergeIOConfig> assignedPartitionLocation
+ List<PartialGenericSegmentMergeIOConfig> assignedPartitionLocation
)
{
List<Integer> expectedIds = IntStream.range(0, count).boxed().collect(Collectors.toList());
@@ -136,7 +138,7 @@ public class ParallelIndexSupervisorTaskTest
.flatMap(
i -> i.getPartitionLocations()
.stream()
- .map(HashPartitionLocation::getPartitionId)
+ .map(GenericPartitionLocation::getBucketId)
)
.sorted()
.collect(Collectors.toList());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
index 08dd92c..de7ee8b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
@@ -44,7 +44,7 @@ import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.transform.TransformSpec;
-import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
@@ -100,7 +100,8 @@ class ParallelIndexTestingFactory
private static final String SCHEMA_DIMENSION = "dim";
private static final String DATASOURCE = "datasource";
- static final HashBasedNumberedShardSpec HASH_BASED_NUMBERED_SHARD_SPEC = new HashBasedNumberedShardSpec(
+ static final BuildingHashBasedNumberedShardSpec HASH_BASED_NUMBERED_SHARD_SPEC = new BuildingHashBasedNumberedShardSpec(
+ PARTITION_ID,
PARTITION_ID,
PARTITION_ID + 1,
Collections.singletonList("dim"),
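
Because PARTITION_ID is reused for two positions here, the argument order is easy to misread; as exercised by the tests elsewhere in this diff, it is (partitionId, bucketId, numBuckets, partitionDimensions, jsonMapper):

    new BuildingHashBasedNumberedShardSpec(
        PARTITION_ID,                       // partitionId within the time chunk
        PARTITION_ID,                       // bucketId (the factory happens to reuse the constant)
        PARTITION_ID + 1,                   // numBuckets
        Collections.singletonList("dim"),
        ParallelIndexTestingFactory.createObjectMapper()  // mapper argument (assumed here)
    );
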
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIOConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIOConfigTest.java
deleted file mode 100644
index 413c34d..0000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIOConfigTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.segment.TestHelper;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collections;
-
-public class PartialHashSegmentMergeIOConfigTest
-{
- private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
- private static final HashPartitionLocation HASH_PARTITION_LOCATION = new HashPartitionLocation(
- ParallelIndexTestingFactory.HOST,
- ParallelIndexTestingFactory.PORT,
- ParallelIndexTestingFactory.USE_HTTPS,
- ParallelIndexTestingFactory.SUBTASK_ID,
- ParallelIndexTestingFactory.INTERVAL,
- ParallelIndexTestingFactory.PARTITION_ID
- );
-
- private PartialHashSegmentMergeIOConfig target;
-
- @Before
- public void setup()
- {
- target = new PartialHashSegmentMergeIOConfig(Collections.singletonList(HASH_PARTITION_LOCATION));
- }
-
- @Test
- public void serializesDeserializes()
- {
- TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
- }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIngestionSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIngestionSpecTest.java
deleted file mode 100644
index d734739..0000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeIngestionSpecTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
-import org.apache.druid.segment.TestHelper;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collections;
-
-public class PartialHashSegmentMergeIngestionSpecTest
-{
- private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
- private static final HashPartitionLocation HASH_PARTITION_LOCATION = new HashPartitionLocation(
- ParallelIndexTestingFactory.HOST,
- ParallelIndexTestingFactory.PORT,
- ParallelIndexTestingFactory.USE_HTTPS,
- ParallelIndexTestingFactory.SUBTASK_ID,
- ParallelIndexTestingFactory.INTERVAL,
- ParallelIndexTestingFactory.PARTITION_ID
- );
- private static final PartialHashSegmentMergeIOConfig IO_CONFIG =
- new PartialHashSegmentMergeIOConfig(Collections.singletonList(HASH_PARTITION_LOCATION));
- private static final HashedPartitionsSpec PARTITIONS_SPEC = new HashedPartitionsSpec(
- null,
- 1,
- Collections.emptyList()
- );
-
- private PartialHashSegmentMergeIngestionSpec target;
-
- @Before
- public void setup()
- {
- target = new PartialHashSegmentMergeIngestionSpec(
- ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS),
- IO_CONFIG,
- new ParallelIndexTestingFactory.TuningConfigBuilder()
- .partitionsSpec(PARTITIONS_SPEC)
- .build()
- );
- }
-
- @Test
- public void serializesDeserializes()
- {
- TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
- }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeTaskTest.java
deleted file mode 100644
index d6fe0bb..0000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeTaskTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
-import org.apache.druid.segment.TestHelper;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collections;
-
-public class PartialHashSegmentMergeTaskTest extends AbstractParallelIndexSupervisorTaskTest
-{
- private static final HashPartitionLocation HASH_PARTITION_LOCATION = new HashPartitionLocation(
- ParallelIndexTestingFactory.HOST,
- ParallelIndexTestingFactory.PORT,
- ParallelIndexTestingFactory.USE_HTTPS,
- ParallelIndexTestingFactory.SUBTASK_ID,
- ParallelIndexTestingFactory.INTERVAL,
- ParallelIndexTestingFactory.PARTITION_ID
- );
- private static final PartialHashSegmentMergeIOConfig IO_CONFIG =
- new PartialHashSegmentMergeIOConfig(Collections.singletonList(HASH_PARTITION_LOCATION));
- private static final HashedPartitionsSpec PARTITIONS_SPEC = new HashedPartitionsSpec(
- null,
- 1,
- Collections.emptyList()
- );
- private static final PartialHashSegmentMergeIngestionSpec INGESTION_SPEC =
- new PartialHashSegmentMergeIngestionSpec(
- ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS),
- IO_CONFIG,
- new ParallelIndexTestingFactory.TuningConfigBuilder()
- .partitionsSpec(PARTITIONS_SPEC)
- .build()
- );
-
- private PartialHashSegmentMergeTask target;
-
- @Before
- public void setup()
- {
- target = new PartialHashSegmentMergeTask(
- ParallelIndexTestingFactory.AUTOMATIC_ID,
- ParallelIndexTestingFactory.GROUP_ID,
- ParallelIndexTestingFactory.TASK_RESOURCE,
- ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
- ParallelIndexTestingFactory.NUM_ATTEMPTS,
- INGESTION_SPEC,
- ParallelIndexTestingFactory.CONTEXT,
- ParallelIndexTestingFactory.INDEXING_SERVICE_CLIENT,
- ParallelIndexTestingFactory.TASK_CLIENT_FACTORY,
- ParallelIndexTestingFactory.SHUFFLE_CLIENT
- );
- }
-
- @Test
- public void serializesDeserializes()
- {
- TestHelper.testSerializesDeserializes(getObjectMapper(), target);
- }
-
- @Test
- public void hasCorrectPrefixForAutomaticId()
- {
- String id = target.getId();
- Assert.assertThat(id, Matchers.startsWith(PartialHashSegmentMergeTask.TYPE));
- }
-}
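
With this deletion the hash-specific merge path is gone entirely: IO config, ingestion spec, and task all fold into their generic counterparts, which serve hash and range partitioning alike. A minimal sketch of the replacement wiring, reusing the GenericPartitionLocation sketched earlier:

    import java.util.Collections;

    final PartialGenericSegmentMergeIOConfig ioConfig =
        new PartialGenericSegmentMergeIOConfig(Collections.singletonList(location));
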
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionAdjustingCorePartitionSizeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionAdjustingCorePartitionSizeTest.java
new file mode 100644
index 0000000..2e4aa44
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionAdjustingCorePartitionSizeTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+@RunWith(Parameterized.class)
+public class RangePartitionAdjustingCorePartitionSizeTest extends AbstractMultiPhaseParallelIndexingTest
+{
+ private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
+ private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
+ );
+ private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+ Arrays.asList("ts", "dim1", "dim2", "val"),
+ null,
+ false,
+ false,
+ 0
+ );
+ private static final Interval INTERVAL_TO_INDEX = Intervals.of("2020-01-01/P1M");
+
+ @Parameterized.Parameters(name = "{0}, maxNumConcurrentSubTasks={1}")
+ public static Iterable<Object[]> constructorFeeder()
+ {
+ return ImmutableList.of(
+ new Object[]{LockGranularity.TIME_CHUNK, 2},
+ new Object[]{LockGranularity.TIME_CHUNK, 1},
+ new Object[]{LockGranularity.SEGMENT, 2}
+ );
+ }
+
+ private final int maxNumConcurrentSubTasks;
+
+ public RangePartitionAdjustingCorePartitionSizeTest(LockGranularity lockGranularity, int maxNumConcurrentSubTasks)
+ {
+ super(lockGranularity, true);
+ this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
+ }
+
+ @Test
+ public void testLessPartitionsThanBuckets() throws IOException
+ {
+ final File inputDir = temporaryFolder.newFolder();
+ for (int i = 0; i < 2; i++) {
+ try (final Writer writer =
+ Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
+ writer.write(StringUtils.format("2020-01-01T00:00:00,aaa,b1,10\n"));
+ }
+ }
+ for (int i = 0; i < 3; i++) {
+ try (final Writer writer =
+ Files.newBufferedWriter(new File(inputDir, "test_" + (i + 2)).toPath(), StandardCharsets.UTF_8)) {
+ writer.write(StringUtils.format("2020-01-01T00:00:00,zzz,b1,10\n"));
+ }
+ }
+ final DimensionBasedPartitionsSpec partitionsSpec = new SingleDimensionPartitionsSpec(
+ 2,
+ null,
+ "dim1",
+ false
+ );
+ final List<DataSegment> segments = new ArrayList<>(
+ runTestTask(
+ TIMESTAMP_SPEC,
+ DIMENSIONS_SPEC,
+ INPUT_FORMAT,
+ null,
+ INTERVAL_TO_INDEX,
+ inputDir,
+ "test_*",
+ partitionsSpec,
+ maxNumConcurrentSubTasks,
+ TaskState.SUCCESS
+ )
+ );
+ Assert.assertEquals(1, segments.size());
+ final DataSegment segment = segments.get(0);
+ Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
+ final SingleDimensionShardSpec shardSpec = (SingleDimensionShardSpec) segment.getShardSpec();
+ Assert.assertEquals(1, shardSpec.getNumCorePartitions());
+ Assert.assertEquals(0, shardSpec.getPartitionNum());
+ Assert.assertEquals("dim1", shardSpec.getDimension());
+ }
+
+ @Test
+ public void testEqualNumberOfPartitionsToBuckets() throws IOException
+ {
+ final File inputDir = temporaryFolder.newFolder();
+ for (int i = 0; i < 10; i++) {
+ try (final Writer writer =
+ Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
+ writer.write(StringUtils.format("2020-01-01T00:00:00,%s,b1,%d\n", "aa" + (i + 10), 10 * (i + 1)));
+ }
+ }
+ final DimensionBasedPartitionsSpec partitionsSpec = new SingleDimensionPartitionsSpec(
+ 2,
+ null,
+ "dim1",
+ false
+ );
+ final Set<DataSegment> segments = runTestTask(
+ TIMESTAMP_SPEC,
+ DIMENSIONS_SPEC,
+ INPUT_FORMAT,
+ null,
+ INTERVAL_TO_INDEX,
+ inputDir,
+ "test_*",
+ partitionsSpec,
+ maxNumConcurrentSubTasks,
+ TaskState.SUCCESS
+ );
+ Assert.assertEquals(5, segments.size());
+ segments.forEach(segment -> {
+ Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
+ final SingleDimensionShardSpec shardSpec = (SingleDimensionShardSpec) segment.getShardSpec();
+ Assert.assertEquals(5, shardSpec.getNumCorePartitions());
+ Assert.assertTrue(shardSpec.getPartitionNum() < shardSpec.getNumCorePartitions());
+ Assert.assertEquals("dim1", shardSpec.getDimension());
+ });
+ }
+}
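
This is the range-partitioning counterpart of the hash test above: five rows over just two distinct dim1 values end up in a single published segment, so numCorePartitions records the one segment actually created rather than the bucket count implied by targetRowsPerSegment = 2. A sketch of the published spec, using the five-argument constructor shown in the CachingClusteredClientTest changes later in this diff:

    final SingleDimensionShardSpec spec = new SingleDimensionShardSpec(
        "dim1",  // partition dimension
        null,    // start: null = unbounded below
        null,    // end: null = unbounded above (single-segment case)
        0,       // partitionNum
        1        // numCorePartitions: adjusted down from the planned bucket count
    );
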
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index e1e21a5..4abb539 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -221,14 +221,14 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
- Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getPartitions());
+ Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
}
}
} else {
for (DataSegment segment : segments) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
- Assert.assertEquals(0, shardSpec.getPartitions());
+ Assert.assertEquals(0, shardSpec.getNumCorePartitions());
}
}
}
@@ -248,7 +248,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
- Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getPartitions());
+ Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
}
}
} else {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 00f76a4..ae8fc3b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -971,8 +971,8 @@ public class TaskLockboxTest
final Task task = NoopTask.create();
lockbox.add(task);
- allocateSegmentsAndAssert(task, "seq", 3, new HashBasedNumberedPartialShardSpec(null, 3));
- allocateSegmentsAndAssert(task, "seq2", 5, new HashBasedNumberedPartialShardSpec(null, 5));
+ allocateSegmentsAndAssert(task, "seq", 3, new HashBasedNumberedPartialShardSpec(null, 1, 3));
+ allocateSegmentsAndAssert(task, "seq2", 5, new HashBasedNumberedPartialShardSpec(null, 3, 5));
}
private void allocateSegmentsAndAssert(
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index 7309e7c..5c64dcd 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -25,7 +25,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
-import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentMergeTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialRangeSegmentGenerateTask;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask;
import org.apache.druid.java.util.common.ISE;
@@ -312,7 +311,6 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
return t.getType().equals(SinglePhaseSubTask.TYPE);
} else {
return t.getType().equalsIgnoreCase(PartialHashSegmentGenerateTask.TYPE)
- || t.getType().equalsIgnoreCase(PartialHashSegmentMergeTask.TYPE)
|| t.getType().equalsIgnoreCase(PartialDimensionDistributionTask.TYPE)
|| t.getType().equalsIgnoreCase(PartialRangeSegmentGenerateTask.TYPE)
|| t.getType().equalsIgnoreCase(PartialGenericSegmentMergeTask.TYPE);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java
index 76093be..ec4a65a 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java
@@ -21,7 +21,8 @@ package org.apache.druid.segment.realtime.appenderator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.OverwriteShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
@@ -42,7 +43,7 @@ public final class SegmentPublisherHelper
* This method fills missing information in the shard spec if necessary when publishing segments.
*
* - When time chunk lock is used, the non-appending task should set the proper size of the core partitions for
- * dynamically-partitioned segments. See {@link #annotateNumberedShardSpecFn}.
+ * dynamically-partitioned segments. See {@link #annotateCorePartitionSetSizeFn}.
* - When segment lock is used, the overwriting task should set the proper size of the atomic update group.
* See {@link #annotateAtomicUpdateGroupFn}.
*/
@@ -70,8 +71,10 @@ public final class SegmentPublisherHelper
final Function<DataSegment, DataSegment> annotateFn;
if (firstShardSpec instanceof OverwriteShardSpec) {
annotateFn = annotateAtomicUpdateGroupFn(segmentsPerInterval.size());
- } else if (firstShardSpec instanceof BuildingNumberedShardSpec) {
- annotateFn = annotateNumberedShardSpecFn(segmentsPerInterval.size());
+ } else if (firstShardSpec instanceof BuildingShardSpec) {
+ annotateFn = annotateCorePartitionSetSizeFn(segmentsPerInterval.size());
+ } else if (firstShardSpec instanceof BucketNumberedShardSpec) {
+ throw new ISE("Cannot publish segments with shardSpec[%s]", firstShardSpec);
} else {
annotateFn = null;
}
@@ -93,11 +96,11 @@ public final class SegmentPublisherHelper
};
}
- private static Function<DataSegment, DataSegment> annotateNumberedShardSpecFn(int corePartitionSetSize)
+ private static Function<DataSegment, DataSegment> annotateCorePartitionSetSizeFn(int corePartitionSetSize)
{
return segment -> {
- final BuildingNumberedShardSpec shardSpec = (BuildingNumberedShardSpec) segment.getShardSpec();
- return segment.withShardSpec(shardSpec.toNumberedShardSpec(corePartitionSetSize));
+ final BuildingShardSpec<?> shardSpec = (BuildingShardSpec<?>) segment.getShardSpec();
+ return segment.withShardSpec(shardSpec.convert(corePartitionSetSize));
};
}
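
The helper is now uniform across partitioning schemes: any BuildingShardSpec gets its core partition count filled in at publish time, and a BucketNumberedShardSpec reaching this point is a bug, since buckets must become building specs once partition ids are assigned. A sketch of the intended two-step lifecycle, assuming BucketNumberedShardSpec exposes a matching convert(partitionId) (only BuildingShardSpec.convert appears in this hunk); segment is a given DataSegment:

    // bucket spec (routes rows) -> building spec (partition id known) -> publishable spec
    final BucketNumberedShardSpec<?> bucketSpec = new HashBucketShardSpec(0, 2, null, new ObjectMapper());
    final BuildingShardSpec<?> building = bucketSpec.convert(0);       // assign partition id 0
    final ShardSpec publishable = building.convert(2);                 // fill in 2 core partitions
    final DataSegment toPublish = segment.withShardSpec(publishable);  // as annotateCorePartitionSetSizeFn does
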
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index ed6ecec..30fb765 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -125,10 +125,10 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
-import org.apache.druid.timeline.partition.StringPartitionChunk;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
@@ -1486,19 +1486,19 @@ public class CachingClusteredClientTest
QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest());
final DruidServer lastServer = servers[random.nextInt(servers.length)];
- ServerSelector selector1 = makeMockSingleDimensionSelector(lastServer, "dim1", null, "b", 1);
- ServerSelector selector2 = makeMockSingleDimensionSelector(lastServer, "dim1", "e", "f", 2);
- ServerSelector selector3 = makeMockSingleDimensionSelector(lastServer, "dim1", "hi", "zzz", 3);
- ServerSelector selector4 = makeMockSingleDimensionSelector(lastServer, "dim2", "a", "e", 4);
- ServerSelector selector5 = makeMockSingleDimensionSelector(lastServer, "dim2", null, null, 5);
- ServerSelector selector6 = makeMockSingleDimensionSelector(lastServer, "other", "b", null, 6);
-
- timeline.add(interval1, "v", new StringPartitionChunk<>(null, "a", 1, selector1));
- timeline.add(interval1, "v", new StringPartitionChunk<>("a", "b", 2, selector2));
- timeline.add(interval1, "v", new StringPartitionChunk<>("b", null, 3, selector3));
- timeline.add(interval2, "v", new StringPartitionChunk<>(null, "d", 4, selector4));
- timeline.add(interval2, "v", new StringPartitionChunk<>("d", null, 5, selector5));
- timeline.add(interval3, "v", new StringPartitionChunk<>(null, null, 6, selector6));
+ ServerSelector selector1 = makeMockSingleDimensionSelector(lastServer, "dim1", null, "b", 0);
+ ServerSelector selector2 = makeMockSingleDimensionSelector(lastServer, "dim1", "e", "f", 1);
+ ServerSelector selector3 = makeMockSingleDimensionSelector(lastServer, "dim1", "hi", "zzz", 2);
+ ServerSelector selector4 = makeMockSingleDimensionSelector(lastServer, "dim2", "a", "e", 0);
+ ServerSelector selector5 = makeMockSingleDimensionSelector(lastServer, "dim2", null, null, 1);
+ ServerSelector selector6 = makeMockSingleDimensionSelector(lastServer, "other", "b", null, 0);
+
+ timeline.add(interval1, "v", new NumberedPartitionChunk<>(0, 3, selector1));
+ timeline.add(interval1, "v", new NumberedPartitionChunk<>(1, 3, selector2));
+ timeline.add(interval1, "v", new NumberedPartitionChunk<>(2, 3, selector3));
+ timeline.add(interval2, "v", new NumberedPartitionChunk<>(0, 2, selector4));
+ timeline.add(interval2, "v", new NumberedPartitionChunk<>(1, 2, selector5));
+ timeline.add(interval3, "v", new NumberedPartitionChunk<>(0, 1, selector6));
final Capture<QueryPlus> capture = Capture.newInstance();
final Capture<ResponseContext> contextCap = Capture.newInstance();
@@ -1514,10 +1514,10 @@ public class CachingClusteredClientTest
EasyMock.replay(mockRunner);
List<SegmentDescriptor> descriptors = new ArrayList<>();
- descriptors.add(new SegmentDescriptor(interval1, "v", 1));
- descriptors.add(new SegmentDescriptor(interval1, "v", 3));
- descriptors.add(new SegmentDescriptor(interval2, "v", 5));
- descriptors.add(new SegmentDescriptor(interval3, "v", 6));
+ descriptors.add(new SegmentDescriptor(interval1, "v", 0));
+ descriptors.add(new SegmentDescriptor(interval1, "v", 2));
+ descriptors.add(new SegmentDescriptor(interval2, "v", 1));
+ descriptors.add(new SegmentDescriptor(interval3, "v", 0));
MultipleSpecificSegmentSpec expected = new MultipleSpecificSegmentSpec(descriptors);
runner.run(QueryPlus.wrap(query)).toList();
@@ -1538,7 +1538,13 @@ public class CachingClusteredClientTest
null,
null,
null,
- new SingleDimensionShardSpec(dimension, start, end, partitionNum),
+ new SingleDimensionShardSpec(
+ dimension,
+ start,
+ end,
+ partitionNum,
+ SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
+ ),
null,
9,
0L
@@ -1966,7 +1972,7 @@ public class CachingClusteredClientTest
final ShardSpec shardSpec;
if (numChunks == 1) {
- shardSpec = new SingleDimensionShardSpec("dimAll", null, null, 0);
+ shardSpec = new SingleDimensionShardSpec("dimAll", null, null, 0, 1);
} else {
String start = null;
String end = null;
@@ -1976,7 +1982,7 @@ public class CachingClusteredClientTest
if (j + 1 < numChunks) {
end = String.valueOf(j + 1);
}
- shardSpec = new SingleDimensionShardSpec("dim" + k, start, end, j);
+ shardSpec = new SingleDimensionShardSpec("dim" + k, start, end, j, numChunks);
}
DataSegment mockSegment = makeMock(mocks, DataSegment.class);
ServerExpectation<Object> expectation = new ServerExpectation<>(
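
Partition ids here drop from the old 1-based, globally increasing numbering to 0-based ids that are contiguous within each interval, reflecting the new expectation that core partitions form a complete set [0, numCorePartitions). NumberedPartitionChunk carries exactly that pair:

    // chunkNumber 0 of 3 core partitions in interval1
    timeline.add(interval1, "v", new NumberedPartitionChunk<>(0, 3, selector1));
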
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 5eb34ffb..bd8e5ef 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -1032,7 +1032,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() throws IOException
{
- final PartialShardSpec partialShardSpec = new HashBasedNumberedPartialShardSpec(null, 5);
+ final PartialShardSpec partialShardSpec = new HashBasedNumberedPartialShardSpec(null, 2, 5);
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
@@ -1048,7 +1048,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
Assert.assertEquals(0, shardSpec.getPartitionNum());
- Assert.assertEquals(5, shardSpec.getPartitions());
+ Assert.assertEquals(0, shardSpec.getNumCorePartitions());
+ Assert.assertEquals(5, shardSpec.getNumBuckets());
coordinator.announceHistoricalSegments(
Collections.singleton(
@@ -1078,7 +1079,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
Assert.assertEquals(1, shardSpec.getPartitionNum());
- Assert.assertEquals(5, shardSpec.getPartitions());
+ Assert.assertEquals(0, shardSpec.getNumCorePartitions());
+ Assert.assertEquals(5, shardSpec.getNumBuckets());
coordinator.announceHistoricalSegments(
Collections.singleton(
@@ -1101,13 +1103,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
"seq3",
null,
interval,
- new HashBasedNumberedPartialShardSpec(null, 3),
+ new HashBasedNumberedPartialShardSpec(null, 2, 3),
"version",
true
);
shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
Assert.assertEquals(2, shardSpec.getPartitionNum());
- Assert.assertEquals(3, shardSpec.getPartitions());
+ Assert.assertEquals(0, shardSpec.getNumCorePartitions());
+ Assert.assertEquals(3, shardSpec.getNumBuckets());
}
}
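
Allocation of appended hash-partitioned segments now records the bucket layout up front while leaving numCorePartitions at 0, since segments appended after the initial publish are never part of the core set. A sketch of the request, assuming the (partitionDimensions, bucketId, numBuckets) order used above; only the third argument is confirmed by the numBuckets assertion:

    // null partitionDimensions = default hashing behavior
    final PartialShardSpec partialShardSpec = new HashBasedNumberedPartialShardSpec(null, 2, 5);
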
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentIdWithShardSpecTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentIdWithShardSpecTest.java
index 55ff589..5587369 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentIdWithShardSpecTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentIdWithShardSpecTest.java
@@ -53,7 +53,7 @@ public class SegmentIdWithShardSpecTest
Assert.assertEquals(INTERVAL, id2.getInterval());
Assert.assertEquals(VERSION, id2.getVersion());
Assert.assertEquals(SHARD_SPEC_1.getPartitionNum(), id2.getShardSpec().getPartitionNum());
- Assert.assertEquals(SHARD_SPEC_1.getPartitions(), ((NumberedShardSpec) id2.getShardSpec()).getPartitions());
+ Assert.assertEquals(SHARD_SPEC_1.getNumCorePartitions(), ((NumberedShardSpec) id2.getShardSpec()).getNumCorePartitions());
}
@Test
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java
new file mode 100644
index 0000000..1f2af72
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingSingleDimensionShardSpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.apache.druid.timeline.partition.HashBucketShardSpec;
+import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.PartitionIds;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Set;
+
+public class SegmentPublisherHelperTest
+{
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testAnnotateAtomicUpdateGroupSize()
+ {
+ final Set<DataSegment> segments = ImmutableSet.of(
+ newSegment(
+ new NumberedOverwriteShardSpec(
+ PartitionIds.NON_ROOT_GEN_START_PARTITION_ID,
+ 0,
+ 3,
+ (short) 1
+ )
+ ),
+ newSegment(
+ new NumberedOverwriteShardSpec(
+ PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 1,
+ 0,
+ 3,
+ (short) 1
+ )
+ ),
+ newSegment(
+ new NumberedOverwriteShardSpec(
+ PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 2,
+ 0,
+ 3,
+ (short) 1
+ )
+ )
+ );
+ final Set<DataSegment> annotated = SegmentPublisherHelper.annotateShardSpec(segments);
+ for (DataSegment segment : annotated) {
+ Assert.assertSame(NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass());
+ final NumberedOverwriteShardSpec shardSpec = (NumberedOverwriteShardSpec) segment.getShardSpec();
+ Assert.assertEquals(3, shardSpec.getAtomicUpdateGroupSize());
+ }
+ }
+
+ @Test
+ public void testAnnotateCorePartitionSetSizeForNumberedShardSpec()
+ {
+ final Set<DataSegment> segments = ImmutableSet.of(
+ newSegment(new BuildingNumberedShardSpec(0)),
+ newSegment(new BuildingNumberedShardSpec(1)),
+ newSegment(new BuildingNumberedShardSpec(2))
+ );
+ final Set<DataSegment> annotated = SegmentPublisherHelper.annotateShardSpec(segments);
+ for (DataSegment segment : annotated) {
+ Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
+ final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
+ Assert.assertEquals(3, shardSpec.getNumCorePartitions());
+ }
+ }
+
+ @Test
+ public void testAnnotateCorePartitionSetSizeForHashNumberedShardSpec()
+ {
+ final Set<DataSegment> segments = ImmutableSet.of(
+ newSegment(new BuildingHashBasedNumberedShardSpec(0, 0, 3, null, new ObjectMapper())),
+ newSegment(new BuildingHashBasedNumberedShardSpec(1, 1, 3, null, new ObjectMapper())),
+ newSegment(new BuildingHashBasedNumberedShardSpec(2, 2, 3, null, new ObjectMapper()))
+ );
+ final Set<DataSegment> annotated = SegmentPublisherHelper.annotateShardSpec(segments);
+ for (DataSegment segment : annotated) {
+ Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
+ final HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec();
+ Assert.assertEquals(3, shardSpec.getNumCorePartitions());
+ }
+ }
+
+ @Test
+ public void testAnnotateCorePartitionSetSizeForSingleDimensionShardSpec()
+ {
+ final Set<DataSegment> segments = ImmutableSet.of(
+ newSegment(new BuildingSingleDimensionShardSpec(0, "dim", null, "ccc", 0)),
+ newSegment(new BuildingSingleDimensionShardSpec(1, "dim", null, "ccc", 1)),
+ newSegment(new BuildingSingleDimensionShardSpec(2, "dim", null, "ccc", 2))
+ );
+ final Set<DataSegment> annotated = SegmentPublisherHelper.annotateShardSpec(segments);
+ for (DataSegment segment : annotated) {
+ Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
+ final SingleDimensionShardSpec shardSpec = (SingleDimensionShardSpec) segment.getShardSpec();
+ Assert.assertEquals(3, shardSpec.getNumCorePartitions());
+ }
+ }
+
+ @Test
+ public void testAnnotateShardSpecDoNothing()
+ {
+ final Set<DataSegment> segments = ImmutableSet.of(
+ newSegment(new NumberedShardSpec(0, 0)),
+ newSegment(new NumberedShardSpec(1, 0)),
+ newSegment(new NumberedShardSpec(2, 0))
+ );
+ final Set<DataSegment> annotated = SegmentPublisherHelper.annotateShardSpec(segments);
+ Assert.assertEquals(segments, annotated);
+ }
+
+ @Test
+ public void testAnnotateShardSpecThrowingExceptionForBucketNumberedShardSpec()
+ {
+ final Set<DataSegment> segments = ImmutableSet.of(
+ newSegment(new HashBucketShardSpec(0, 3, null, new ObjectMapper())),
+ newSegment(new HashBucketShardSpec(1, 3, null, new ObjectMapper())),
+ newSegment(new HashBucketShardSpec(2, 3, null, new ObjectMapper()))
+ );
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Cannot publish segments with shardSpec");
+ SegmentPublisherHelper.annotateShardSpec(segments);
+ }
+
+ private static DataSegment newSegment(ShardSpec shardSpec)
+ {
+ return new DataSegment(
+ "datasource",
+ Intervals.of("2020-01-01/P1d"),
+ "version",
+ null,
+ ImmutableList.of("dim"),
+ ImmutableList.of("met"),
+ shardSpec,
+ 9,
+ 10L
+ );
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/shard/NumberedShardSpecTest.java b/server/src/test/java/org/apache/druid/server/shard/NumberedShardSpecTest.java
index 0cbfc3c..e94b806 100644
--- a/server/src/test/java/org/apache/druid/server/shard/NumberedShardSpecTest.java
+++ b/server/src/test/java/org/apache/druid/server/shard/NumberedShardSpecTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
+import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.timeline.Overshadowable;
@@ -46,6 +47,12 @@ import java.util.Set;
public class NumberedShardSpecTest
{
@Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(NumberedShardSpec.class).usingGetClass().verify();
+ }
+
+ @Test
public void testSerdeRoundTrip() throws Exception
{
final ShardSpec spec = ServerTestHelper.MAPPER.readValue(
@@ -53,7 +60,7 @@ public class NumberedShardSpecTest
ShardSpec.class
);
Assert.assertEquals(1, spec.getPartitionNum());
- Assert.assertEquals(2, ((NumberedShardSpec) spec).getPartitions());
+ Assert.assertEquals(2, ((NumberedShardSpec) spec).getNumCorePartitions());
}
@Test
@@ -64,7 +71,7 @@ public class NumberedShardSpecTest
ShardSpec.class
);
Assert.assertEquals(1, spec.getPartitionNum());
- Assert.assertEquals(2, ((NumberedShardSpec) spec).getPartitions());
+ Assert.assertEquals(2, ((NumberedShardSpec) spec).getNumCorePartitions());
}
@Test
diff --git a/server/src/test/java/org/apache/druid/server/shard/SingleDimensionShardSpecTest.java b/server/src/test/java/org/apache/druid/server/shard/SingleDimensionShardSpecTest.java
index 2722179..0787cf2 100644
--- a/server/src/test/java/org/apache/druid/server/shard/SingleDimensionShardSpecTest.java
+++ b/server/src/test/java/org/apache/druid/server/shard/SingleDimensionShardSpecTest.java
@@ -158,7 +158,7 @@ public class SingleDimensionShardSpecTest
private SingleDimensionShardSpec makeSpec(String dimension, String start, String end)
{
- return new SingleDimensionShardSpec(dimension, start, end, 0);
+ return new SingleDimensionShardSpec(dimension, start, end, 0, SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS);
}
private Map<String, String> makeMap(String value)
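
UNKNOWN_NUM_CORE_PARTITIONS exists for backward compatibility: range specs serialized before this change carry no core partition count, so the sentinel marks them, and presumably such legacy specs keep the old StringPartitionChunk behavior while specs with a known count can use numbered chunks (compare the CachingClusteredClientTest changes above). A hedged sketch of that dispatch; the getter names and the branching itself are assumptions:

    // Sketch only: chunk type chosen by whether the core partition count is known.
    final Object obj = new Object();  // payload stand-in
    final PartitionChunk<Object> chunk;
    if (spec.getNumCorePartitions() == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
      chunk = new StringPartitionChunk<>(spec.getStart(), spec.getEnd(), spec.getPartitionNum(), obj);
    } else {
      chunk = new NumberedPartitionChunk<>(spec.getPartitionNum(), spec.getNumCorePartitions(), obj);
    }
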
diff --git a/server/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java b/server/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java
index 9ac4d27..ecb1102 100644
--- a/server/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java
+++ b/server/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java
@@ -19,10 +19,12 @@
package org.apache.druid.timeline.partition;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.Row;
@@ -42,6 +44,16 @@ import java.util.stream.IntStream;
public class HashBasedNumberedShardSpecTest
{
@Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(HashBasedNumberedShardSpec.class)
+ .withIgnoredFields("jsonMapper")
+ .withPrefabValues(ObjectMapper.class, new ObjectMapper(), new ObjectMapper())
+ .usingGetClass()
+ .verify();
+ }
+
+ @Test
public void testSerdeRoundTrip() throws Exception
{
@@ -50,6 +62,8 @@ public class HashBasedNumberedShardSpecTest
new HashBasedNumberedShardSpec(
1,
2,
+ 1,
+ 3,
ImmutableList.of("visitor_id"),
ServerTestHelper.MAPPER
)
@@ -57,7 +71,9 @@ public class HashBasedNumberedShardSpecTest
ShardSpec.class
);
Assert.assertEquals(1, spec.getPartitionNum());
- Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getPartitions());
+ Assert.assertEquals(2, spec.getNumCorePartitions());
+ Assert.assertEquals(1, ((HashBasedNumberedShardSpec) spec).getBucketId());
+ Assert.assertEquals(3, ((HashBasedNumberedShardSpec) spec).getNumBuckets());
Assert.assertEquals(ImmutableList.of("visitor_id"), ((HashBasedNumberedShardSpec) spec).getPartitionDimensions());
}
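Read together with the assertions above, the widened constructor is (partitionNum, numCorePartitions, bucketId, numBuckets, partitionDimensions, jsonMapper). A construction sketch using the same test mapper:

    import com.google.common.collect.ImmutableList;
    import org.apache.druid.server.ServerTestHelper;
    import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;

    public class HashShardSpecConstructionSketch
    {
      public static void main(String[] args)
      {
        final HashBasedNumberedShardSpec spec = new HashBasedNumberedShardSpec(
            1,                              // partitionNum within the core partition set
            2,                              // numCorePartitions
            1,                              // bucketId of the hash bucket this spec covers
            3,                              // numBuckets in total
            ImmutableList.of("visitor_id"), // partitionDimensions
            ServerTestHelper.MAPPER
        );
        System.out.println(spec.getNumCorePartitions()); // 2
        System.out.println(spec.getBucketId());          // 1
        System.out.println(spec.getNumBuckets());        // 3
      }
    }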
@@ -69,24 +85,28 @@ public class HashBasedNumberedShardSpecTest
ShardSpec.class
);
Assert.assertEquals(1, spec.getPartitionNum());
- Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getPartitions());
+ Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getNumCorePartitions());
final ShardSpec specWithPartitionDimensions = ServerTestHelper.MAPPER.readValue(
"{\"type\": \"hashed\", \"partitions\": 2, \"partitionNum\": 1, \"partitionDimensions\":[\"visitor_id\"]}",
ShardSpec.class
);
Assert.assertEquals(1, specWithPartitionDimensions.getPartitionNum());
- Assert.assertEquals(2, ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getPartitions());
- Assert.assertEquals(ImmutableList.of("visitor_id"), ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getPartitionDimensions());
+ Assert.assertEquals(2, ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getNumCorePartitions());
+ Assert.assertEquals(2, ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getNumBuckets());
+ Assert.assertEquals(
+ ImmutableList.of("visitor_id"),
+ ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getPartitionDimensions()
+ );
}
@Test
public void testPartitionChunks()
{
final List<ShardSpec> specs = ImmutableList.of(
- new HashBasedNumberedShardSpec(0, 3, null, ServerTestHelper.MAPPER),
- new HashBasedNumberedShardSpec(1, 3, null, ServerTestHelper.MAPPER),
- new HashBasedNumberedShardSpec(2, 3, null, ServerTestHelper.MAPPER)
+ new HashBasedNumberedShardSpec(0, 3, 0, 3, null, ServerTestHelper.MAPPER),
+ new HashBasedNumberedShardSpec(1, 3, 1, 3, null, ServerTestHelper.MAPPER),
+ new HashBasedNumberedShardSpec(2, 3, 2, 3, null, ServerTestHelper.MAPPER)
);
final List<PartitionChunk<String>> chunks = Lists.transform(
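The backwards-compatibility hunk above also pins down the fallback: when a pre-upgrade spec carries no bucket fields, numBuckets defaults to the legacy "partitions" value. A sketch using the exact JSON from the test:

    import org.apache.druid.server.ServerTestHelper;
    import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
    import org.apache.druid.timeline.partition.ShardSpec;

    public class HashShardSpecBackCompatSketch
    {
      public static void main(String[] args) throws Exception
      {
        // Old-style JSON: no "bucketId" or "numBuckets" fields.
        final ShardSpec spec = ServerTestHelper.MAPPER.readValue(
            "{\"type\": \"hashed\", \"partitions\": 2, \"partitionNum\": 1, \"partitionDimensions\":[\"visitor_id\"]}",
            ShardSpec.class
        );
        // numBuckets falls back to the legacy "partitions" value.
        System.out.println(((HashBasedNumberedShardSpec) spec).getNumBuckets()); // 2
      }
    }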
@@ -157,35 +177,26 @@ public class HashBasedNumberedShardSpecTest
@Test
public void testGetGroupKey()
{
- final HashBasedNumberedShardSpec shardSpec1 = new HashBasedNumberedShardSpec(
- 1,
- 2,
- ImmutableList.of("visitor_id"),
- ServerTestHelper.MAPPER
- );
+ final List<String> partitionDimensions1 = ImmutableList.of("visitor_id");
final DateTime time = DateTimes.nowUtc();
final InputRow inputRow = new MapBasedInputRow(
time,
ImmutableList.of("visitor_id", "cnt"),
ImmutableMap.of("visitor_id", "v1", "cnt", 10)
);
- Assert.assertEquals(ImmutableList.of(Collections.singletonList("v1")), shardSpec1.getGroupKey(time.getMillis(), inputRow));
-
- final HashBasedNumberedShardSpec shardSpec2 = new HashBasedNumberedShardSpec(
- 1,
- 2,
- null,
- ServerTestHelper.MAPPER
+ Assert.assertEquals(
+ ImmutableList.of(Collections.singletonList("v1")),
+ HashBasedNumberedShardSpec.getGroupKey(partitionDimensions1, time.getMillis(), inputRow)
);
- Assert.assertEquals(ImmutableList.of(
+
+ Assert.assertEquals(
+ ImmutableList.of(
time.getMillis(),
- ImmutableMap.of(
- "cnt",
- Collections.singletonList(10),
- "visitor_id",
- Collections.singletonList("v1")
- )
- ).toString(), shardSpec2.getGroupKey(time.getMillis(), inputRow).toString());
+ ImmutableMap.of("cnt", Collections.singletonList(10), "visitor_id", Collections.singletonList("v1")))
+ .toString(),
+ // empty list when partitionDimensions is null
+ HashBasedNumberedShardSpec.getGroupKey(ImmutableList.of(), time.getMillis(), inputRow).toString()
+ );
}
public boolean assertExistsInOneSpec(List<ShardSpec> specs, InputRow row)
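getGroupKey is now a static helper that takes the partition dimensions explicitly, so the bucket-oriented shard specs introduced by this commit can share it without holding a full spec. A sketch of the two paths the rewritten test exercises, assuming the same classpath as above:

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import org.apache.druid.data.input.InputRow;
    import org.apache.druid.data.input.MapBasedInputRow;
    import org.apache.druid.java.util.common.DateTimes;
    import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
    import org.joda.time.DateTime;

    public class GetGroupKeySketch
    {
      public static void main(String[] args)
      {
        final DateTime time = DateTimes.nowUtc();
        final InputRow row = new MapBasedInputRow(
            time,
            ImmutableList.of("visitor_id", "cnt"),
            ImmutableMap.of("visitor_id", "v1", "cnt", 10)
        );
        // With partition dimensions: the key is just those dimension values.
        System.out.println(HashBasedNumberedShardSpec.getGroupKey(
            ImmutableList.of("visitor_id"), time.getMillis(), row)); // [[v1]]
        // Without them (empty list): the key is [timestamp, all dimension values].
        System.out.println(HashBasedNumberedShardSpec.getGroupKey(
            ImmutableList.of(), time.getMillis(), row));
      }
    }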
@@ -202,7 +213,7 @@ public class HashBasedNumberedShardSpecTest
{
public HashOverridenShardSpec(int partitionNum, int partitions)
{
- super(partitionNum, partitions, null, ServerTestHelper.MAPPER);
+ super(partitionNum, partitions, partitionNum, partitions, null, ServerTestHelper.MAPPER);
}
@Override
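For test doubles written before bucketing existed, the simplest migration, shown above, is to reuse partitionNum as the bucketId and partitions as numBuckets, which keeps the old two-argument HashOverridenShardSpec constructor working for its callers.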