You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by ji...@apache.org on 2018/07/09 19:17:09 UTC

[incubator-druid] branch 0.12.2 updated: Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner (#5907) (#5973)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.2 by this push:
     new 9007591  Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner (#5907) (#5973)
9007591 is described below

commit 90075913f72add0a478c3058ef1fc0d67fcdd305
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Jul 9 12:17:05 2018 -0700

    Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner (#5907) (#5973)
    
    Ported without cherry-pick, since the original commit depends on the patch that
    splits KafkaIndexTask.
---
 .../io/druid/indexing/kafka/KafkaIndexTask.java    | 152 ++++++++++++++-------
 1 file changed, 103 insertions(+), 49 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index d7bffed..62aca36 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -44,7 +44,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.InputRowParser;
@@ -72,6 +71,7 @@ import io.druid.java.util.common.collect.Utils;
 import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.parsers.ParseException;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.query.DruidMetrics;
 import io.druid.query.NoopQueryRunner;
 import io.druid.query.Query;
@@ -85,9 +85,9 @@ import io.druid.segment.realtime.RealtimeMetricsMonitor;
 import io.druid.segment.realtime.appenderator.Appenderator;
 import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
 import io.druid.segment.realtime.appenderator.Appenderators;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import io.druid.segment.realtime.firehose.ChatHandler;
 import io.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -2059,13 +2059,19 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
 
   private static class SequenceMetadata
   {
+    /**
+     * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
+     * {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
+     */
+    private final ReentrantLock lock = new ReentrantLock();
+
     private final int sequenceId;
     private final String sequenceName;
     private final Map<Integer, Long> startOffsets;
     private final Map<Integer, Long> endOffsets;
     private final Set<Integer> assignments;
     private final boolean sentinel;
-    private volatile boolean checkpointed;
+    private boolean checkpointed;
 
     @JsonCreator
     public SequenceMetadata(
@@ -2082,8 +2088,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       this.sequenceId = sequenceId;
       this.sequenceName = sequenceName;
       this.startOffsets = ImmutableMap.copyOf(startOffsets);
-      this.endOffsets = Maps.newHashMap(endOffsets);
-      this.assignments = Sets.newHashSet(startOffsets.keySet());
+      this.endOffsets = new HashMap<>(endOffsets);
+      this.assignments = new HashSet<>(startOffsets.keySet());
       this.checkpointed = checkpointed;
       this.sentinel = false;
     }
@@ -2097,7 +2103,13 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     @JsonProperty
     public boolean isCheckpointed()
     {
-      return checkpointed;
+      lock.lock();
+      try {
+        return checkpointed;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     @JsonProperty
@@ -2115,7 +2127,13 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     @JsonProperty
     public Map<Integer, Long> getEndOffsets()
     {
-      return endOffsets;
+      lock.lock();
+      try {
+        return endOffsets;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     @JsonProperty
@@ -2126,19 +2144,31 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
 
     public void setEndOffsets(Map<Integer, Long> newEndOffsets)
     {
-      endOffsets.putAll(newEndOffsets);
-      checkpointed = true;
+      lock.lock();
+      try {
+        endOffsets.putAll(newEndOffsets);
+        checkpointed = true;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     public void updateAssignments(Map<Integer, Long> nextPartitionOffset)
     {
-      assignments.clear();
-      nextPartitionOffset.entrySet().forEach(partitionOffset -> {
-        if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
-            > 0) {
-          assignments.add(partitionOffset.getKey());
-        }
-      });
+      lock.lock();
+      try {
+        assignments.clear();
+        nextPartitionOffset.entrySet().forEach(partitionOffset -> {
+          if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
+              > 0) {
+            assignments.add(partitionOffset.getKey());
+          }
+        });
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     public boolean isOpen()
@@ -2148,10 +2178,16 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
 
     boolean canHandle(ConsumerRecord<byte[], byte[]> record)
     {
-      return isOpen()
-             && endOffsets.get(record.partition()) != null
-             && record.offset() >= startOffsets.get(record.partition())
-             && record.offset() < endOffsets.get(record.partition());
+      lock.lock();
+      try {
+        return isOpen()
+               && endOffsets.get(record.partition()) != null
+               && record.offset() >= startOffsets.get(record.partition())
+               && record.offset() < endOffsets.get(record.partition());
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     private SequenceMetadata()
@@ -2173,15 +2209,21 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     @Override
     public String toString()
     {
-      return "SequenceMetadata{" +
-             "sequenceName='" + sequenceName + '\'' +
-             ", sequenceId=" + sequenceId +
-             ", startOffsets=" + startOffsets +
-             ", endOffsets=" + endOffsets +
-             ", assignments=" + assignments +
-             ", sentinel=" + sentinel +
-             ", checkpointed=" + checkpointed +
-             '}';
+      lock.lock();
+      try {
+        return "SequenceMetadata{" +
+               "sequenceName='" + sequenceName + '\'' +
+               ", sequenceId=" + sequenceId +
+               ", startOffsets=" + startOffsets +
+               ", endOffsets=" + endOffsets +
+               ", assignments=" + assignments +
+               ", sentinel=" + sentinel +
+               ", checkpointed=" + checkpointed +
+               '}';
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
 
@@ -2194,28 +2236,40 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
             @Override
             public Object getMetadata()
             {
-              Preconditions.checkState(
-                  assignments.isEmpty(),
-                  "This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer",
-                  endOffsets
-              );
+              lock.lock();
 
-              // merge endOffsets for this sequence with globally lastPersistedOffsets
-              // This is done because this committer would be persisting only sub set of segments
-              // corresponding to the current sequence. Generally, lastPersistedOffsets should already
-              // cover endOffsets but just to be sure take max of offsets and persist that
-              for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
-                lastPersistedOffsets.put(partitionOffset.getKey(), Math.max(
-                    partitionOffset.getValue(),
-                    lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
-                ));
-              }
+              try {
+                Preconditions.checkState(
+                    assignments.isEmpty(),
+                    "This committer can be used only once all the records till offsets [%s] have been consumed, also make"
+                    + " sure to call updateAssignments before using this committer",
+                    endOffsets
+                );
 
-              // Publish metadata can be different from persist metadata as we are going to publish only
-              // subset of segments
-              return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
-                                     METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
-              );
+                // merge endOffsets for this sequence with globally lastPersistedOffsets
+                // This is done because this committer would be persisting only sub set of segments
+                // corresponding to the current sequence. Generally, lastPersistedOffsets should already
+                // cover endOffsets but just to be sure take max of offsets and persist that
+                for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
+                  lastPersistedOffsets.put(
+                      partitionOffset.getKey(),
+                      Math.max(
+                          partitionOffset.getValue(),
+                          lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
+                      )
+                  );
+                }
+
+                // Publish metadata can be different from persist metadata as we are going to publish only
+                // subset of segments
+                return ImmutableMap.of(
+                    METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
+                    METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
+                );
+              }
+              finally {
+                lock.unlock();
+              }
             }
 
             @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org