You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/04/19 20:19:52 UTC
[incubator-druid] branch master updated: Adds backwards-compatible
serde for SeekableStreamStartSequenceNumbers. (#7512)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1fb5ec3 Adds backwards-compatible serde for SeekableStreamStartSequenceNumbers. (#7512)
1fb5ec3 is described below
commit 1fb5ec39890cde4b882c1f8359c6ef8f4d52b98f
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Fri Apr 19 13:19:45 2019 -0700
Adds backwards-compatible serde for SeekableStreamStartSequenceNumbers. (#7512)
This allows them to be deserialized by older Druid versions as
KafkaPartitions objects.
Fixes #7470.
---
.../SeekableStreamEndSequenceNumbers.java | 4 +-
.../SeekableStreamStartSequenceNumbers.java | 44 +++++++++++--
.../SeekableStreamStartSequenceNumbersTest.java | 77 ++++++++++++++++++++++
3 files changed, 118 insertions(+), 7 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
index 3f87b27..7a4d3fa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
@@ -105,7 +105,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
/**
* Identical to {@link #getStream()}. Here for backwards compatibility, so a serialized
- * SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
+ * SeekableStreamEndSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
*/
@JsonProperty
public String getTopic()
@@ -182,7 +182,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
/**
* Identical to {@link #getPartitionSequenceNumberMap()} ()}. Here for backwards compatibility, so a serialized
- * SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
+ * SeekableStreamEndSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
*/
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
index f737292..9a25771 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
@@ -50,16 +50,21 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
@JsonCreator
public SeekableStreamStartSequenceNumbers(
@JsonProperty("stream") final String stream,
+ // kept for backward compatibility
+ @JsonProperty("topic") final String topic,
@JsonProperty("partitionSequenceNumberMap")
final Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap,
+ // kept for backward compatibility
+ @JsonProperty("partitionOffsetMap") final Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap,
@JsonProperty("exclusivePartitions") @Nullable final Set<PartitionIdType> exclusivePartitions
)
{
- this.stream = Preconditions.checkNotNull(stream, "stream");
- this.partitionSequenceNumberMap = Preconditions.checkNotNull(
- partitionSequenceNumberMap,
- "partitionIdToSequenceNumberMap"
- );
+ this.stream = stream == null ? topic : stream;
+ this.partitionSequenceNumberMap = partitionOffsetMap == null ? partitionSequenceNumberMap : partitionOffsetMap;
+
+ Preconditions.checkNotNull(this.stream, "stream");
+ Preconditions.checkNotNull(this.partitionSequenceNumberMap, "partitionIdToSequenceNumberMap");
+
// exclusiveOffset can be null if this class is deserialized from metadata store. Note that only end offsets are
// stored in metadata store.
// The default is true because there was only Kafka indexing service before in which the end offset is always
@@ -67,6 +72,15 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
this.exclusivePartitions = exclusivePartitions == null ? Collections.emptySet() : exclusivePartitions;
}
+ public SeekableStreamStartSequenceNumbers(
+ String stream,
+ Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap,
+ Set<PartitionIdType> exclusivePartitions
+ )
+ {
+ this(stream, null, partitionSequenceNumberMap, null, exclusivePartitions);
+ }
+
@Override
@JsonProperty
public String getStream()
@@ -74,6 +88,16 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
return stream;
}
+ /**
+ * Identical to {@link #getStream()}. Here for backwards compatibility, so a serialized
+ * SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
+ */
+ @JsonProperty
+ public String getTopic()
+ {
+ return stream;
+ }
+
@Override
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getPartitionSequenceNumberMap()
@@ -81,6 +105,16 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
return partitionSequenceNumberMap;
}
+ /**
+ * Identical to {@link #getPartitionSequenceNumberMap()} ()}. Here for backwards compatibility, so a serialized
+ * SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
+ */
+ @JsonProperty
+ public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()
+ {
+ return partitionSequenceNumberMap;
+ }
+
@Override
public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> plus(
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
new file mode 100644
index 0000000..f4342e2
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class SeekableStreamStartSequenceNumbersTest
+{
+ private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ final String stream = "theStream";
+ final Map<Integer, Long> offsetMap = ImmutableMap.of(1, 2L, 3, 4L);
+
+ final SeekableStreamStartSequenceNumbers<Integer, Long> partitions = new SeekableStreamStartSequenceNumbers<>(
+ stream,
+ offsetMap,
+ ImmutableSet.of(6)
+ );
+ final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);
+
+ // Check round-trip.
+ final SeekableStreamStartSequenceNumbers<Integer, Long> partitions2 = OBJECT_MAPPER.readValue(
+ serializedString,
+ new TypeReference<SeekableStreamStartSequenceNumbers<Integer, Long>>() {}
+ );
+
+ Assert.assertEquals("Round trip", partitions, partitions2);
+
+ // Check backwards compatibility.
+ final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
+ serializedString,
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ );
+
+ Assert.assertEquals(stream, asMap.get("stream"));
+ Assert.assertEquals(stream, asMap.get("topic"));
+
+ // Jackson will deserialize the maps as string -> int maps, not int -> long.
+ Assert.assertEquals(
+ offsetMap,
+ OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<Integer, Long>>() {})
+ );
+ Assert.assertEquals(
+ offsetMap,
+ OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<Integer, Long>>() {})
+ );
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org