You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/03/02 06:27:15 UTC
[incubator-druid] branch master updated: Fix two SeekableStream serde issues. (#7176)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new fa218f5 Fix two SeekableStream serde issues. (#7176)
fa218f5 is described below
commit fa218f516026e5516229205836fc6a0d51001c0b
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Fri Mar 1 22:27:08 2019 -0800
Fix two SeekableStream serde issues. (#7176)
* Fix two SeekableStream serde issues.
1) Fix backwards-compatibility serde for SeekableStreamPartitions. It is needed
for split 0.13 / 0.14 clusters to work properly during a rolling update.
2) Abstract classes don't need JsonCreator constructors; remove them.
* Comment fixes.
---
.../druid/indexing/kafka/KafkaIndexTaskModule.java | 3 +
.../indexing/kafka/KafkaIndexTaskTuningConfig.java | 2 -
.../SeekableStreamDataSourceMetadata.java | 5 +-
.../seekablestream/SeekableStreamIndexTask.java | 32 ++++------
.../SeekableStreamIndexTaskIOConfig.java | 19 +++---
.../SeekableStreamIndexTaskTuningConfig.java | 38 ++++++------
.../seekablestream/SeekableStreamPartitions.java | 36 +++++++++--
.../SeekableStreamPartitionsTest.java | 72 ++++++++++++++++++++++
8 files changed, 145 insertions(+), 62 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
index 47b5df8..34bc3bc 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
@@ -44,6 +44,9 @@ public class KafkaIndexTaskModule implements DruidModule
new NamedType(KafkaIndexTask.class, "index_kafka"),
new NamedType(KafkaDataSourceMetadata.class, "kafka"),
new NamedType(KafkaIndexTaskIOConfig.class, "kafka"),
+ // "KafkaTuningConfig" is not the ideal name, but is needed for backwards compatibility.
+ // (Older versions of Druid didn't specify a type name and got this one by default.)
+ new NamedType(KafkaIndexTaskTuningConfig.class, "KafkaTuningConfig"),
new NamedType(KafkaSupervisorTuningConfig.class, "kafka"),
new NamedType(KafkaSupervisorSpec.class, "kafka")
)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
index 7cee877..2104759 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -30,7 +29,6 @@ import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
-@JsonTypeName("KafkaTuningConfig")
public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig
{
@JsonCreator
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java
index 2501048..b9e8d9a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.seekablestream;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.java.util.common.IAE;
@@ -33,9 +32,8 @@ public abstract class SeekableStreamDataSourceMetadata<PartitionIdType, Sequence
{
private final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> seekableStreamPartitions;
- @JsonCreator
public SeekableStreamDataSourceMetadata(
- @JsonProperty("partitions") SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> seekableStreamPartitions
+ SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> seekableStreamPartitions
)
{
this.seekableStreamPartitions = seekableStreamPartitions;
@@ -63,7 +61,6 @@ public abstract class SeekableStreamDataSourceMetadata<PartitionIdType, Sequence
return plus(other).equals(other.plus(this));
}
-
@Override
public DataSourceMetadata plus(DataSourceMetadata other)
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index bfabab1..d4659bf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -19,8 +19,6 @@
package org.apache.druid.indexing.seekablestream;
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -58,17 +56,15 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CircularBuffer;
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType extends Comparable> extends AbstractTask
- implements ChatHandler
+public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType extends Comparable>
+ extends AbstractTask implements ChatHandler
{
public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
- private static final Random RANDOM = ThreadLocalRandom.current();
private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);
private final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
@@ -82,18 +78,17 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
protected final RowIngestionMetersFactory rowIngestionMetersFactory;
protected final CircularBuffer<Throwable> savedParseExceptions;
- @JsonCreator
public SeekableStreamIndexTask(
- @JsonProperty("id") String id,
- @JsonProperty("resource") TaskResource taskResource,
- @JsonProperty("dataSchema") DataSchema dataSchema,
- @JsonProperty("tuningConfig") SeekableStreamIndexTaskTuningConfig tuningConfig,
- @JsonProperty("ioConfig") SeekableStreamIndexTaskIOConfig ioConfig,
- @JsonProperty("context") Map<String, Object> context,
- @JacksonInject ChatHandlerProvider chatHandlerProvider,
- @JacksonInject AuthorizerMapper authorizerMapper,
- @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
- String groupId
+ final String id,
+ @Nullable final TaskResource taskResource,
+ final DataSchema dataSchema,
+ final SeekableStreamIndexTaskTuningConfig tuningConfig,
+ final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig,
+ @Nullable final Map<String, Object> context,
+ @Nullable final ChatHandlerProvider chatHandlerProvider,
+ final AuthorizerMapper authorizerMapper,
+ final RowIngestionMetersFactory rowIngestionMetersFactory,
+ @Nullable final String groupId
)
{
super(
@@ -119,7 +114,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
this.runner = createTaskRunner();
}
-
private static String makeTaskId(String dataSource, String type)
{
final String suffix = RandomIdUtils.getRandomId();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
index dde9702..7e16c35 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.seekablestream;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -45,17 +44,15 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
private final Optional<DateTime> maximumMessageTime;
private final Set<PartitionIdType> exclusiveStartSequenceNumberPartitions;
- @JsonCreator
public SeekableStreamIndexTaskIOConfig(
- @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compabitility
- @JsonProperty("baseSequenceName") String baseSequenceName,
- @JsonProperty("startPartitions") SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> startPartitions,
- @JsonProperty("endPartitions") SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> endPartitions,
- @JsonProperty("useTransaction") Boolean useTransaction,
- @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
- @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
- @JsonProperty("exclusiveStartSequenceNumberPartitions")
- Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
+ final @Nullable Integer taskGroupId, // can be null for backward compatibility
+ final String baseSequenceName,
+ final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> startPartitions,
+ final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> endPartitions,
+ final Boolean useTransaction,
+ final DateTime minimumMessageTime,
+ final DateTime maximumMessageTime,
+ final Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
)
{
this.taskGroupId = taskGroupId;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 3ebfe59..b594e42 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.seekablestream;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
@@ -60,27 +59,26 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
private final int maxParseExceptions;
private final int maxSavedParseExceptions;
- @JsonCreator
public SeekableStreamIndexTaskTuningConfig(
- @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
- @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
- @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
- @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
- @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
- @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
- @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
- @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
+ @Nullable Integer maxRowsInMemory,
+ @Nullable Long maxBytesInMemory,
+ @Nullable Integer maxRowsPerSegment,
+ @Nullable Long maxTotalRows,
+ @Nullable Period intermediatePersistPeriod,
+ @Nullable File basePersistDirectory,
+ @Nullable Integer maxPendingPersists,
+ @Nullable IndexSpec indexSpec,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
- @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
- @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
- @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
- @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
- @JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean skipSequenceNumberAvailabilityCheck,
- @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
- @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
- @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
- @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
- @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
+ @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
+ @Deprecated @Nullable Boolean reportParseExceptions,
+ @Nullable Long handoffConditionTimeout,
+ @Nullable Boolean resetOffsetAutomatically,
+ Boolean skipSequenceNumberAvailabilityCheck,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ @Nullable Period intermediateHandoffPeriod,
+ @Nullable Boolean logParseExceptions,
+ @Nullable Integer maxParseExceptions,
+ @Nullable Integer maxSavedParseExceptions
)
{
// Cannot be a static because default basePersistDirectory is unique per-instance
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
index 8c034da..28f5dde 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
@@ -31,14 +31,13 @@ import java.util.Objects;
/**
* class that encapsulates a partitionIdToSequenceNumberMap of partitionId -> sequenceNumber.
* To be backward compatible with both Kafka and Kinesis datasource metadata when
- * deserializing json. Redundant constructor fields stream, topic and
+ * serializing and deserializing json, redundant constructor fields stream, topic,
* partitionSequenceNumberMap and partitionOffsetMap are created. Only one of topic, stream
* should have a non-null value and only one of partitionOffsetMap and partitionSequenceNumberMap
* should have a non-null value.
- * <p>
- * Redundant getters
- * are used for proper Jackson serialization/deserialization when processing terminologies
- * used by Kafka and kinesis (i.e. topic vs. stream)
+ *
+ * Redundant getters are used for proper Jackson serialization/deserialization when processing terminologies
+ * used by Kafka and Kinesis (i.e. topic vs. stream)
*
* @param <PartitionIdType> partition id type
* @param <SequenceOffsetType> sequence number type
@@ -81,7 +80,12 @@ public class SeekableStreamPartitions<PartitionIdType, SequenceOffsetType>
final Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap
)
{
- this(stream, null, partitionOffsetMap, null);
+ this(
+ Preconditions.checkNotNull(stream, "stream"),
+ null,
+ Preconditions.checkNotNull(partitionOffsetMap, "partitionOffsetMap"),
+ null
+ );
}
@JsonProperty
@@ -90,12 +94,32 @@ public class SeekableStreamPartitions<PartitionIdType, SequenceOffsetType>
return stream;
}
+ /**
+ * Identical to {@link #getStream()}. Here for backwards compatibility, so a serialized SeekableStreamPartitions can
+ * be read by older Druid versions as a KafkaPartitions object.
+ */
+ @JsonProperty
+ public String getTopic()
+ {
+ return stream;
+ }
+
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getPartitionSequenceNumberMap()
{
return partitionIdToSequenceNumberMap;
}
+ /**
+ * Identical to {@link #getPartitionSequenceNumberMap()}. Here for backwards compatibility, so a serialized
+ * SeekableStreamPartitions can be read by older Druid versions as a KafkaPartitions object.
+ */
+ @JsonProperty
+ public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()
+ {
+ return partitionIdToSequenceNumberMap;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitionsTest.java
new file mode 100644
index 0000000..da5c3ec
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitionsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class SeekableStreamPartitionsTest
+{
+ private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ final String stream = "theStream";
+ final Map<Integer, Long> offsetMap = ImmutableMap.of(1, 2L, 3, 4L);
+
+ final SeekableStreamPartitions<Integer, Long> partitions = new SeekableStreamPartitions<>(stream, offsetMap);
+ final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);
+
+ // Check round-trip.
+ final SeekableStreamPartitions<Integer, Long> partitions2 = OBJECT_MAPPER.readValue(
+ serializedString,
+ new TypeReference<SeekableStreamPartitions<Integer, Long>>() {}
+ );
+
+ Assert.assertEquals("Round trip", partitions, partitions2);
+
+ // Check backwards compatibility.
+ final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
+ serializedString,
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ );
+
+ Assert.assertEquals(stream, asMap.get("stream"));
+ Assert.assertEquals(stream, asMap.get("topic"));
+
+ // Jackson will deserialize the maps as string -> int maps, not int -> long.
+ Assert.assertEquals(
+ offsetMap,
+ OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<Integer, Long>>() {})
+ );
+ Assert.assertEquals(
+ offsetMap,
+ OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<Integer, Long>>() {})
+ );
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org