You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by gi...@apache.org on 2018/07/09 18:22:19 UTC
[incubator-druid] branch 0.12.2 updated: Fix Kafka Indexing task
pause forever if no events in taskDuration (#5656) (#5899) (#5971)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push:
new 638f50c Fix Kafka Indexing task pause forever if no events in taskDuration (#5656) (#5899) (#5971)
638f50c is described below
commit 638f50cb52c248f4408975d5fc7762cc9ce82d8e
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Jul 9 11:22:16 2018 -0700
Fix Kafka Indexing task pause forever if no events in taskDuration (#5656) (#5899) (#5971)
* Fix Kafka Indexing task pause forever (#5656)
* Fix NullPointerException in the overlord if taskGroups does not contain the groupId
* If the endOffset is the same as the startOffset, still let the task resume instead of returning
endOffsets early, which causes the tasks to pause forever and ultimately fail on timeout.
* Address PR comment
* Remove the null check and do not return null from generateSequenceName
---
.../indexing/kafka/supervisor/KafkaSupervisor.java | 11 ++-
.../kafka/supervisor/KafkaSupervisorTest.java | 81 ++++++++++++++++++++++
2 files changed, 86 insertions(+), 6 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 5eb783c..f79b297 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -43,11 +43,8 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.emitter.service.ServiceMetricEvent;
-import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexer.TaskLocation;
+import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
@@ -74,6 +71,9 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.metadata.EntryExistsException;
import io.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.commons.codec.digest.DigestUtils;
@@ -1508,12 +1508,11 @@ public class KafkaSupervisor implements Supervisor
if (endOffsets.equals(taskGroup.sequenceOffsets.lastEntry().getValue())) {
log.warn(
- "Not adding checkpoint [%s] as its same as the start offsets [%s] of latest sequence for the task group [%d]",
+ "Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
endOffsets,
taskGroup.sequenceOffsets.lastEntry().getValue(),
groupId
);
- return endOffsets;
}
log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 3c2b198..c429265 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -1902,6 +1902,87 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void testNoDataIngestionTasks() throws Exception
+ {
+ final DateTime startTime = DateTimes.nowUtc();
+ supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+ //not adding any events
+ Task id1 = createKafkaIndexTask(
+ "id1",
+ DATASOURCE,
+ "sequenceName-0",
+ new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+ new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ Task id2 = createKafkaIndexTask(
+ "id2",
+ DATASOURCE,
+ "sequenceName-0",
+ new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+ new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ Task id3 = createKafkaIndexTask(
+ "id3",
+ DATASOURCE,
+ "sequenceName-0",
+ new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+ new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+ expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+ expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+ expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+ expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+ expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+ expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+ expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ null
+ )
+ ).anyTimes();
+ expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+ expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+ expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+ expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
+ expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
+ expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
+
+ TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+ checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+
+ taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+ replayAll();
+
+ supervisor.start();
+ supervisor.runInternal();
+ verifyAll();
+
+ reset(taskQueue, indexerMetadataStorageCoordinator);
+ expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
+ taskQueue.shutdown("id1");
+ taskQueue.shutdown("id2");
+ taskQueue.shutdown("id3");
+ replay(taskQueue, indexerMetadataStorageCoordinator);
+
+ supervisor.resetInternal(null);
+ verifyAll();
+ }
+
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org