You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/03/02 06:27:15 UTC

[incubator-druid] branch master updated: Fix two SeekableStream serde issues. (#7176)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fa218f5  Fix two SeekableStream serde issues. (#7176)
fa218f5 is described below

commit fa218f516026e5516229205836fc6a0d51001c0b
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Fri Mar 1 22:27:08 2019 -0800

    Fix two SeekableStream serde issues. (#7176)
    
    * Fix two SeekableStream serde issues.
    
    1) Fix backwards-compatibility serde for SeekableStreamPartitions. It is needed
       for split 0.13 / 0.14 clusters to work properly during a rolling update.
    2) Abstract classes don't need JsonCreator constructors; remove them.
    
    * Comment fixes.
---
 .../druid/indexing/kafka/KafkaIndexTaskModule.java |  3 +
 .../indexing/kafka/KafkaIndexTaskTuningConfig.java |  2 -
 .../SeekableStreamDataSourceMetadata.java          |  5 +-
 .../seekablestream/SeekableStreamIndexTask.java    | 32 ++++------
 .../SeekableStreamIndexTaskIOConfig.java           | 19 +++---
 .../SeekableStreamIndexTaskTuningConfig.java       | 38 ++++++------
 .../seekablestream/SeekableStreamPartitions.java   | 36 +++++++++--
 .../SeekableStreamPartitionsTest.java              | 72 ++++++++++++++++++++++
 8 files changed, 145 insertions(+), 62 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
index 47b5df8..34bc3bc 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
@@ -44,6 +44,9 @@ public class KafkaIndexTaskModule implements DruidModule
                 new NamedType(KafkaIndexTask.class, "index_kafka"),
                 new NamedType(KafkaDataSourceMetadata.class, "kafka"),
                 new NamedType(KafkaIndexTaskIOConfig.class, "kafka"),
+                // "KafkaTuningConfig" is not the ideal name, but is needed for backwards compatibility.
+                // (Older versions of Druid didn't specify a type name and got this one by default.)
+                new NamedType(KafkaIndexTaskTuningConfig.class, "KafkaTuningConfig"),
                 new NamedType(KafkaSupervisorTuningConfig.class, "kafka"),
                 new NamedType(KafkaSupervisorSpec.class, "kafka")
             )
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
index 7cee877..2104759 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -30,7 +29,6 @@ import org.joda.time.Period;
 import javax.annotation.Nullable;
 import java.io.File;
 
-@JsonTypeName("KafkaTuningConfig")
 public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig
 {
   @JsonCreator
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java
index 2501048..b9e8d9a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.seekablestream;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.java.util.common.IAE;
@@ -33,9 +32,8 @@ public abstract class SeekableStreamDataSourceMetadata<PartitionIdType, Sequence
 {
   private final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> seekableStreamPartitions;
 
-  @JsonCreator
   public SeekableStreamDataSourceMetadata(
-      @JsonProperty("partitions") SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> seekableStreamPartitions
+      SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> seekableStreamPartitions
   )
   {
     this.seekableStreamPartitions = seekableStreamPartitions;
@@ -63,7 +61,6 @@ public abstract class SeekableStreamDataSourceMetadata<PartitionIdType, Sequence
     return plus(other).equals(other.plus(this));
   }
 
-
   @Override
   public DataSourceMetadata plus(DataSourceMetadata other)
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index bfabab1..d4659bf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -19,8 +19,6 @@
 
 package org.apache.druid.indexing.seekablestream;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -58,17 +56,15 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.utils.CircularBuffer;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
 
 
-public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType extends Comparable> extends AbstractTask
-    implements ChatHandler
+public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType extends Comparable>
+    extends AbstractTask implements ChatHandler
 {
   public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
-  private static final Random RANDOM = ThreadLocalRandom.current();
   private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);
 
   private final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
@@ -82,18 +78,17 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   protected final RowIngestionMetersFactory rowIngestionMetersFactory;
   protected final CircularBuffer<Throwable> savedParseExceptions;
 
-  @JsonCreator
   public SeekableStreamIndexTask(
-      @JsonProperty("id") String id,
-      @JsonProperty("resource") TaskResource taskResource,
-      @JsonProperty("dataSchema") DataSchema dataSchema,
-      @JsonProperty("tuningConfig") SeekableStreamIndexTaskTuningConfig tuningConfig,
-      @JsonProperty("ioConfig") SeekableStreamIndexTaskIOConfig ioConfig,
-      @JsonProperty("context") Map<String, Object> context,
-      @JacksonInject ChatHandlerProvider chatHandlerProvider,
-      @JacksonInject AuthorizerMapper authorizerMapper,
-      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
-      String groupId
+      final String id,
+      @Nullable final TaskResource taskResource,
+      final DataSchema dataSchema,
+      final SeekableStreamIndexTaskTuningConfig tuningConfig,
+      final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig,
+      @Nullable final Map<String, Object> context,
+      @Nullable final ChatHandlerProvider chatHandlerProvider,
+      final AuthorizerMapper authorizerMapper,
+      final RowIngestionMetersFactory rowIngestionMetersFactory,
+      @Nullable final String groupId
   )
   {
     super(
@@ -119,7 +114,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
     this.runner = createTaskRunner();
   }
 
-
   private static String makeTaskId(String dataSource, String type)
   {
     final String suffix = RandomIdUtils.getRandomId();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
index dde9702..7e16c35 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.seekablestream;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -45,17 +44,15 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
   private final Optional<DateTime> maximumMessageTime;
   private final Set<PartitionIdType> exclusiveStartSequenceNumberPartitions;
 
-  @JsonCreator
   public SeekableStreamIndexTaskIOConfig(
-      @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compabitility
-      @JsonProperty("baseSequenceName") String baseSequenceName,
-      @JsonProperty("startPartitions") SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> startPartitions,
-      @JsonProperty("endPartitions") SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> endPartitions,
-      @JsonProperty("useTransaction") Boolean useTransaction,
-      @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
-      @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
-      @JsonProperty("exclusiveStartSequenceNumberPartitions")
-          Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
+      final @Nullable Integer taskGroupId, // can be null for backward compatibility
+      final String baseSequenceName,
+      final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> startPartitions,
+      final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> endPartitions,
+      final Boolean useTransaction,
+      final DateTime minimumMessageTime,
+      final DateTime maximumMessageTime,
+      final Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
   )
   {
     this.taskGroupId = taskGroupId;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 3ebfe59..b594e42 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.seekablestream;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
@@ -60,27 +59,26 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
   private final int maxParseExceptions;
   private final int maxSavedParseExceptions;
 
-  @JsonCreator
   public SeekableStreamIndexTaskTuningConfig(
-      @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
-      @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
-      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
-      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
-      @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
-      @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
-      @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
+      @Nullable Integer maxRowsInMemory,
+      @Nullable Long maxBytesInMemory,
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Long maxTotalRows,
+      @Nullable Period intermediatePersistPeriod,
+      @Nullable File basePersistDirectory,
+      @Nullable Integer maxPendingPersists,
+      @Nullable IndexSpec indexSpec,
       // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
-      @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
-      @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
-      @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
-      @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
-      @JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean skipSequenceNumberAvailabilityCheck,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
-      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
-      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
-      @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
+      @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
+      @Deprecated @Nullable Boolean reportParseExceptions,
+      @Nullable Long handoffConditionTimeout,
+      @Nullable Boolean resetOffsetAutomatically,
+      Boolean skipSequenceNumberAvailabilityCheck,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @Nullable Period intermediateHandoffPeriod,
+      @Nullable Boolean logParseExceptions,
+      @Nullable Integer maxParseExceptions,
+      @Nullable Integer maxSavedParseExceptions
   )
   {
     // Cannot be a static because default basePersistDirectory is unique per-instance
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
index 8c034da..28f5dde 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
@@ -31,14 +31,13 @@ import java.util.Objects;
 /**
  * class that encapsulates a partitionIdToSequenceNumberMap of partitionId -> sequenceNumber.
  * To be backward compatible with both Kafka and Kinesis datasource metadata when
- * deserializing json. Redundant constructor fields stream, topic and
+ * serializing and deserializing json, redundant constructor fields stream, topic,
  * partitionSequenceNumberMap and partitionOffsetMap are created. Only one of topic, stream
  * should have a non-null value and only one of partitionOffsetMap and partitionSequenceNumberMap
  * should have a non-null value.
- * <p>
- * Redundant getters
- * are used for proper Jackson serialization/deserialization when processing terminologies
- * used by Kafka and kinesis (i.e. topic vs. stream)
+ *
+ * Redundant getters are used for proper Jackson serialization/deserialization when processing terminologies
+ * used by Kafka and Kinesis (i.e. topic vs. stream)
  *
  * @param <PartitionIdType>    partition id type
  * @param <SequenceOffsetType> sequence number type
@@ -81,7 +80,12 @@ public class SeekableStreamPartitions<PartitionIdType, SequenceOffsetType>
       final Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap
   )
   {
-    this(stream, null, partitionOffsetMap, null);
+    this(
+        Preconditions.checkNotNull(stream, "stream"),
+        null,
+        Preconditions.checkNotNull(partitionOffsetMap, "partitionOffsetMap"),
+        null
+    );
   }
 
   @JsonProperty
@@ -90,12 +94,32 @@ public class SeekableStreamPartitions<PartitionIdType, SequenceOffsetType>
     return stream;
   }
 
+  /**
+   * Identical to {@link #getStream()}. Here for backwards compatibility, so a serialized SeekableStreamPartitions can
+   * be read by older Druid versions as a KafkaPartitions object.
+   */
+  @JsonProperty
+  public String getTopic()
+  {
+    return stream;
+  }
+
   @JsonProperty
   public Map<PartitionIdType, SequenceOffsetType> getPartitionSequenceNumberMap()
   {
     return partitionIdToSequenceNumberMap;
   }
 
+  /**
+   * Identical to {@link #getPartitionSequenceNumberMap()}. Here for backwards compatibility, so a serialized
+   * SeekableStreamPartitions can be read by older Druid versions as a KafkaPartitions object.
+   */
+  @JsonProperty
+  public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()
+  {
+    return partitionIdToSequenceNumberMap;
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitionsTest.java
new file mode 100644
index 0000000..da5c3ec
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitionsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class SeekableStreamPartitionsTest
+{
+  private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    final String stream = "theStream";
+    final Map<Integer, Long> offsetMap = ImmutableMap.of(1, 2L, 3, 4L);
+
+    final SeekableStreamPartitions<Integer, Long> partitions = new SeekableStreamPartitions<>(stream, offsetMap);
+    final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);
+
+    // Check round-trip.
+    final SeekableStreamPartitions<Integer, Long> partitions2 = OBJECT_MAPPER.readValue(
+        serializedString,
+        new TypeReference<SeekableStreamPartitions<Integer, Long>>() {}
+    );
+
+    Assert.assertEquals("Round trip", partitions, partitions2);
+
+    // Check backwards compatibility.
+    final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
+        serializedString,
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    Assert.assertEquals(stream, asMap.get("stream"));
+    Assert.assertEquals(stream, asMap.get("topic"));
+
+    // Jackson will deserialize the maps as string -> int maps, not int -> long.
+    Assert.assertEquals(
+        offsetMap,
+        OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<Integer, Long>>() {})
+    );
+    Assert.assertEquals(
+        offsetMap,
+        OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<Integer, Long>>() {})
+    );
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org