You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/04/19 20:19:52 UTC

[incubator-druid] branch master updated: Adds backwards-compatible serde for SeekableStreamStartSequenceNumbers. (#7512)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fb5ec3  Adds backwards-compatible serde for SeekableStreamStartSequenceNumbers. (#7512)
1fb5ec3 is described below

commit 1fb5ec39890cde4b882c1f8359c6ef8f4d52b98f
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Fri Apr 19 13:19:45 2019 -0700

    Adds backwards-compatible serde for SeekableStreamStartSequenceNumbers. (#7512)
    
    This allows them to be deserialized by older Druid versions as
    KafkaPartitions objects.
    
    Fixes #7470.
---
 .../SeekableStreamEndSequenceNumbers.java          |  4 +-
 .../SeekableStreamStartSequenceNumbers.java        | 44 +++++++++++--
 .../SeekableStreamStartSequenceNumbersTest.java    | 77 ++++++++++++++++++++++
 3 files changed, 118 insertions(+), 7 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
index 3f87b27..7a4d3fa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
@@ -105,7 +105,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
 
   /**
    * Identical to {@link #getStream()}. Here for backwards compatibility, so a serialized
-   * SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
+   * SeekableStreamEndSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
    */
   @JsonProperty
   public String getTopic()
@@ -182,7 +182,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
 
   /**
    * Identical to {@link #getPartitionSequenceNumberMap()} ()}. Here for backwards compatibility, so a serialized
-   * SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
+   * SeekableStreamEndSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
    */
   @JsonProperty
   public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
index f737292..9a25771 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
@@ -50,16 +50,21 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
   @JsonCreator
   public SeekableStreamStartSequenceNumbers(
       @JsonProperty("stream") final String stream,
+      // kept for backward compatibility
+      @JsonProperty("topic") final String topic,
       @JsonProperty("partitionSequenceNumberMap")
       final Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap,
+      // kept for backward compatibility
+      @JsonProperty("partitionOffsetMap") final Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap,
       @JsonProperty("exclusivePartitions") @Nullable final Set<PartitionIdType> exclusivePartitions
   )
   {
-    this.stream = Preconditions.checkNotNull(stream, "stream");
-    this.partitionSequenceNumberMap = Preconditions.checkNotNull(
-        partitionSequenceNumberMap,
-        "partitionIdToSequenceNumberMap"
-    );
+    this.stream = stream == null ? topic : stream;
+    this.partitionSequenceNumberMap = partitionOffsetMap == null ? partitionSequenceNumberMap : partitionOffsetMap;
+
+    Preconditions.checkNotNull(this.stream, "stream");
+    Preconditions.checkNotNull(this.partitionSequenceNumberMap, "partitionIdToSequenceNumberMap");
+
     // exclusiveOffset can be null if this class is deserialized from metadata store. Note that only end offsets are
     // stored in metadata store.
     // The default is true because there was only Kafka indexing service before in which the end offset is always
@@ -67,6 +72,15 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
     this.exclusivePartitions = exclusivePartitions == null ? Collections.emptySet() : exclusivePartitions;
   }
 
+  public SeekableStreamStartSequenceNumbers(
+      String stream,
+      Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap,
+      Set<PartitionIdType> exclusivePartitions
+  )
+  {
+    this(stream, null, partitionSequenceNumberMap, null, exclusivePartitions);
+  }
+
   @Override
   @JsonProperty
   public String getStream()
@@ -74,6 +88,16 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
     return stream;
   }
 
+  /**
+   * Identical to {@link #getStream()}. Here for backwards compatibility, so a serialized
+   * SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
+   */
+  @JsonProperty
+  public String getTopic()
+  {
+    return stream;
+  }
+
   @Override
   @JsonProperty
   public Map<PartitionIdType, SequenceOffsetType> getPartitionSequenceNumberMap()
@@ -81,6 +105,16 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
     return partitionSequenceNumberMap;
   }
 
+  /**
+   * Identical to {@link #getPartitionSequenceNumberMap()} ()}. Here for backwards compatibility, so a serialized
+   * SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
+   */
+  @JsonProperty
+  public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()
+  {
+    return partitionSequenceNumberMap;
+  }
+
   @Override
   public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> plus(
       SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
new file mode 100644
index 0000000..f4342e2
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class SeekableStreamStartSequenceNumbersTest
+{
+  private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    final String stream = "theStream";
+    final Map<Integer, Long> offsetMap = ImmutableMap.of(1, 2L, 3, 4L);
+
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions = new SeekableStreamStartSequenceNumbers<>(
+        stream,
+        offsetMap,
+        ImmutableSet.of(6)
+    );
+    final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);
+
+    // Check round-trip.
+    final SeekableStreamStartSequenceNumbers<Integer, Long> partitions2 = OBJECT_MAPPER.readValue(
+        serializedString,
+        new TypeReference<SeekableStreamStartSequenceNumbers<Integer, Long>>() {}
+    );
+
+    Assert.assertEquals("Round trip", partitions, partitions2);
+
+    // Check backwards compatibility.
+    final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
+        serializedString,
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    Assert.assertEquals(stream, asMap.get("stream"));
+    Assert.assertEquals(stream, asMap.get("topic"));
+
+    // Jackson will deserialize the maps as string -> int maps, not int -> long.
+    Assert.assertEquals(
+        offsetMap,
+        OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<Integer, Long>>() {})
+    );
+    Assert.assertEquals(
+        offsetMap,
+        OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<Integer, Long>>() {})
+    );
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org