Posted to dev@druid.apache.org by gi...@apache.org on 2018/07/09 18:22:19 UTC

[incubator-druid] branch 0.12.2 updated: Fix Kafka Indexing task pause forever if no events in taskDuration (#5656) (#5899) (#5971)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.2 by this push:
     new 638f50c  Fix Kafka Indexing task pause forever if no events in taskDuration (#5656) (#5899) (#5971)
638f50c is described below

commit 638f50cb52c248f4408975d5fc7762cc9ce82d8e
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Jul 9 11:22:16 2018 -0700

    Fix Kafka Indexing task pause forever if no events in taskDuration (#5656) (#5899) (#5971)
    
    * Fix Kafka Indexing task pause forever (#5656)
    
    * Fix NullPointerException in the overlord if taskGroups does not contain the groupId
    * If the endOffset is the same as the startOffset, still let the task resume instead of returning
       endOffsets early, which causes the tasks to pause forever and ultimately fail on timeout
    
    * Address PR comment
    
    * Remove the null check and do not return null from generateSequenceName
---
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 11 ++-
 .../kafka/supervisor/KafkaSupervisorTest.java      | 81 ++++++++++++++++++++++
 2 files changed, 86 insertions(+), 6 deletions(-)
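
The core of the change is easiest to see outside the diff: when the proposed
checkpoint equals the start offsets of the latest sequence, the supervisor used
to return endOffsets immediately, so the paused tasks were never told to resume
and eventually failed on timeout. Below is a minimal, self-contained Java
sketch of the before/after control flow; the offset maps and the checkpoint
method here are simplified stand-ins, not the actual Druid types.

    import java.util.Map;
    import java.util.TreeMap;

    public class CheckpointSketch
    {
      // Simplified stand-in for the supervisor's per-group sequence state.
      static final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();

      static Map<Integer, Long> checkpoint(int groupId, Map<Integer, Long> endOffsets)
      {
        if (endOffsets.equals(sequenceOffsets.lastEntry().getValue())) {
          System.out.printf(
              "Checkpoint %s is the same as the start offsets %s of latest sequence for task group %d%n",
              endOffsets, sequenceOffsets.lastEntry().getValue(), groupId
          );
          // Before the fix, a `return endOffsets;` here skipped the resume step
          // below, leaving the paused tasks stuck until they timed out.
        }
        // After the fix, execution always falls through to set the end offsets
        // and resume the tasks in the group.
        System.out.printf(
            "Setting endOffsets for tasks in taskGroup %d to %s and resuming%n",
            groupId, endOffsets
        );
        return endOffsets;
      }

      public static void main(String[] args)
      {
        sequenceOffsets.put(0, Map.of(0, 10L, 1, 20L, 2, 30L));
        // No events arrived during taskDuration, so the checkpoint equals the
        // last start offsets; the tasks should still be resumed.
        checkpoint(1, Map.of(0, 10L, 1, 20L, 2, 30L));
      }
    }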

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 5eb783c..f79b297 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -43,11 +43,8 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.emitter.service.ServiceMetricEvent;
-import io.druid.indexing.common.TaskInfoProvider;
 import io.druid.indexer.TaskLocation;
+import io.druid.indexing.common.TaskInfoProvider;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.common.task.TaskResource;
@@ -74,6 +71,9 @@ import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.emitter.service.ServiceMetricEvent;
 import io.druid.metadata.EntryExistsException;
 import io.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.commons.codec.digest.DigestUtils;
@@ -1508,12 +1508,11 @@ public class KafkaSupervisor implements Supervisor
 
               if (endOffsets.equals(taskGroup.sequenceOffsets.lastEntry().getValue())) {
                 log.warn(
-                    "Not adding checkpoint [%s] as its same as the start offsets [%s] of latest sequence for the task group [%d]",
+                    "Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
                     endOffsets,
                     taskGroup.sequenceOffsets.lastEntry().getValue(),
                     groupId
                 );
-                return endOffsets;
               }
 
               log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 3c2b198..c429265 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -1902,6 +1902,87 @@ public class KafkaSupervisorTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testNoDataIngestionTasks() throws Exception
+  {
+    final DateTime startTime = DateTimes.nowUtc();
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    //not adding any events
+    Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        "sequenceName-0",
+        new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        "sequenceName-0",
+        new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        "sequenceName-0",
+        new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
+
+    TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    reset(taskQueue, indexerMetadataStorageCoordinator);
+    expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
+    taskQueue.shutdown("id1");
+    taskQueue.shutdown("id2");
+    taskQueue.shutdown("id3");
+    replay(taskQueue, indexerMetadataStorageCoordinator);
+
+    supervisor.resetInternal(null);
+    verifyAll();
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {

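For anyone verifying the change locally, the new test can be run on its own
with Maven's standard Surefire single-test selector; the module path is taken
from the diff above, and Surefire 2.19+ is assumed for the Class#method syntax:

    mvn test -pl extensions-core/kafka-indexing-service -Dtest=KafkaSupervisorTest#testNoDataIngestionTasks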

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org