You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2018/07/20 18:20:27 UTC

[incubator-druid] branch master updated: Check the kafka topic when comparing checkpoints from tasks and the one stored in metastore (#6015)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b7d42ed  Check the kafka topic when comparing checkpoints from tasks and the one stored in metastore (#6015)
b7d42ed is described below

commit b7d42edb0fbd279cb428ce5ea5069035148e55cc
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Jul 20 11:20:23 2018 -0700

    Check the kafka topic when comparing checkpoints from tasks and the one stored in metastore (#6015)
---
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 57 ++++++++++++----------
 1 file changed, 32 insertions(+), 25 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index ed287fa..046331e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -514,9 +514,7 @@ public class KafkaSupervisor implements Supervisor
     Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
     Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
-        ioConfig.getTopic()
-                .equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions()
-                                                                     .getTopic()),
+        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()),
         "Supervisor topic [%s] and topic in checkpoint [%s] does not match",
         ioConfig.getTopic(),
         ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
@@ -661,6 +659,8 @@ public class KafkaSupervisor implements Supervisor
         int index = checkpoints.size();
         for (int sequenceId : checkpoints.descendingKeySet()) {
           Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
+          // We have already verified the topic of the current checkpoint is same with that in ioConfig.
+          // See checkpoint().
           if (checkpoint.equals(previousCheckpoint.getKafkaPartitions().getPartitionOffsetMap())) {
             break;
           }
@@ -1183,16 +1183,22 @@ public class KafkaSupervisor implements Supervisor
       Futures.allAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
     }
     catch (Exception e) {
-      Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
 
     final KafkaDataSourceMetadata latestDataSourceMetadata = (KafkaDataSourceMetadata) indexerMetadataStorageCoordinator
         .getDataSourceMetadata(dataSource);
-    final Map<Integer, Long> latestOffsetsFromDb = (latestDataSourceMetadata == null
-                                                    || latestDataSourceMetadata.getKafkaPartitions() == null) ? null
-                                                                                                              : latestDataSourceMetadata
-                                                       .getKafkaPartitions()
-                                                       .getPartitionOffsetMap();
+    final boolean hasValidOffsetsFromDb = latestDataSourceMetadata != null &&
+                                          latestDataSourceMetadata.getKafkaPartitions() != null &&
+                                          ioConfig.getTopic().equals(
+                                              latestDataSourceMetadata.getKafkaPartitions().getTopic()
+                                          );
+    final Map<Integer, Long> latestOffsetsFromDb;
+    if (hasValidOffsetsFromDb) {
+      latestOffsetsFromDb = latestDataSourceMetadata.getKafkaPartitions().getPartitionOffsetMap();
+    } else {
+      latestOffsetsFromDb = null;
+    }
 
     // order tasks of this taskGroup by the latest sequenceId
     taskSequences.sort((o1, o2) -> o2.rhs.firstKey().compareTo(o1.rhs.firstKey()));
@@ -1203,22 +1209,21 @@ public class KafkaSupervisor implements Supervisor
 
     while (taskIndex < taskSequences.size()) {
       if (earliestConsistentSequenceId.get() == -1) {
-        // find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata store
+        // find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata
+        // store
         if (taskSequences.get(taskIndex).rhs.entrySet().stream().anyMatch(
             sequenceCheckpoint -> sequenceCheckpoint.getValue().entrySet().stream().allMatch(
                 partitionOffset -> Longs.compare(
                     partitionOffset.getValue(),
-                    latestOffsetsFromDb == null
-                    ?
-                    partitionOffset.getValue()
-                    : latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
+                    latestOffsetsFromDb == null ?
+                    partitionOffset.getValue() :
+                    latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
                 ) == 0) && earliestConsistentSequenceId.compareAndSet(-1, sequenceCheckpoint.getKey())) || (
                 pendingCompletionTaskGroups.getOrDefault(groupId, EMPTY_LIST).size() > 0
                 && earliestConsistentSequenceId.compareAndSet(-1, taskSequences.get(taskIndex).rhs.firstKey()))) {
-          final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new TreeMap<>(taskSequences.get(taskIndex).rhs
-                                                                                             .tailMap(
-                                                                                                 earliestConsistentSequenceId
-                                                                                                     .get()));
+          final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new TreeMap<>(
+              taskSequences.get(taskIndex).rhs.tailMap(earliestConsistentSequenceId.get())
+          );
           log.info("Setting taskGroup sequences to [%s] for group [%d]", latestCheckpoints, groupId);
           taskGroup.sequenceOffsets.clear();
           taskGroup.sequenceOffsets.putAll(latestCheckpoints);
@@ -1262,7 +1267,8 @@ public class KafkaSupervisor implements Supervisor
     taskSequences.stream().filter(taskIdSequences -> tasksToKill.contains(taskIdSequences.lhs)).forEach(
         sequenceCheckpoint -> {
           log.warn(
-              "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest persisted offsets in metadata store [%s]",
+              "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest "
+              + "persisted offsets in metadata store [%s]",
               sequenceCheckpoint.lhs,
               sequenceCheckpoint.rhs,
               taskGroup.sequenceOffsets,
@@ -1270,7 +1276,8 @@ public class KafkaSupervisor implements Supervisor
           );
           killTask(sequenceCheckpoint.lhs);
           taskGroup.tasks.remove(sequenceCheckpoint.lhs);
-        });
+        }
+    );
   }
 
   private void addDiscoveredTaskToPendingCompletionTaskGroups(
@@ -1849,7 +1856,7 @@ public class KafkaSupervisor implements Supervisor
   private long getOffsetFromStorageForPartition(int partition)
   {
     long offset;
-    Map<Integer, Long> metadataOffsets = getOffsetsFromMetadataStorage();
+    final Map<Integer, Long> metadataOffsets = getOffsetsFromMetadataStorage();
     if (metadataOffsets.get(partition) != null) {
       offset = metadataOffsets.get(partition);
       log.debug("Getting offset [%,d] from metadata storage for partition [%d]", offset, partition);
@@ -1877,8 +1884,8 @@ public class KafkaSupervisor implements Supervisor
 
   private Map<Integer, Long> getOffsetsFromMetadataStorage()
   {
-    DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
-    if (dataSourceMetadata != null && dataSourceMetadata instanceof KafkaDataSourceMetadata) {
+    final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
+    if (dataSourceMetadata instanceof KafkaDataSourceMetadata) {
       KafkaPartitions partitions = ((KafkaDataSourceMetadata) dataSourceMetadata).getKafkaPartitions();
       if (partitions != null) {
         if (!ioConfig.getTopic().equals(partitions.getTopic())) {
@@ -1887,14 +1894,14 @@ public class KafkaSupervisor implements Supervisor
               partitions.getTopic(),
               ioConfig.getTopic()
           );
-          return ImmutableMap.of();
+          return Collections.emptyMap();
         } else if (partitions.getPartitionOffsetMap() != null) {
           return partitions.getPartitionOffsetMap();
         }
       }
     }
 
-    return ImmutableMap.of();
+    return Collections.emptyMap();
   }
 
   private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOffset)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org