You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by ji...@apache.org on 2018/07/09 19:17:09 UTC
[incubator-druid] branch 0.12.2 updated: Fix
ConcurrentModificationException in
IncrementalPublishingKafkaIndexTaskRunner (#5907) (#5973)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push:
new 9007591 Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner (#5907) (#5973)
9007591 is described below
commit 90075913f72add0a478c3058ef1fc0d67fcdd305
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Jul 9 12:17:05 2018 -0700
Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner (#5907) (#5973)
Ported without cherry-pick, since the original commit depends on the patch that
splits KafkaIndexTask.
---
.../io/druid/indexing/kafka/KafkaIndexTask.java | 152 ++++++++++++++-------
1 file changed, 103 insertions(+), 49 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index d7bffed..62aca36 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -44,7 +44,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
@@ -72,6 +71,7 @@ import io.druid.java.util.common.collect.Utils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.parsers.ParseException;
+import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.DruidMetrics;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
@@ -85,9 +85,9 @@ import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -2059,13 +2059,19 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private static class SequenceMetadata
{
+ /**
+ * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
+ * {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
private final int sequenceId;
private final String sequenceName;
private final Map<Integer, Long> startOffsets;
private final Map<Integer, Long> endOffsets;
private final Set<Integer> assignments;
private final boolean sentinel;
- private volatile boolean checkpointed;
+ private boolean checkpointed;
@JsonCreator
public SequenceMetadata(
@@ -2082,8 +2088,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
this.sequenceId = sequenceId;
this.sequenceName = sequenceName;
this.startOffsets = ImmutableMap.copyOf(startOffsets);
- this.endOffsets = Maps.newHashMap(endOffsets);
- this.assignments = Sets.newHashSet(startOffsets.keySet());
+ this.endOffsets = new HashMap<>(endOffsets);
+ this.assignments = new HashSet<>(startOffsets.keySet());
this.checkpointed = checkpointed;
this.sentinel = false;
}
@@ -2097,7 +2103,13 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@JsonProperty
public boolean isCheckpointed()
{
- return checkpointed;
+ lock.lock();
+ try {
+ return checkpointed;
+ }
+ finally {
+ lock.unlock();
+ }
}
@JsonProperty
@@ -2115,7 +2127,13 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@JsonProperty
public Map<Integer, Long> getEndOffsets()
{
- return endOffsets;
+ lock.lock();
+ try {
+ return endOffsets;
+ }
+ finally {
+ lock.unlock();
+ }
}
@JsonProperty
@@ -2126,19 +2144,31 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
public void setEndOffsets(Map<Integer, Long> newEndOffsets)
{
- endOffsets.putAll(newEndOffsets);
- checkpointed = true;
+ lock.lock();
+ try {
+ endOffsets.putAll(newEndOffsets);
+ checkpointed = true;
+ }
+ finally {
+ lock.unlock();
+ }
}
public void updateAssignments(Map<Integer, Long> nextPartitionOffset)
{
- assignments.clear();
- nextPartitionOffset.entrySet().forEach(partitionOffset -> {
- if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
- > 0) {
- assignments.add(partitionOffset.getKey());
- }
- });
+ lock.lock();
+ try {
+ assignments.clear();
+ nextPartitionOffset.entrySet().forEach(partitionOffset -> {
+ if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
+ > 0) {
+ assignments.add(partitionOffset.getKey());
+ }
+ });
+ }
+ finally {
+ lock.unlock();
+ }
}
public boolean isOpen()
@@ -2148,10 +2178,16 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
boolean canHandle(ConsumerRecord<byte[], byte[]> record)
{
- return isOpen()
- && endOffsets.get(record.partition()) != null
- && record.offset() >= startOffsets.get(record.partition())
- && record.offset() < endOffsets.get(record.partition());
+ lock.lock();
+ try {
+ return isOpen()
+ && endOffsets.get(record.partition()) != null
+ && record.offset() >= startOffsets.get(record.partition())
+ && record.offset() < endOffsets.get(record.partition());
+ }
+ finally {
+ lock.unlock();
+ }
}
private SequenceMetadata()
@@ -2173,15 +2209,21 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@Override
public String toString()
{
- return "SequenceMetadata{" +
- "sequenceName='" + sequenceName + '\'' +
- ", sequenceId=" + sequenceId +
- ", startOffsets=" + startOffsets +
- ", endOffsets=" + endOffsets +
- ", assignments=" + assignments +
- ", sentinel=" + sentinel +
- ", checkpointed=" + checkpointed +
- '}';
+ lock.lock();
+ try {
+ return "SequenceMetadata{" +
+ "sequenceName='" + sequenceName + '\'' +
+ ", sequenceId=" + sequenceId +
+ ", startOffsets=" + startOffsets +
+ ", endOffsets=" + endOffsets +
+ ", assignments=" + assignments +
+ ", sentinel=" + sentinel +
+ ", checkpointed=" + checkpointed +
+ '}';
+ }
+ finally {
+ lock.unlock();
+ }
}
@@ -2194,28 +2236,40 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@Override
public Object getMetadata()
{
- Preconditions.checkState(
- assignments.isEmpty(),
- "This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer",
- endOffsets
- );
+ lock.lock();
- // merge endOffsets for this sequence with globally lastPersistedOffsets
- // This is done because this committer would be persisting only sub set of segments
- // corresponding to the current sequence. Generally, lastPersistedOffsets should already
- // cover endOffsets but just to be sure take max of offsets and persist that
- for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
- lastPersistedOffsets.put(partitionOffset.getKey(), Math.max(
- partitionOffset.getValue(),
- lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
- ));
- }
+ try {
+ Preconditions.checkState(
+ assignments.isEmpty(),
+ "This committer can be used only once all the records till offsets [%s] have been consumed, also make"
+ + " sure to call updateAssignments before using this committer",
+ endOffsets
+ );
- // Publish metadata can be different from persist metadata as we are going to publish only
- // subset of segments
- return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
- METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
- );
+ // merge endOffsets for this sequence with globally lastPersistedOffsets
+ // This is done because this committer would be persisting only a subset of segments
+ // corresponding to the current sequence. Generally, lastPersistedOffsets should already
+ // cover endOffsets but just to be sure take max of offsets and persist that
+ for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
+ lastPersistedOffsets.put(
+ partitionOffset.getKey(),
+ Math.max(
+ partitionOffset.getValue(),
+ lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
+ )
+ );
+ }
+
+ // Publish metadata can be different from persist metadata as we are going to publish only
+ // subset of segments
+ return ImmutableMap.of(
+ METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
+ METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
+ );
+ }
+ finally {
+ lock.unlock();
+ }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org