Posted to dev@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/20 18:20:25 UTC

[GitHub] fjy closed pull request #6015: Check the kafka topic when comparing checkpoints from tasks with the one stored in metastore

URL: https://github.com/apache/incubator-druid/pull/6015
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index ed287fa0591..046331e946f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -514,9 +514,7 @@ public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, D
     Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
     Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
-        ioConfig.getTopic()
-                .equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions()
-                                                                     .getTopic()),
+        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()),
         "Supervisor topic [%s] and topic in checkpoint [%s] does not match",
         ioConfig.getTopic(),
         ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
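
For readers skimming the diff: Guava's Preconditions.checkArgument throws an IllegalArgumentException carrying the interpolated message when its condition is false, which is how a topic mismatch surfaces to the supervisor here. A minimal, self-contained sketch of the pattern (the class and values are hypothetical, for illustration only):

    import com.google.common.base.Preconditions;

    // Hypothetical illustration class; not part of Druid.
    public class TopicCheckExample
    {
      static void validateTopic(String supervisorTopic, String checkpointTopic)
      {
        // Throws IllegalArgumentException with the interpolated %s message
        // when the two topics differ.
        Preconditions.checkArgument(
            supervisorTopic.equals(checkpointTopic),
            "Supervisor topic [%s] and topic in checkpoint [%s] do not match",
            supervisorTopic,
            checkpointTopic
        );
      }

      public static void main(String[] args)
      {
        validateTopic("metrics", "metrics"); // passes silently
        validateTopic("metrics", "events");  // throws IllegalArgumentException
      }
    }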
@@ -661,6 +659,8 @@ public void handle() throws ExecutionException, InterruptedException
         int index = checkpoints.size();
         for (int sequenceId : checkpoints.descendingKeySet()) {
           Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
+          // We have already verified the topic of the current checkpoint is same with that in ioConfig.
+          // See checkpoint().
           if (checkpoint.equals(previousCheckpoint.getKafkaPartitions().getPartitionOffsetMap())) {
             break;
           }
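
The loop above scans checkpoints from the highest sequenceId downward; TreeMap.descendingKeySet() is what provides that reverse order. A standalone JDK illustration (example values are made up):

    import java.util.TreeMap;

    // Hypothetical illustration class; not part of Druid.
    public class DescendingScanExample
    {
      public static void main(String[] args)
      {
        TreeMap<Integer, String> checkpoints = new TreeMap<>();
        checkpoints.put(1, "oldest");
        checkpoints.put(2, "middle");
        checkpoints.put(3, "newest");
        // descendingKeySet() iterates the highest sequenceId first: 3, 2, 1.
        for (int sequenceId : checkpoints.descendingKeySet()) {
          System.out.println(sequenceId + " -> " + checkpoints.get(sequenceId));
        }
      }
    }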
@@ -1183,16 +1183,22 @@ public void onFailure(Throwable t)
       Futures.allAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
     }
     catch (Exception e) {
-      Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
 
     final KafkaDataSourceMetadata latestDataSourceMetadata = (KafkaDataSourceMetadata) indexerMetadataStorageCoordinator
         .getDataSourceMetadata(dataSource);
-    final Map<Integer, Long> latestOffsetsFromDb = (latestDataSourceMetadata == null
-                                                    || latestDataSourceMetadata.getKafkaPartitions() == null) ? null
-                                                                                                              : latestDataSourceMetadata
-                                                       .getKafkaPartitions()
-                                                       .getPartitionOffsetMap();
+    final boolean hasValidOffsetsFromDb = latestDataSourceMetadata != null &&
+                                          latestDataSourceMetadata.getKafkaPartitions() != null &&
+                                          ioConfig.getTopic().equals(
+                                              latestDataSourceMetadata.getKafkaPartitions().getTopic()
+                                          );
+    final Map<Integer, Long> latestOffsetsFromDb;
+    if (hasValidOffsetsFromDb) {
+      latestOffsetsFromDb = latestDataSourceMetadata.getKafkaPartitions().getPartitionOffsetMap();
+    } else {
+      latestOffsetsFromDb = null;
+    }
 
     // order tasks of this taskGroup by the latest sequenceId
     taskSequences.sort((o1, o2) -> o2.rhs.firstKey().compareTo(o1.rhs.firstKey()));
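
Two things worth noting in the hunk above. First, Throwables.propagate(e) is deprecated in Guava; it wrapped checked exceptions in a RuntimeException, so throwing new RuntimeException(e) simply makes that wrapping explicit. Second, the hasValidOffsetsFromDb guard is the heart of this PR: offsets stored in the metadata store are only used when the stored topic matches the supervisor's topic. A minimal sketch of the guard, using hypothetical stand-in types rather than Druid's KafkaDataSourceMetadata:

    import java.util.Map;

    // Hypothetical stand-in for the partitions payload stored in metadata.
    class StoredPartitions
    {
      final String topic;
      final Map<Integer, Long> partitionOffsetMap;

      StoredPartitions(String topic, Map<Integer, Long> partitionOffsetMap)
      {
        this.topic = topic;
        this.partitionOffsetMap = partitionOffsetMap;
      }
    }

    class OffsetGuardExample
    {
      // Return the stored offsets only when they belong to the topic this
      // supervisor consumes; otherwise return null so stale offsets from a
      // previously configured topic are never compared against task checkpoints.
      static Map<Integer, Long> latestOffsetsFromDb(StoredPartitions stored, String supervisorTopic)
      {
        final boolean hasValidOffsetsFromDb = stored != null
                                              && stored.partitionOffsetMap != null
                                              && supervisorTopic.equals(stored.topic);
        return hasValidOffsetsFromDb ? stored.partitionOffsetMap : null;
      }
    }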
@@ -1203,22 +1209,21 @@ public void onFailure(Throwable t)
 
     while (taskIndex < taskSequences.size()) {
       if (earliestConsistentSequenceId.get() == -1) {
-        // find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata store
+        // find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata
+        // store
         if (taskSequences.get(taskIndex).rhs.entrySet().stream().anyMatch(
             sequenceCheckpoint -> sequenceCheckpoint.getValue().entrySet().stream().allMatch(
                 partitionOffset -> Longs.compare(
                     partitionOffset.getValue(),
-                    latestOffsetsFromDb == null
-                    ?
-                    partitionOffset.getValue()
-                    : latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
+                    latestOffsetsFromDb == null ?
+                    partitionOffset.getValue() :
+                    latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
                 ) == 0) && earliestConsistentSequenceId.compareAndSet(-1, sequenceCheckpoint.getKey())) || (
                 pendingCompletionTaskGroups.getOrDefault(groupId, EMPTY_LIST).size() > 0
                 && earliestConsistentSequenceId.compareAndSet(-1, taskSequences.get(taskIndex).rhs.firstKey()))) {
-          final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new TreeMap<>(taskSequences.get(taskIndex).rhs
-                                                                                             .tailMap(
-                                                                                                 earliestConsistentSequenceId
-                                                                                                     .get()));
+          final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new TreeMap<>(
+              taskSequences.get(taskIndex).rhs.tailMap(earliestConsistentSequenceId.get())
+          );
           log.info("Setting taskGroup sequences to [%s] for group [%d]", latestCheckpoints, groupId);
           taskGroup.sequenceOffsets.clear();
           taskGroup.sequenceOffsets.putAll(latestCheckpoints);
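
The stream expression above is dense, but the per-partition test it applies boils down to the following (a simplified sketch, not the actual Druid code): when latestOffsetsFromDb is null, i.e. no valid stored offsets exist, such as after the topic changed, each task offset is compared against itself and is therefore trivially consistent.

    import com.google.common.primitives.Longs;
    import java.util.Map;

    // Hypothetical illustration class; not part of Druid.
    class ConsistencyCheckExample
    {
      // True when the task's offset for this partition equals the offset in the
      // metadata store; trivially true when no valid stored offsets exist.
      static boolean isConsistent(int partition, long taskOffset, Map<Integer, Long> latestOffsetsFromDb)
      {
        final long reference = latestOffsetsFromDb == null
                               ? taskOffset
                               : latestOffsetsFromDb.getOrDefault(partition, taskOffset);
        return Longs.compare(taskOffset, reference) == 0;
      }
    }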
@@ -1262,7 +1267,8 @@ public void onFailure(Throwable t)
     taskSequences.stream().filter(taskIdSequences -> tasksToKill.contains(taskIdSequences.lhs)).forEach(
         sequenceCheckpoint -> {
           log.warn(
-              "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest persisted offsets in metadata store [%s]",
+              "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest "
+              + "persisted offsets in metadata store [%s]",
               sequenceCheckpoint.lhs,
               sequenceCheckpoint.rhs,
               taskGroup.sequenceOffsets,
@@ -1270,7 +1276,8 @@ public void onFailure(Throwable t)
           );
           killTask(sequenceCheckpoint.lhs);
           taskGroup.tasks.remove(sequenceCheckpoint.lhs);
-        });
+        }
+    );
   }
 
   private void addDiscoveredTaskToPendingCompletionTaskGroups(
@@ -1849,7 +1856,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
   private long getOffsetFromStorageForPartition(int partition)
   {
     long offset;
-    Map<Integer, Long> metadataOffsets = getOffsetsFromMetadataStorage();
+    final Map<Integer, Long> metadataOffsets = getOffsetsFromMetadataStorage();
     if (metadataOffsets.get(partition) != null) {
       offset = metadataOffsets.get(partition);
       log.debug("Getting offset [%,d] from metadata storage for partition [%d]", offset, partition);
@@ -1877,8 +1884,8 @@ private long getOffsetFromStorageForPartition(int partition)
 
   private Map<Integer, Long> getOffsetsFromMetadataStorage()
   {
-    DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
-    if (dataSourceMetadata != null && dataSourceMetadata instanceof KafkaDataSourceMetadata) {
+    final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
+    if (dataSourceMetadata instanceof KafkaDataSourceMetadata) {
       KafkaPartitions partitions = ((KafkaDataSourceMetadata) dataSourceMetadata).getKafkaPartitions();
       if (partitions != null) {
         if (!ioConfig.getTopic().equals(partitions.getTopic())) {
@@ -1887,14 +1894,14 @@ private long getOffsetFromStorageForPartition(int partition)
               partitions.getTopic(),
               ioConfig.getTopic()
           );
-          return ImmutableMap.of();
+          return Collections.emptyMap();
         } else if (partitions.getPartitionOffsetMap() != null) {
           return partitions.getPartitionOffsetMap();
         }
       }
     }
 
-    return ImmutableMap.of();
+    return Collections.emptyMap();
   }
 
   private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOffset)
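
Finally, two small cleanups in getOffsetsFromMetadataStorage(): the explicit null check before instanceof was redundant, since instanceof already evaluates to false for null, and ImmutableMap.of() became Collections.emptyMap(), which returns the JDK's shared immutable empty map and avoids a Guava dependency for this return value. A short demonstration of both points (the class name is hypothetical):

    import java.util.Collections;
    import java.util.Map;

    // Hypothetical illustration class; not part of Druid.
    public class NullInstanceofExample
    {
      public static void main(String[] args)
      {
        Object metadata = null;
        // instanceof is null-safe: prints "false" with no NullPointerException,
        // so "x != null && x instanceof Foo" is redundant.
        System.out.println(metadata instanceof CharSequence);

        // Collections.emptyMap() returns an immutable, shared empty map.
        Map<Integer, Long> empty = Collections.emptyMap();
        System.out.println(empty.isEmpty()); // prints "true"
      }
    }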


 
