Posted to commits@druid.apache.org by fj...@apache.org on 2019/05/23 16:25:41 UTC
[incubator-druid] branch master updated: Remove LegacyKafkaIndexTaskRunner (#7735)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new eff2be4 Remove LegacyKafkaIndexTaskRunner (#7735)
eff2be4 is described below
commit eff2be4f8f9d7aad0f01516f5425f3ebccaa006c
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Thu May 23 09:25:35 2019 -0700
Remove LegacyKafkaIndexTaskRunner (#7735)
---
.../druid/indexing/kafka/KafkaIndexTask.java | 32 +-
.../indexing/kafka/LegacyKafkaIndexTaskRunner.java | 1239 --------------------
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 82 +-
.../indexing/kinesis/KinesisIndexTaskTest.java | 2 -
.../supervisor/SeekableStreamSupervisor.java | 2 -
5 files changed, 19 insertions(+), 1338 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 314f99c..b0b0c6b 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -28,7 +28,6 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
-import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -129,28 +128,15 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
@Override
protected SeekableStreamIndexTaskRunner<Integer, Long> createTaskRunner()
{
- if (context != null && context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) != null
- && ((boolean) context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED))) {
- //noinspection unchecked
- return new IncrementalPublishingKafkaIndexTaskRunner(
- this,
- dataSchema.getParser(),
- authorizerMapper,
- chatHandlerProvider,
- savedParseExceptions,
- rowIngestionMetersFactory
- );
- } else {
- //noinspection unchecked
- return new LegacyKafkaIndexTaskRunner(
- this,
- dataSchema.getParser(),
- authorizerMapper,
- chatHandlerProvider,
- savedParseExceptions,
- rowIngestionMetersFactory
- );
- }
+ //noinspection unchecked
+ return new IncrementalPublishingKafkaIndexTaskRunner(
+ this,
+ dataSchema.getParser(),
+ authorizerMapper,
+ chatHandlerProvider,
+ savedParseExceptions,
+ rowIngestionMetersFactory
+ );
}
@Override
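
For context on the hunk above: runner selection used to be gated on a boolean
task-context flag, with a double null check because both the context map and
the flag's entry could be absent. A minimal sketch of that pattern (the class
and method names below are illustrative, not Druid API):

import java.util.Map;

class ContextFlagSketch
{
  // Mirrors the deleted gate: return false unless the context exists and
  // explicitly carries a true flag.
  static boolean isIncrementalHandoffSupported(Map<String, Object> context)
  {
    final Object flag = context == null ? null : context.get("IS_INCREMENTAL_HANDOFF_SUPPORTED");
    return flag != null && (boolean) flag;
  }
}

With the legacy runner gone, the flag is no longer consulted and
IncrementalPublishingKafkaIndexTaskRunner is created unconditionally.
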
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
deleted file mode 100644
index 5b186c4..0000000
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ /dev/null
@@ -1,1239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.kafka;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.druid.data.input.Committer;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.discovery.DiscoveryDruidNode;
-import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeType;
-import org.apache.druid.indexer.IngestionState;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
-import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
-import org.apache.druid.indexing.common.TaskReport;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
-import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
-import org.apache.druid.indexing.common.task.IndexTaskUtils;
-import org.apache.druid.indexing.common.task.RealtimeIndexTask;
-import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
-import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
-import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
-import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
-import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
-import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
-import org.apache.druid.indexing.seekablestream.SequenceMetadata;
-import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
-import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
-import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
-import org.apache.druid.indexing.seekablestream.common.StreamPartition;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.collect.Utils;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.indexing.RealtimeIOConfig;
-import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.segment.realtime.FireDepartmentMetrics;
-import org.apache.druid.segment.realtime.appenderator.Appenderator;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
-import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
-import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
-import org.apache.druid.server.security.Access;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.utils.CircularBuffer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
-import org.apache.kafka.common.TopicPartition;
-import org.joda.time.DateTime;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Kafka index task runner which doesn't support incremental segment publishing. We keep this to support rolling updates.
- * This class will be removed in a future release.
- */
-@Deprecated
-public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<Integer, Long>
-{
- private static final EmittingLogger log = new EmittingLogger(LegacyKafkaIndexTaskRunner.class);
- private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
-
- private final ConcurrentMap<Integer, Long> endOffsets = new ConcurrentHashMap<>();
- private final ConcurrentMap<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
-
- // The pause lock and associated conditions are to support coordination between the Jetty threads and the main
- // ingestion loop. The goal is to provide callers of the API a guarantee that if pause() returns successfully
- // the ingestion loop has been stopped at the returned offsets and will not ingest any more data until resumed. The
- // fields are used as follows (every step requires acquiring [pauseLock]):
- // Pausing:
- // - In pause(), [pauseRequested] is set to true and then execution waits for [status] to change to PAUSED, with the
- // condition checked when [hasPaused] is signalled.
- // - In possiblyPause() called from the main loop, if [pauseRequested] is true, [status] is set to PAUSED,
- // [hasPaused] is signalled, and execution pauses until [pauseRequested] becomes false, either by being set or by
- // the [pauseMillis] timeout elapsing. [pauseRequested] is checked when [shouldResume] is signalled.
- // Resuming:
- // - In resume(), [pauseRequested] is set to false, [shouldResume] is signalled, and execution waits for [status] to
- // change to something other than PAUSED, with the condition checked when [shouldResume] is signalled.
- // - In possiblyPause(), when [shouldResume] is signalled, if [pauseRequested] has become false the pause loop ends,
- // [status] is changed to STARTING and [shouldResume] is signalled.
-
- private final Lock pauseLock = new ReentrantLock();
- private final Condition hasPaused = pauseLock.newCondition();
- private final Condition shouldResume = pauseLock.newCondition();
-
- private final AtomicBoolean stopRequested = new AtomicBoolean(false);
- private final AtomicBoolean publishOnStop = new AtomicBoolean(false);
-
- // [statusLock] is used to synchronize the Jetty thread calling stopGracefully() with the main run thread. It prevents
- // the main run thread from switching into a publishing state while the stopGracefully() thread thinks it's still in
- // a pre-publishing state. This is important because stopGracefully() will try to use the [stopRequested] flag to stop
- // the main thread where possible, but this flag is not honored once publishing has begun so in this case we must
- // interrupt the thread. The lock ensures that if the run thread is about to transition into publishing state, it
- // blocks until after stopGracefully() has set [stopRequested] and then does a final check on [stopRequested] before
- // transitioning to publishing state.
- private final Object statusLock = new Object();
-
- private final Lock pollRetryLock = new ReentrantLock();
- private final Condition isAwaitingRetry = pollRetryLock.newCondition();
-
- private final KafkaIndexTask task;
- private final KafkaIndexTaskIOConfig ioConfig;
- private final KafkaIndexTaskTuningConfig tuningConfig;
- private final InputRowParser<ByteBuffer> parser;
- private final AuthorizerMapper authorizerMapper;
- private final Optional<ChatHandlerProvider> chatHandlerProvider;
- private final CircularBuffer<Throwable> savedParseExceptions;
- private final RowIngestionMeters rowIngestionMeters;
-
- private volatile DateTime startTime;
- private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread)
- private volatile ObjectMapper objectMapper;
- private volatile Thread runThread;
- private volatile Appenderator appenderator;
- private volatile StreamAppenderatorDriver driver;
- private volatile FireDepartmentMetrics fireDepartmentMetrics;
- private volatile IngestionState ingestionState;
-
- private volatile boolean pauseRequested;
-
- LegacyKafkaIndexTaskRunner(
- KafkaIndexTask task,
- InputRowParser<ByteBuffer> parser,
- AuthorizerMapper authorizerMapper,
- Optional<ChatHandlerProvider> chatHandlerProvider,
- CircularBuffer<Throwable> savedParseExceptions,
- RowIngestionMetersFactory rowIngestionMetersFactory
- )
- {
- super(
- task,
- parser,
- authorizerMapper,
- chatHandlerProvider,
- savedParseExceptions,
- rowIngestionMetersFactory
- );
- this.task = task;
- this.ioConfig = task.getIOConfig();
- this.tuningConfig = task.getTuningConfig();
- this.parser = parser;
- this.authorizerMapper = authorizerMapper;
- this.chatHandlerProvider = chatHandlerProvider;
- this.savedParseExceptions = savedParseExceptions;
- this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters();
-
- this.endOffsets.putAll(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
- this.ingestionState = IngestionState.NOT_STARTED;
- }
-
- @Override
- public TaskStatus run(TaskToolbox toolbox)
- {
- try {
- return runInternal(toolbox);
- }
- catch (Exception e) {
- log.error(e, "Encountered exception while running task.");
- final String errorMsg = Throwables.getStackTraceAsString(e);
- toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(errorMsg));
- return TaskStatus.failure(
- task.getId(),
- errorMsg
- );
- }
- }
-
- @Override
- public Appenderator getAppenderator()
- {
- return appenderator;
- }
-
- @Override
- public RowIngestionMeters getRowIngestionMeters()
- {
- return rowIngestionMeters;
- }
-
- private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
- {
- log.info("Starting up!");
- startTime = DateTimes.nowUtc();
- status = Status.STARTING;
- objectMapper = toolbox.getObjectMapper();
-
- if (chatHandlerProvider.isPresent()) {
- log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
- chatHandlerProvider.get().register(task.getId(), this, false);
- } else {
- log.warn("No chat handler detected");
- }
-
- runThread = Thread.currentThread();
-
- // Set up FireDepartmentMetrics
- final FireDepartment fireDepartmentForMetrics = new FireDepartment(
- task.getDataSchema(),
- new RealtimeIOConfig(null, null, null),
- null
- );
- fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
- toolbox.getMonitorScheduler()
- .addMonitor(TaskRealtimeMetricsMonitorBuilder.build(task, fireDepartmentForMetrics, rowIngestionMeters));
-
- final String lookupTier = task.getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER);
- LookupNodeService lookupNodeService = lookupTier == null ?
- toolbox.getLookupNodeService() :
- new LookupNodeService(lookupTier);
- DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
- toolbox.getDruidNode(),
- NodeType.PEON,
- ImmutableMap.of(
- toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
- lookupNodeService.getName(), lookupNodeService
- )
- );
-
- ingestionState = IngestionState.BUILD_SEGMENTS;
-
- try (
- final Appenderator appenderator0 = task.newAppenderator(fireDepartmentMetrics, toolbox);
- final StreamAppenderatorDriver driver = task.newDriver(appenderator0, toolbox, fireDepartmentMetrics);
- final KafkaConsumer<byte[], byte[]> consumer = task.newConsumer()
- ) {
- toolbox.getDataSegmentServerAnnouncer().announce();
- toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
-
- appenderator = appenderator0;
-
- final String topic = ioConfig.getStartSequenceNumbers().getStream();
-
- // Start up, set up initial offsets.
- final Object restoredMetadata = driver.startJob();
- if (restoredMetadata == null) {
- nextOffsets.putAll(ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
- } else {
- final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
- final SeekableStreamEndSequenceNumbers<Integer, Long> restoredNextPartitions = toolbox.getObjectMapper().convertValue(
- restoredMetadataMap.get(METADATA_NEXT_PARTITIONS),
- toolbox.getObjectMapper().getTypeFactory().constructParametrizedType(
- SeekableStreamStartSequenceNumbers.class,
- SeekableStreamStartSequenceNumbers.class,
- Integer.class,
- Long.class
- )
- );
- nextOffsets.putAll(restoredNextPartitions.getPartitionSequenceNumberMap());
-
- // Sanity checks.
- if (!restoredNextPartitions.getStream().equals(ioConfig.getStartSequenceNumbers().getStream())) {
- throw new ISE(
- "WTF?! Restored topic[%s] but expected topic[%s]",
- restoredNextPartitions.getStream(),
- ioConfig.getStartSequenceNumbers().getStream()
- );
- }
-
- if (!nextOffsets.keySet().equals(ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet())) {
- throw new ISE(
- "WTF?! Restored partitions[%s] but expected partitions[%s]",
- nextOffsets.keySet(),
- ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet()
- );
- }
- }
-
- // Set up sequenceNames.
- final Map<Integer, String> sequenceNames = new HashMap<>();
- for (Integer partitionNum : nextOffsets.keySet()) {
- sequenceNames.put(partitionNum, StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), partitionNum));
- }
-
- // Set up committer.
- final Supplier<Committer> committerSupplier = new Supplier<Committer>()
- {
- @Override
- public Committer get()
- {
- final Map<Integer, Long> snapshot = ImmutableMap.copyOf(nextOffsets);
-
- return new Committer()
- {
- @Override
- public Object getMetadata()
- {
- return ImmutableMap.of(
- METADATA_NEXT_PARTITIONS,
- new SeekableStreamEndSequenceNumbers<>(
- ioConfig.getStartSequenceNumbers().getStream(),
- snapshot
- )
- );
- }
-
- @Override
- public void run()
- {
- // Do nothing.
- }
- };
- }
- };
-
- Set<Integer> assignment = assignPartitionsAndSeekToNext(consumer, topic);
-
- // Main loop.
- // Could eventually support leader/follower mode (for keeping replicas more in sync)
- boolean stillReading = !assignment.isEmpty();
- status = Status.READING;
- try {
- while (stillReading) {
- if (possiblyPause()) {
- // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
- // partitions upon resuming. This is safe even if the end offsets have not been modified.
- assignment = assignPartitionsAndSeekToNext(consumer, topic);
-
- if (assignment.isEmpty()) {
- log.info("All partitions have been fully read");
- publishOnStop.set(true);
- stopRequested.set(true);
- }
- }
-
- if (stopRequested.get()) {
- break;
- }
-
- // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to
- // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
- // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
- ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
- try {
- records = consumer.poll(task.getIOConfig().getPollTimeout());
- }
- catch (OffsetOutOfRangeException e) {
- log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
- possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
- stillReading = !assignment.isEmpty();
- }
-
- for (ConsumerRecord<byte[], byte[]> record : records) {
- if (log.isTraceEnabled()) {
- log.trace(
- "Got topic[%s] partition[%d] offset[%,d].",
- record.topic(),
- record.partition(),
- record.offset()
- );
- }
-
- if (record.offset() < endOffsets.get(record.partition())) {
-
- try {
- final byte[] valueBytes = record.value();
- final List<InputRow> rows = valueBytes == null
- ? Utils.nullableListOf((InputRow) null)
- : parser.parseBatch(ByteBuffer.wrap(valueBytes));
- boolean isPersistRequired = false;
- final Map<String, Set<SegmentIdWithShardSpec>> segmentsToMoveOut = new HashMap<>();
-
- for (InputRow row : rows) {
- if (row != null && task.withinMinMaxRecordTime(row)) {
- final String sequenceName = sequenceNames.get(record.partition());
- final AppenderatorDriverAddResult addResult = driver.add(
- row,
- sequenceName,
- committerSupplier,
- false,
- false
- );
-
- if (addResult.isOk()) {
- // If the number of rows in the segment exceeds the threshold after adding a row,
- // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
- if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
- segmentsToMoveOut.computeIfAbsent(sequenceName, k -> new HashSet<>())
- .add(addResult.getSegmentIdentifier());
- }
- isPersistRequired |= addResult.isPersistRequired();
- } else {
- // Failure to allocate segment puts determinism at risk, bail out to be safe.
- // May want configurable behavior here at some point.
- // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
- throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
- }
-
- if (addResult.getParseException() != null) {
- handleParseException(addResult.getParseException(), record);
- } else {
- rowIngestionMeters.incrementProcessed();
- }
- } else {
- rowIngestionMeters.incrementThrownAway();
- }
- }
-
- if (isPersistRequired) {
- driver.persist(committerSupplier.get());
- }
- segmentsToMoveOut.forEach((String sequence, Set<SegmentIdWithShardSpec> segments) -> {
- driver.moveSegmentOut(sequence, new ArrayList<>(segments));
- });
- }
- catch (ParseException e) {
- handleParseException(e, record);
- }
-
- nextOffsets.put(record.partition(), record.offset() + 1);
- }
-
- if (nextOffsets.get(record.partition()) >= (endOffsets.get(record.partition()))
- && assignment.remove(record.partition())) {
- log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
- KafkaIndexTask.assignPartitions(consumer, topic, assignment);
- stillReading = !assignment.isEmpty();
- }
- }
- }
- ingestionState = IngestionState.COMPLETED;
- }
- catch (Exception e) {
- log.error(e, "Encountered exception in runLegacy() before persisting.");
- throw e;
- }
- finally {
- driver.persist(committerSupplier.get()); // persist pending data
- }
-
- synchronized (statusLock) {
- if (stopRequested.get() && !publishOnStop.get()) {
- throw new InterruptedException("Stopping without publishing");
- }
-
- status = Status.PUBLISHING;
- }
-
- final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
- final SeekableStreamEndSequenceNumbers<Integer, Long> finalPartitions = toolbox.getObjectMapper().convertValue(
- ((Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_NEXT_PARTITIONS),
- toolbox.getObjectMapper()
- .getTypeFactory()
- .constructParametrizedType(
- SeekableStreamEndSequenceNumbers.class,
- SeekableStreamEndSequenceNumbers.class,
- Integer.class,
- Long.class
- )
- );
-
- // Sanity check, we should only be publishing things that match our desired end state.
- if (!endOffsets.equals(finalPartitions.getPartitionSequenceNumberMap())) {
- throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata);
- }
-
- final SegmentTransactionalInsertAction action;
-
- if (ioConfig.isUseTransaction()) {
- action = new SegmentTransactionalInsertAction(
- segments,
- new KafkaDataSourceMetadata(ioConfig.getStartSequenceNumbers()),
- new KafkaDataSourceMetadata(finalPartitions)
- );
- } else {
- action = new SegmentTransactionalInsertAction(segments, null, null);
- }
-
- log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());
-
- return toolbox.getTaskActionClient().submit(action);
- };
-
- // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
- // for hand off. See KafkaSupervisorIOConfig.completionTimeout.
- final SegmentsAndMetadata published = driver.publish(
- publisher,
- committerSupplier.get(),
- sequenceNames.values()
- ).get();
-
- List<?> publishedSegmentIds = Lists.transform(published.getSegments(), DataSegment::getId);
- log.info(
- "Published segments %s with metadata[%s].",
- publishedSegmentIds,
- Preconditions.checkNotNull(published.getCommitMetadata(), "commitMetadata")
- );
-
- final Future<SegmentsAndMetadata> handoffFuture = driver.registerHandoff(published);
- SegmentsAndMetadata handedOff = null;
- if (tuningConfig.getHandoffConditionTimeout() == 0) {
- handedOff = handoffFuture.get();
- } else {
- try {
- handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
- }
- catch (TimeoutException e) {
- log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout())
- .addData("TaskId", task.getId())
- .emit();
- }
- }
-
- if (handedOff == null) {
- log.warn("Failed to handoff segments %s", publishedSegmentIds);
- } else {
- log.info(
- "Handoff completed for segments %s with metadata[%s]",
- Lists.transform(handedOff.getSegments(), DataSegment::getId),
- Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata")
- );
- }
- }
- catch (InterruptedException | RejectedExecutionException e) {
- // handle the InterruptedException that gets wrapped in a RejectedExecutionException
- if (e instanceof RejectedExecutionException
- && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
- throw e;
- }
-
- // if we were interrupted because we were asked to stop, handle the exception and return success, else rethrow
- if (!stopRequested.get()) {
- Thread.currentThread().interrupt();
- throw e;
- }
-
- log.info("The task was asked to stop before completing");
- }
- finally {
- if (chatHandlerProvider.isPresent()) {
- chatHandlerProvider.get().unregister(task.getId());
- }
-
- toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
- toolbox.getDataSegmentServerAnnouncer().unannounce();
- }
-
- toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(null));
- return TaskStatus.success(
- task.getId(),
- null
- );
- }
-
- @Override
- protected boolean isEndOfShard(Long seqNum)
- {
- return false;
- }
-
- @Override
- public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
- {
- return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
- {
- };
- }
-
- @Nonnull
- @Override
- protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
- RecordSupplier<Integer, Long> recordSupplier, TaskToolbox toolbox
- )
- {
- throw new UnsupportedOperationException();
- }
-
- private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic)
- {
- // Initialize consumer assignment.
- final Set<Integer> assignment = new HashSet<>();
- for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
- final long endOffset = endOffsets.get(entry.getKey());
- if (entry.getValue() < endOffset) {
- assignment.add(entry.getKey());
- } else if (entry.getValue() == endOffset) {
- log.info("Finished reading partition[%d].", entry.getKey());
- } else {
- throw new ISE(
- "WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
- entry.getValue(),
- endOffset
- );
- }
- }
-
- KafkaIndexTask.assignPartitions(consumer, topic, assignment);
-
- // Seek to starting offsets.
- for (final int partition : assignment) {
- final long offset = nextOffsets.get(partition);
- log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
- consumer.seek(new TopicPartition(topic, partition), offset);
- }
-
- return assignment;
- }
-
- /**
- * Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared.
- * <p/>
- * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted.
- * <p/>
- *
- * @return true if a pause request was handled, false otherwise
- */
- private boolean possiblyPause() throws InterruptedException
- {
- pauseLock.lockInterruptibly();
- try {
- if (pauseRequested) {
- status = Status.PAUSED;
- hasPaused.signalAll();
-
- while (pauseRequested) {
- log.info("Pausing ingestion until resumed");
- shouldResume.await();
- }
-
- status = Status.READING;
- shouldResume.signalAll();
- log.info("Ingestion loop resumed");
- return true;
- }
- }
- finally {
- pauseLock.unlock();
- }
-
- return false;
- }
-
- @Override
- protected void possiblyResetDataSourceMetadata(
- TaskToolbox toolbox,
- RecordSupplier<Integer, Long> recordSupplier,
- Set<StreamPartition<Integer>> assignment
- )
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected boolean isEndOffsetExclusive()
- {
- return true;
- }
-
- @Override
- protected SeekableStreamEndSequenceNumbers<Integer, Long> deserializePartitionsFromMetadata(
- ObjectMapper mapper,
- Object object
- )
- {
- throw new UnsupportedOperationException();
- }
-
- private void possiblyResetOffsetsOrWait(
- Map<TopicPartition, Long> outOfRangePartitions,
- KafkaConsumer<byte[], byte[]> consumer,
- TaskToolbox taskToolbox
- ) throws InterruptedException, IOException
- {
- final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
- boolean doReset = false;
- if (tuningConfig.isResetOffsetAutomatically()) {
- for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
- final TopicPartition topicPartition = outOfRangePartition.getKey();
- final long nextOffset = outOfRangePartition.getValue();
- // seek to the beginning to get the least available offset
- consumer.seekToBeginning(Collections.singletonList(topicPartition));
- final long leastAvailableOffset = consumer.position(topicPartition);
- // reset the seek
- consumer.seek(topicPartition, nextOffset);
- // Reset consumer offset if resetOffsetAutomatically is set to true
- // and the current message offset in the kafka partition is more than the
- // next message offset that we are trying to fetch
- if (leastAvailableOffset > nextOffset) {
- doReset = true;
- resetPartitions.put(topicPartition, nextOffset);
- }
- }
- }
-
- if (doReset) {
- sendResetRequestAndWaitLegacy(resetPartitions, taskToolbox);
- } else {
- log.warn("Retrying in %dms", task.getPollRetryMs());
- pollRetryLock.lockInterruptibly();
- try {
- long nanos = TimeUnit.MILLISECONDS.toNanos(task.getPollRetryMs());
- while (nanos > 0L && !pauseRequested && !stopRequested.get()) {
- nanos = isAwaitingRetry.awaitNanos(nanos);
- }
- }
- finally {
- pollRetryLock.unlock();
- }
- }
- }
-
- private void sendResetRequestAndWaitLegacy(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox)
- throws IOException
- {
- Map<Integer, Long> partitionOffsetMap = new HashMap<>();
- for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
- partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
- }
- boolean result = taskToolbox.getTaskActionClient()
- .submit(new ResetDataSourceMetadataAction(
- task.getDataSource(),
- new KafkaDataSourceMetadata(
- new SeekableStreamStartSequenceNumbers<>(
- ioConfig.getStartSequenceNumbers().getStream(),
- partitionOffsetMap,
- Collections.emptySet()
- )
- )
- ));
-
- if (result) {
- log.makeAlert("Resetting Kafka offsets for datasource [%s]", task.getDataSource())
- .addData("partitions", partitionOffsetMap.keySet())
- .emit();
- // wait for being killed by supervisor
- requestPause();
- } else {
- log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
- }
- }
-
- private void requestPause()
- {
- pauseRequested = true;
- }
-
- @Override
- protected Long getNextStartOffset(Long sequenceNumber)
- {
- throw new UnsupportedOperationException();
- }
-
- private void handleParseException(ParseException pe, ConsumerRecord<byte[], byte[]> record)
- {
- if (pe.isFromPartiallyValidRow()) {
- rowIngestionMeters.incrementProcessedWithError();
- } else {
- rowIngestionMeters.incrementUnparseable();
- }
-
- if (tuningConfig.isLogParseExceptions()) {
- log.error(
- pe,
- "Encountered parse exception on row from partition[%d] offset[%d]",
- record.partition(),
- record.offset()
- );
- }
-
- if (savedParseExceptions != null) {
- savedParseExceptions.add(pe);
- }
-
- if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError()
- > tuningConfig.getMaxParseExceptions()) {
- log.error("Max parse exceptions exceeded, terminating task...");
- throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
- }
- }
-
- private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorMsg)
- {
- return TaskReport.buildTaskReports(
- new IngestionStatsAndErrorsTaskReport(
- task.getId(),
- new IngestionStatsAndErrorsTaskReportData(
- ingestionState,
- getTaskCompletionUnparseableEvents(),
- getTaskCompletionRowStats(),
- errorMsg
- )
- )
- );
- }
-
- private Map<String, Object> getTaskCompletionUnparseableEvents()
- {
- Map<String, Object> unparseableEventsMap = new HashMap<>();
- List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
- savedParseExceptions
- );
- if (buildSegmentsParseExceptionMessages != null) {
- unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages);
- }
- return unparseableEventsMap;
- }
-
- private Map<String, Object> getTaskCompletionRowStats()
- {
- Map<String, Object> metrics = new HashMap<>();
- metrics.put(
- RowIngestionMeters.BUILD_SEGMENTS,
- rowIngestionMeters.getTotals()
- );
- return metrics;
- }
-
- @Override
- public void stopGracefully()
- {
- log.info("Stopping gracefully (status: [%s])", status);
- stopRequested.set(true);
-
- synchronized (statusLock) {
- if (status == Status.PUBLISHING) {
- runThread.interrupt();
- return;
- }
- }
-
- try {
- if (pauseLock.tryLock(SeekableStreamIndexTask.LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
- try {
- if (pauseRequested) {
- pauseRequested = false;
- shouldResume.signalAll();
- }
- }
- finally {
- pauseLock.unlock();
- }
- } else {
- log.warn("While stopping: failed to acquire pauseLock before timeout, interrupting run thread");
- runThread.interrupt();
- return;
- }
-
- if (pollRetryLock.tryLock(SeekableStreamIndexTask.LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
- try {
- isAwaitingRetry.signalAll();
- }
- finally {
- pollRetryLock.unlock();
- }
- } else {
- log.warn("While stopping: failed to acquire pollRetryLock before timeout, interrupting run thread");
- runThread.interrupt();
- }
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Authorizes action to be performed on this task's datasource
- *
- * @return authorization result
- */
- private Access authorizationCheck(final HttpServletRequest req, Action action)
- {
- return IndexTaskUtils.datasourceAuthorizationCheck(req, action, task.getDataSource(), authorizerMapper);
- }
-
- @Override
- @POST
- @Path("/stop")
- public Response stop(@Context final HttpServletRequest req)
- {
- authorizationCheck(req, Action.WRITE);
- stopGracefully();
- return Response.status(Response.Status.OK).build();
- }
-
- @Override
- @GET
- @Path("/status")
- @Produces(MediaType.APPLICATION_JSON)
- public Status getStatusHTTP(@Context final HttpServletRequest req)
- {
- authorizationCheck(req, Action.READ);
- return status;
- }
-
- @Override
- public Status getStatus()
- {
- return status;
- }
-
- @Override
- @GET
- @Path("/offsets/current")
- @Produces(MediaType.APPLICATION_JSON)
- public Map<Integer, Long> getCurrentOffsets(@Context final HttpServletRequest req)
- {
- authorizationCheck(req, Action.READ);
- return getCurrentOffsets();
- }
-
- @Override
- public ConcurrentMap<Integer, Long> getCurrentOffsets()
- {
- return nextOffsets;
- }
-
- @Override
- @GET
- @Path("/offsets/end")
- @Produces(MediaType.APPLICATION_JSON)
- public Map<Integer, Long> getEndOffsetsHTTP(@Context final HttpServletRequest req)
- {
- authorizationCheck(req, Action.READ);
- return getEndOffsets();
- }
-
- @Override
- public Map<Integer, Long> getEndOffsets()
- {
- return endOffsets;
- }
-
- @Override
- public Response setEndOffsets(Map<Integer, Long> sequenceNumbers, boolean finish) throws InterruptedException
- {
- // finish is not used in this mode
- return setEndOffsets(sequenceNumbers);
- }
-
- @POST
- @Path("/offsets/end")
- @Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.APPLICATION_JSON)
- public Response setEndOffsetsHTTP(
- Map<Integer, Long> offsets,
- @Context final HttpServletRequest req
- ) throws InterruptedException
- {
- authorizationCheck(req, Action.WRITE);
- return setEndOffsets(offsets);
- }
-
- @Override
- @GET
- @Path("/rowStats")
- @Produces(MediaType.APPLICATION_JSON)
- public Response getRowStats(
- @Context final HttpServletRequest req
- )
- {
- authorizationCheck(req, Action.READ);
- Map<String, Object> returnMap = new HashMap<>();
- Map<String, Object> totalsMap = new HashMap<>();
- Map<String, Object> averagesMap = new HashMap<>();
-
- totalsMap.put(
- RowIngestionMeters.BUILD_SEGMENTS,
- rowIngestionMeters.getTotals()
- );
- averagesMap.put(
- RowIngestionMeters.BUILD_SEGMENTS,
- rowIngestionMeters.getMovingAverages()
- );
-
- returnMap.put("movingAverages", averagesMap);
- returnMap.put("totals", totalsMap);
- return Response.ok(returnMap).build();
- }
-
- @Override
- @GET
- @Path("/unparseableEvents")
- @Produces(MediaType.APPLICATION_JSON)
- public Response getUnparseableEvents(
- @Context final HttpServletRequest req
- )
- {
- authorizationCheck(req, Action.READ);
- List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
- return Response.ok(events).build();
- }
-
- public Response setEndOffsets(
- Map<Integer, Long> offsets
- ) throws InterruptedException
- {
- if (offsets == null) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity("Request body must contain a map of { partition:endOffset }")
- .build();
- } else if (!endOffsets.keySet().containsAll(offsets.keySet())) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity(
- StringUtils.format(
- "Request contains partitions not being handled by this task, my partitions: %s",
- endOffsets.keySet()
- )
- )
- .build();
- }
-
- pauseLock.lockInterruptibly();
- try {
- if (!isPaused()) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity("Task must be paused before changing the end offsets")
- .build();
- }
-
- for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
- if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity(
- StringUtils.format(
- "End offset must be >= current offset for partition [%s] (current: %s)",
- entry.getKey(),
- nextOffsets.get(entry.getKey())
- )
- )
- .build();
- }
- }
-
- endOffsets.putAll(offsets);
- log.info("endOffsets changed to %s", endOffsets);
- }
- finally {
- pauseLock.unlock();
- }
-
- resume();
-
- return Response.ok(endOffsets).build();
- }
-
- private boolean isPaused()
- {
- return status == Status.PAUSED;
- }
-
- /**
- * Signals the ingestion loop to pause.
- *
- * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the
- * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets
- * in the response body if the task successfully paused
- */
- @Override
- @POST
- @Path("/pause")
- @Produces(MediaType.APPLICATION_JSON)
- public Response pauseHTTP(
- @Context final HttpServletRequest req
- ) throws InterruptedException
- {
- authorizationCheck(req, Action.WRITE);
- return pause();
- }
-
- @Override
- public Response pause() throws InterruptedException
- {
- if (!(status == Status.PAUSED || status == Status.READING)) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", status))
- .build();
- }
-
- pauseLock.lockInterruptibly();
- try {
- pauseRequested = true;
-
- pollRetryLock.lockInterruptibly();
- try {
- isAwaitingRetry.signalAll();
- }
- finally {
- pollRetryLock.unlock();
- }
-
- if (isPaused()) {
- shouldResume.signalAll(); // kick the monitor so it re-awaits with the new pauseMillis
- }
-
- long nanos = TimeUnit.SECONDS.toNanos(2);
- while (!isPaused()) {
- if (nanos <= 0L) {
- return Response.status(Response.Status.ACCEPTED)
- .entity("Request accepted but task has not yet paused")
- .build();
- }
- nanos = hasPaused.awaitNanos(nanos);
- }
- }
- finally {
- pauseLock.unlock();
- }
-
- try {
- return Response.ok().entity(objectMapper.writeValueAsString(getCurrentOffsets())).build();
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- @POST
- @Path("/resume")
- public Response resumeHTTP(@Context final HttpServletRequest req) throws InterruptedException
- {
- authorizationCheck(req, Action.WRITE);
- resume();
- return Response.status(Response.Status.OK).build();
- }
-
- @Override
- public void resume() throws InterruptedException
- {
- pauseLock.lockInterruptibly();
- try {
- pauseRequested = false;
- shouldResume.signalAll();
-
- long nanos = TimeUnit.SECONDS.toNanos(5);
- while (isPaused()) {
- if (nanos <= 0L) {
- throw new RuntimeException("Resume command was not accepted within 5 seconds");
- }
- nanos = shouldResume.awaitNanos(nanos);
- }
- }
- finally {
- pauseLock.unlock();
- }
- }
-
- @Override
- protected SeekableStreamDataSourceMetadata<Integer, Long> createDataSourceMetadata(
- SeekableStreamSequenceNumbers<Integer, Long> partitions
- )
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected OrderedSequenceNumber<Long> createSequenceNumber(Long sequenceNumber)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- @GET
- @Path("/time/start")
- @Produces(MediaType.APPLICATION_JSON)
- public DateTime getStartTime(@Context final HttpServletRequest req)
- {
- authorizationCheck(req, Action.WRITE);
- return startTime;
- }
-
- @Nullable
- @Override
- protected TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(
- TaskToolbox toolbox,
- String checkpointsString
- )
- {
- throw new UnsupportedOperationException();
- }
-
-}
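
The deleted runner's most intricate piece is the pause/resume handshake
documented in its comments (pauseLock, hasPaused, shouldResume). Below is a
self-contained sketch of that coordination pattern, reduced to its essentials;
it is illustrative only, not the removed class itself, and it omits the
timeouts the real code used:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class PauseHandshakeSketch
{
  private final Lock pauseLock = new ReentrantLock();
  private final Condition hasPaused = pauseLock.newCondition();
  private final Condition shouldResume = pauseLock.newCondition();
  private boolean pauseRequested;
  private boolean paused;

  // Control thread (e.g. a Jetty handler): request a pause and wait until
  // the ingestion loop confirms it has actually stopped.
  void pause() throws InterruptedException
  {
    pauseLock.lockInterruptibly();
    try {
      pauseRequested = true;
      while (!paused) {
        hasPaused.await();
      }
    }
    finally {
      pauseLock.unlock();
    }
  }

  // Control thread: clear the request and wait for the loop to leave the
  // paused state.
  void resume() throws InterruptedException
  {
    pauseLock.lockInterruptibly();
    try {
      pauseRequested = false;
      shouldResume.signalAll();
      while (paused) {
        shouldResume.await();
      }
    }
    finally {
      pauseLock.unlock();
    }
  }

  // Ingestion loop, called between records: honor a pending pause request,
  // signal the waiter, and block until resumed.
  void possiblyPause() throws InterruptedException
  {
    pauseLock.lockInterruptibly();
    try {
      if (pauseRequested) {
        paused = true;
        hasPaused.signalAll();
        while (pauseRequested) {
          shouldResume.await();
        }
        paused = false;
        shouldResume.signalAll();
      }
    }
    finally {
      pauseLock.unlock();
    }
  }
}

The two conditions on one lock give callers the guarantee the comments
describe: when pause() returns, the loop is provably stopped and will not
ingest again until resume() is called.
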
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 4c9b39b..783af47 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -140,8 +140,6 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
@@ -165,8 +163,6 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
@@ -191,7 +187,6 @@ import java.util.concurrent.TimeoutException;
import static org.apache.druid.query.QueryPlus.wrap;
-@RunWith(Parameterized.class)
public class KafkaIndexTaskTest
{
private static final Logger log = new Logger(KafkaIndexTaskTest.class);
@@ -228,24 +223,10 @@ public class KafkaIndexTaskTest
private File directory;
private String topic;
private List<ProducerRecord<byte[], byte[]>> records;
- private final boolean isIncrementalHandoffSupported;
private final Set<Integer> checkpointRequestsHash = new HashSet<>();
private File reportsFile;
private RowIngestionMetersFactory rowIngestionMetersFactory;
- // This should be removed in versions greater than 0.12.x
- // isIncrementalHandoffSupported should always be set to true in those later versions
- @Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}")
- public static Iterable<Object[]> constructorFeeder()
- {
- return ImmutableList.of(new Object[]{true}, new Object[]{false});
- }
-
- public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported)
- {
- this.isIncrementalHandoffSupported = isIncrementalHandoffSupported;
- }
-
private static final DataSchema DATA_SCHEMA = new DataSchema(
"test_ds",
OBJECT_MAPPER.convertValue(
@@ -502,9 +483,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testIncrementalHandOff() throws Exception
{
- if (!isIncrementalHandoffSupported) {
- return;
- }
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen
maxRowsPerSegment = 2;
@@ -608,9 +586,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testIncrementalHandOffMaxTotalRows() throws Exception
{
- if (!isIncrementalHandoffSupported) {
- return;
- }
final String baseSequenceName = "sequence0";
// incremental publish should happen every 3 records
maxRowsPerSegment = Integer.MAX_VALUE;
@@ -763,9 +738,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testTimeBasedIncrementalHandOff() throws Exception
{
- if (!isIncrementalHandoffSupported) {
- return;
- }
final String baseSequenceName = "sequence0";
// as soon as any segment hits maxRowsPerSegment or intermediateHandoffPeriod, incremental publishing should happen
maxRowsPerSegment = Integer.MAX_VALUE;
@@ -853,9 +825,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
{
- if (!isIncrementalHandoffSupported) {
- return;
- }
records = generateSinglePartitionRecords(topic);
final String baseSequenceName = "sequence0";
@@ -1675,9 +1644,7 @@ public class KafkaIndexTaskTest
// desc3 will not be created in KafkaIndexTask (0.12.x) as it does not create per Kafka partition Druid segments
SegmentDescriptor desc3 = sd(task, "2011/P1D", 1);
SegmentDescriptor desc4 = sd(task, "2012/P1D", 0);
- Assert.assertEquals(isIncrementalHandoffSupported
- ? ImmutableSet.of(desc1, desc2, desc4)
- : ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
+ Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc4), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L, 1, 2L))
@@ -1691,12 +1658,8 @@ public class KafkaIndexTaskTest
// Check desc2/desc3 without strong ordering because two partitions are interleaved nondeterministically
Assert.assertEquals(
- isIncrementalHandoffSupported
- ? ImmutableSet.of(ImmutableList.of("d", "e", "h"))
- : ImmutableSet.of(ImmutableList.of("d", "e"), ImmutableList.of("h")),
- isIncrementalHandoffSupported
- ? ImmutableSet.of(readSegmentColumn("dim1", desc2))
- : ImmutableSet.of(readSegmentColumn("dim1", desc2), readSegmentColumn("dim1", desc3))
+ ImmutableSet.of(ImmutableList.of("d", "e", "h")),
+ ImmutableSet.of(readSegmentColumn("dim1", desc2))
);
}
@@ -1867,10 +1830,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testRestoreAfterPersistingSequences() throws Exception
{
- if (!isIncrementalHandoffSupported) {
- return;
- }
-
records = generateSinglePartitionRecords(topic);
maxRowsPerSegment = 2;
Map<String, Object> consumerProps = kafkaServer.consumerProperties();
@@ -2136,12 +2095,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
{
- // This tests the case when a replacement task is created in place of a failed test
- // which has done some incremental handoffs, thus the context will contain starting
- // sequence offsets from which the task should start reading and ignore the start offsets
- if (!isIncrementalHandoffSupported) {
- return;
- }
// Insert data
insertData();
@@ -2344,9 +2297,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testCanStartFromLaterThanEarliestOffset() throws Exception
{
- if (!isIncrementalHandoffSupported) {
- return;
- }
final String baseSequenceName = "sequence0";
maxRowsPerSegment = Integer.MAX_VALUE;
maxTotalRows = null;
@@ -2542,17 +2492,13 @@ public class KafkaIndexTaskTest
maxParseExceptions,
maxSavedParseExceptions
);
- if (isIncrementalHandoffSupported) {
- context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
-
- if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
- final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
- checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
- final String checkpointsJson = OBJECT_MAPPER
- .writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
- .writeValueAsString(checkpoints);
- context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
- }
+ if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
+ final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+ checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
+ final String checkpointsJson = OBJECT_MAPPER
+ .writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
+ .writeValueAsString(checkpoints);
+ context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
}
final KafkaIndexTask task = new KafkaIndexTask(
@@ -2736,14 +2682,6 @@ public class KafkaIndexTaskTest
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
- SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
- {
- @Override
- public List<StorageLocationConfig> getLocations()
- {
- return new ArrayList<>();
- }
- };
toolboxFactory = new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
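
The test changes above delete the JUnit 4 Parameterized harness: with a single
runner left, the boolean parameter and its early-return guards serve no
purpose. For reference, a minimal sketch of the pattern that was removed (a
hypothetical test class, same shape as the deleted lines):

import com.google.common.collect.ImmutableList;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class FlagParameterizedTestSketch
{
  // Run every test twice, once per flag value.
  @Parameterized.Parameters(name = "flag = {0}")
  public static Iterable<Object[]> constructorFeeder()
  {
    return ImmutableList.of(new Object[]{true}, new Object[]{false});
  }

  private final boolean flag;

  public FlagParameterizedTestSketch(boolean flag)
  {
    this.flag = flag;
  }

  @Test
  public void testSomething()
  {
    if (!flag) {
      return; // the guard style this commit removes
    }
    // ... assertions for the flag-enabled path ...
  }
}
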
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index a04556d..ef5c254 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2664,8 +2664,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
) throws JsonProcessingException
{
if (context != null) {
- context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
-
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
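
Both test helpers now seed the checkpoints context unconditionally. The value
is a sequence-id to (partition to offset) map serialized to JSON; a small
sketch of that serialization, using a plain ObjectMapper instead of the tests'
KafkaSupervisor.CHECKPOINTS_TYPE_REF writer:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class CheckpointsContextSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
    final Map<Integer, Long> startOffsets = new HashMap<>();
    startOffsets.put(0, 0L);          // partition 0 starts at offset 0
    startOffsets.put(1, 5L);          // partition 1 starts at offset 5
    checkpoints.put(0, startOffsets); // sequence 0 begins at these offsets

    final Map<String, Object> context = new HashMap<>();
    context.put("checkpoints", mapper.writeValueAsString(checkpoints));
    System.out.println(context); // e.g. {checkpoints={"0":{"0":0,"1":5}}}
  }
}
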
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 966b76c..755f630 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -121,7 +121,6 @@ import java.util.stream.Stream;
*/
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType> implements Supervisor
{
- public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
@@ -2752,7 +2751,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected Map<String, Object> createBaseTaskContexts()
{
final Map<String, Object> contexts = new HashMap<>();
- contexts.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
if (spec.getContext() != null) {
contexts.putAll(spec.getContext());
}
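
After this change the supervisor's base task context carries only the
user-supplied spec context. A sketch of what the method reduces to, per the
hunk above (TaskSpec here is a hypothetical placeholder for the real spec
type, which is not shown in the diff):

import java.util.HashMap;
import java.util.Map;

abstract class BaseContextSketch
{
  interface TaskSpec
  {
    Map<String, Object> getContext();
  }

  protected TaskSpec spec;

  protected Map<String, Object> createBaseTaskContexts()
  {
    final Map<String, Object> contexts = new HashMap<>();
    if (spec.getContext() != null) {
      contexts.putAll(spec.getContext());
    }
    return contexts;
  }
}
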