You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2018/07/20 18:20:27 UTC
[incubator-druid] branch master updated: Check the kafka topic when
comparing checkpoints from tasks and the one stored in metastore (#6015)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new b7d42ed Check the kafka topic when comparing checkpoints from tasks and the one stored in metastore (#6015)
b7d42ed is described below
commit b7d42edb0fbd279cb428ce5ea5069035148e55cc
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Jul 20 11:20:23 2018 -0700
Check the kafka topic when comparing checkpoints from tasks and the one stored in metastore (#6015)
---
.../indexing/kafka/supervisor/KafkaSupervisor.java | 57 ++++++++++++----------
1 file changed, 32 insertions(+), 25 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index ed287fa..046331e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -514,9 +514,7 @@ public class KafkaSupervisor implements Supervisor
Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
Preconditions.checkArgument(
- ioConfig.getTopic()
- .equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions()
- .getTopic()),
+ ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()),
"Supervisor topic [%s] and topic in checkpoint [%s] does not match",
ioConfig.getTopic(),
((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
@@ -661,6 +659,8 @@ public class KafkaSupervisor implements Supervisor
int index = checkpoints.size();
for (int sequenceId : checkpoints.descendingKeySet()) {
Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
+ // We have already verified the topic of the current checkpoint is same with that in ioConfig.
+ // See checkpoint().
if (checkpoint.equals(previousCheckpoint.getKafkaPartitions().getPartitionOffsetMap())) {
break;
}
@@ -1183,16 +1183,22 @@ public class KafkaSupervisor implements Supervisor
Futures.allAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
catch (Exception e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
final KafkaDataSourceMetadata latestDataSourceMetadata = (KafkaDataSourceMetadata) indexerMetadataStorageCoordinator
.getDataSourceMetadata(dataSource);
- final Map<Integer, Long> latestOffsetsFromDb = (latestDataSourceMetadata == null
- || latestDataSourceMetadata.getKafkaPartitions() == null) ? null
- : latestDataSourceMetadata
- .getKafkaPartitions()
- .getPartitionOffsetMap();
+ final boolean hasValidOffsetsFromDb = latestDataSourceMetadata != null &&
+ latestDataSourceMetadata.getKafkaPartitions() != null &&
+ ioConfig.getTopic().equals(
+ latestDataSourceMetadata.getKafkaPartitions().getTopic()
+ );
+ final Map<Integer, Long> latestOffsetsFromDb;
+ if (hasValidOffsetsFromDb) {
+ latestOffsetsFromDb = latestDataSourceMetadata.getKafkaPartitions().getPartitionOffsetMap();
+ } else {
+ latestOffsetsFromDb = null;
+ }
// order tasks of this taskGroup by the latest sequenceId
taskSequences.sort((o1, o2) -> o2.rhs.firstKey().compareTo(o1.rhs.firstKey()));
@@ -1203,22 +1209,21 @@ public class KafkaSupervisor implements Supervisor
while (taskIndex < taskSequences.size()) {
if (earliestConsistentSequenceId.get() == -1) {
- // find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata store
+ // find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata
+ // store
if (taskSequences.get(taskIndex).rhs.entrySet().stream().anyMatch(
sequenceCheckpoint -> sequenceCheckpoint.getValue().entrySet().stream().allMatch(
partitionOffset -> Longs.compare(
partitionOffset.getValue(),
- latestOffsetsFromDb == null
- ?
- partitionOffset.getValue()
- : latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
+ latestOffsetsFromDb == null ?
+ partitionOffset.getValue() :
+ latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
) == 0) && earliestConsistentSequenceId.compareAndSet(-1, sequenceCheckpoint.getKey())) || (
pendingCompletionTaskGroups.getOrDefault(groupId, EMPTY_LIST).size() > 0
&& earliestConsistentSequenceId.compareAndSet(-1, taskSequences.get(taskIndex).rhs.firstKey()))) {
- final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new TreeMap<>(taskSequences.get(taskIndex).rhs
- .tailMap(
- earliestConsistentSequenceId
- .get()));
+ final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new TreeMap<>(
+ taskSequences.get(taskIndex).rhs.tailMap(earliestConsistentSequenceId.get())
+ );
log.info("Setting taskGroup sequences to [%s] for group [%d]", latestCheckpoints, groupId);
taskGroup.sequenceOffsets.clear();
taskGroup.sequenceOffsets.putAll(latestCheckpoints);
@@ -1262,7 +1267,8 @@ public class KafkaSupervisor implements Supervisor
taskSequences.stream().filter(taskIdSequences -> tasksToKill.contains(taskIdSequences.lhs)).forEach(
sequenceCheckpoint -> {
log.warn(
- "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest persisted offsets in metadata store [%s]",
+ "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest "
+ + "persisted offsets in metadata store [%s]",
sequenceCheckpoint.lhs,
sequenceCheckpoint.rhs,
taskGroup.sequenceOffsets,
@@ -1270,7 +1276,8 @@ public class KafkaSupervisor implements Supervisor
);
killTask(sequenceCheckpoint.lhs);
taskGroup.tasks.remove(sequenceCheckpoint.lhs);
- });
+ }
+ );
}
private void addDiscoveredTaskToPendingCompletionTaskGroups(
@@ -1849,7 +1856,7 @@ public class KafkaSupervisor implements Supervisor
private long getOffsetFromStorageForPartition(int partition)
{
long offset;
- Map<Integer, Long> metadataOffsets = getOffsetsFromMetadataStorage();
+ final Map<Integer, Long> metadataOffsets = getOffsetsFromMetadataStorage();
if (metadataOffsets.get(partition) != null) {
offset = metadataOffsets.get(partition);
log.debug("Getting offset [%,d] from metadata storage for partition [%d]", offset, partition);
@@ -1877,8 +1884,8 @@ public class KafkaSupervisor implements Supervisor
private Map<Integer, Long> getOffsetsFromMetadataStorage()
{
- DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
- if (dataSourceMetadata != null && dataSourceMetadata instanceof KafkaDataSourceMetadata) {
+ final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
+ if (dataSourceMetadata instanceof KafkaDataSourceMetadata) {
KafkaPartitions partitions = ((KafkaDataSourceMetadata) dataSourceMetadata).getKafkaPartitions();
if (partitions != null) {
if (!ioConfig.getTopic().equals(partitions.getTopic())) {
@@ -1887,14 +1894,14 @@ public class KafkaSupervisor implements Supervisor
partitions.getTopic(),
ioConfig.getTopic()
);
- return ImmutableMap.of();
+ return Collections.emptyMap();
} else if (partitions.getPartitionOffsetMap() != null) {
return partitions.getPartitionOffsetMap();
}
}
}
- return ImmutableMap.of();
+ return Collections.emptyMap();
}
private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOffset)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org