Posted to dev@druid.apache.org by ji...@apache.org on 2018/07/09 20:49:31 UTC
[incubator-druid] branch 0.12.2 updated: Fix the broken Appenderator contract in KafkaIndexTask (#5905) (#5983)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push:
new c637b3e Fix the broken Appenderator contract in KafkaIndexTask (#5905) (#5983)
c637b3e is described below
commit c637b3eb4a823f1f3c64a6366979df10db34f3aa
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Jul 9 13:49:28 2018 -0700
Fix the broken Appenderator contract in KafkaIndexTask (#5905) (#5983)
* Fix broken Appenderator contract in KafkaIndexTask
* fix build
* add publishFuture
* reuse sequenceToUse if possible
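For context, the core of the fix replaces the old dedicated publish thread and its blocking queue with one publish future per completed sequence, plus a handoff future chained to it so that a publish failure also fails the corresponding handoff. Below is a minimal self-contained sketch of that pattern, not Druid's actual API: it assumes the two-argument Guava Futures.addCallback available in the 0.12.x era, and publish()/registerHandoff() here are illustrative stand-ins for the StreamAppenderatorDriver calls.

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;

class PublishHandoffSketch
{
  private final ListeningExecutorService exec =
      MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
  private final List<ListenableFuture<String>> publishWaitList = new ArrayList<>();
  private final List<ListenableFuture<String>> handOffWaitList = new ArrayList<>();

  // Stand-in for StreamAppenderatorDriver.publish(); returns the "published segments".
  private ListenableFuture<String> publish(String sequenceName)
  {
    return exec.submit(() -> "published:" + sequenceName);
  }

  // Stand-in for StreamAppenderatorDriver.registerHandoff().
  private ListenableFuture<String> registerHandoff(String published)
  {
    return exec.submit(() -> "handedOff:" + published);
  }

  void publishAndRegisterHandoff(String sequenceName)
  {
    final ListenableFuture<String> publishFuture = publish(sequenceName);
    publishWaitList.add(publishFuture);

    // One handoff future per publish future; it must fail when the publish fails,
    // so the ingestion loop can detect either failure by polling both wait lists.
    final SettableFuture<String> handoffFuture = SettableFuture.create();
    handOffWaitList.add(handoffFuture);

    Futures.addCallback(
        publishFuture,
        new FutureCallback<String>()
        {
          @Override
          public void onSuccess(String published)
          {
            // Chain the real handoff only after a successful publish.
            Futures.addCallback(
                registerHandoff(published),
                new FutureCallback<String>()
                {
                  @Override
                  public void onSuccess(String handedOff)
                  {
                    handoffFuture.set(handedOff);
                  }

                  @Override
                  public void onFailure(Throwable t)
                  {
                    handoffFuture.setException(t);
                  }
                }
            );
          }

          @Override
          public void onFailure(Throwable t)
          {
            // A failed publish propagates to the handoff future as well.
            handoffFuture.setException(t);
          }
        }
    );
  }
}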
---
.../io/druid/indexing/kafka/KafkaIOConfig.java | 11 -
.../io/druid/indexing/kafka/KafkaIndexTask.java | 1068 ++++++++++----------
.../druid/indexing/kafka/KafkaIndexTaskClient.java | 37 +-
.../indexing/kafka/supervisor/KafkaSupervisor.java | 3 +-
.../io/druid/indexing/kafka/KafkaIOConfigTest.java | 14 +-
.../indexing/kafka/KafkaIndexTaskClientTest.java | 93 +-
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 131 +--
.../kafka/supervisor/KafkaSupervisorTest.java | 9 +-
.../java/io/druid/indexing/common/task/Task.java | 1 +
.../realtime/appenderator/SegmentsAndMetadata.java | 3 +-
.../appenderator/StreamAppenderatorDriver.java | 3 +-
11 files changed, 590 insertions(+), 783 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
index d9a7fb2..5d48fe1 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -31,7 +31,6 @@ import java.util.Map;
public class KafkaIOConfig implements IOConfig
{
private static final boolean DEFAULT_USE_TRANSACTION = true;
- private static final boolean DEFAULT_PAUSE_AFTER_READ = false;
private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
private final String baseSequenceName;
@@ -39,7 +38,6 @@ public class KafkaIOConfig implements IOConfig
private final KafkaPartitions endPartitions;
private final Map<String, String> consumerProperties;
private final boolean useTransaction;
- private final boolean pauseAfterRead;
private final Optional<DateTime> minimumMessageTime;
private final Optional<DateTime> maximumMessageTime;
private final boolean skipOffsetGaps;
@@ -51,7 +49,6 @@ public class KafkaIOConfig implements IOConfig
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction,
- @JsonProperty("pauseAfterRead") Boolean pauseAfterRead,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
@@ -62,7 +59,6 @@ public class KafkaIOConfig implements IOConfig
this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
- this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS;
@@ -118,12 +114,6 @@ public class KafkaIOConfig implements IOConfig
}
@JsonProperty
- public boolean isPauseAfterRead()
- {
- return pauseAfterRead;
- }
-
- @JsonProperty
public Optional<DateTime> getMaximumMessageTime()
{
return maximumMessageTime;
@@ -150,7 +140,6 @@ public class KafkaIOConfig implements IOConfig
", endPartitions=" + endPartitions +
", consumerProperties=" + consumerProperties +
", useTransaction=" + useTransaction +
- ", pauseAfterRead=" + pauseAfterRead +
", minimumMessageTime=" + minimumMessageTime +
", maximumMessageTime=" + maximumMessageTime +
", skipOffsetGaps=" + skipOffsetGaps +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 62aca36..16a67ec 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -33,7 +33,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -42,8 +41,7 @@ import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
@@ -68,7 +66,6 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.collect.Utils;
-import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.emitter.EmittingLogger;
@@ -127,23 +124,21 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -179,7 +174,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private final AuthorizerMapper authorizerMapper;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
- private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>();
+ private final Map<Integer, Long> endOffsets;
private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
private final Map<Integer, Long> maxEndOffsets = new HashMap<>();
private final Map<Integer, Long> lastPersistedOffsets = new ConcurrentHashMap<>();
@@ -192,7 +187,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private volatile DateTime startTime;
private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread)
private volatile Thread runThread = null;
- private volatile File sequencesPersistFile = null;
private final AtomicBoolean stopRequested = new AtomicBoolean(false);
private final AtomicBoolean publishOnStop = new AtomicBoolean(false);
@@ -232,20 +226,17 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private final Object statusLock = new Object();
private volatile boolean pauseRequested = false;
- private volatile long pauseMillis = 0;
// This value can be tuned in some tests
private long pollRetryMs = 30000;
private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
- private final BlockingQueue<SequenceMetadata> publishQueue = new LinkedBlockingQueue<>();
- private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new CopyOnWriteArrayList<>(); // to prevent concurrency visibility issue
- private final CountDownLatch waitForPublishes = new CountDownLatch(1);
- private final AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
+ private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new LinkedList<>();
+ private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new LinkedList<>();
private final String topic;
private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
- private ListeningExecutorService publishExecService;
+ private volatile Throwable backgroundThreadException;
private final boolean useLegacy;
@JsonCreator
@@ -274,7 +265,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
this.authorizerMapper = authorizerMapper;
- this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap());
+ this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndPartitions().getPartitionOffsetMap());
this.maxEndOffsets.putAll(endOffsets.entrySet()
.stream()
.collect(Collectors.toMap(
@@ -343,75 +334,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
return ioConfig;
}
- private void createAndStartPublishExecutor()
- {
- publishExecService = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-driver"));
- publishExecService.submit(
- (Runnable) () -> {
- while (true) {
- try {
- final SequenceMetadata sequenceMetadata = publishQueue.take();
-
- Preconditions.checkNotNull(driver);
-
- if (sequenceMetadata.isSentinel()) {
- waitForPublishes.countDown();
- break;
- }
-
- log.info("Publishing segments for sequence [%s]", sequenceMetadata);
-
- final SegmentsAndMetadata result = driver.publish(
- sequenceMetadata.getPublisher(toolbox, ioConfig.isUseTransaction()),
- sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(),
- ImmutableList.of(sequenceMetadata.getSequenceName())
- ).get();
-
- if (result == null) {
- throw new ISE(
- "Transaction failure publishing segments for sequence [%s]",
- sequenceMetadata
- );
- } else {
- log.info(
- "Published segments[%s] with metadata[%s].",
- Joiner.on(", ").join(
- result.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList())
- ),
- Preconditions.checkNotNull(result.getCommitMetadata(), "commitMetadata")
- );
- }
-
- sequences.remove(sequenceMetadata);
- publishingSequences.remove(sequenceMetadata.getSequenceName());
- try {
- persistSequences();
- }
- catch (IOException e) {
- log.error(e, "Unable to persist state, dying");
- Throwables.propagate(e);
- }
-
- final ListenableFuture<SegmentsAndMetadata> handOffFuture = driver.registerHandoff(result);
- handOffWaitList.add(handOffFuture);
- }
- catch (Throwable t) {
- if ((t instanceof InterruptedException || (t instanceof RejectedExecutionException
- && t.getCause() instanceof InterruptedException))) {
- log.warn("Stopping publish thread as we are interrupted, probably we are shutting down");
- } else {
- log.makeAlert(t, "Error in publish thread, dying").emit();
- throwableAtomicReference.set(t);
- }
- Futures.allAsList(handOffWaitList).cancel(true);
- waitForPublishes.countDown();
- break;
- }
- }
- }
- );
- }
-
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
@@ -426,47 +348,40 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
status = Status.STARTING;
this.toolbox = toolbox;
- if (getContext() != null && getContext().get("checkpoints") != null) {
- log.info("Got checkpoints [%s]", (String) getContext().get("checkpoints"));
- final TreeMap<Integer, Map<Integer, Long>> checkpoints = toolbox.getObjectMapper().readValue(
- (String) getContext().get("checkpoints"),
- new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
- {
- }
- );
-
- Iterator<Map.Entry<Integer, Map<Integer, Long>>> sequenceOffsets = checkpoints.entrySet().iterator();
- Map.Entry<Integer, Map<Integer, Long>> previous = sequenceOffsets.next();
- while (sequenceOffsets.hasNext()) {
- Map.Entry<Integer, Map<Integer, Long>> current = sequenceOffsets.next();
+ if (!restoreSequences()) {
+ final TreeMap<Integer, Map<Integer, Long>> checkpoints = getCheckPointsFromContext(toolbox, this);
+ if (checkpoints != null) {
+ Iterator<Map.Entry<Integer, Map<Integer, Long>>> sequenceOffsets = checkpoints.entrySet().iterator();
+ Map.Entry<Integer, Map<Integer, Long>> previous = sequenceOffsets.next();
+ while (sequenceOffsets.hasNext()) {
+ Map.Entry<Integer, Map<Integer, Long>> current = sequenceOffsets.next();
+ sequences.add(new SequenceMetadata(
+ previous.getKey(),
+ StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
+ previous.getValue(),
+ current.getValue(),
+ true
+ ));
+ previous = current;
+ }
sequences.add(new SequenceMetadata(
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
previous.getValue(),
- current.getValue(),
- true
+ maxEndOffsets,
+ false
+ ));
+ } else {
+ sequences.add(new SequenceMetadata(
+ 0,
+ StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
+ ioConfig.getStartPartitions().getPartitionOffsetMap(),
+ maxEndOffsets,
+ false
));
- previous = current;
}
- sequences.add(new SequenceMetadata(
- previous.getKey(),
- StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
- previous.getValue(),
- maxEndOffsets,
- false
- ));
- } else {
- sequences.add(new SequenceMetadata(
- 0,
- StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
- ioConfig.getStartPartitions().getPartitionOffsetMap(),
- maxEndOffsets,
- false
- ));
- }
- sequencesPersistFile = new File(toolbox.getPersistDir(), "sequences.json");
- restoreSequences();
+ }
log.info("Starting with sequences: %s", sequences);
if (chatHandlerProvider.isPresent()) {
@@ -504,15 +419,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
)
);
- try (
- final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
- ) {
+ try (final KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
appenderator = newAppenderator(fireDepartmentMetrics, toolbox);
driver = newDriver(appenderator, toolbox, fireDepartmentMetrics);
- createAndStartPublishExecutor();
final String topic = ioConfig.getStartPartitions().getTopic();
@@ -602,7 +514,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
status = Status.READING;
try {
while (stillReading) {
- if (possiblyPause(assignment)) {
+ if (possiblyPause()) {
// The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
// partitions upon resuming. This is safe even if the end offsets have not been modified.
assignment = assignPartitionsAndSeekToNext(consumer, topic);
@@ -615,8 +527,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
// if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
- if (stopRequested.get() || (sequences.get(sequences.size() - 1).isCheckpointed()
- && !ioConfig.isPauseAfterRead())) {
+ if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
status = Status.PUBLISHING;
}
@@ -624,11 +535,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
break;
}
- checkAndMaybeThrowException();
-
- if (!ioConfig.isPauseAfterRead()) {
- maybePersistAndPublishSequences(committerSupplier);
+ if (backgroundThreadException != null) {
+ throw new RuntimeException(backgroundThreadException);
}
+ checkPublishAndHandoffFailure();
+
+ maybePersistAndPublishSequences(committerSupplier);
// The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to
// offset is not present in the topic-partition. This can happen if we're asking a task to read from data
@@ -640,19 +552,17 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
- stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+ stillReading = !assignment.isEmpty();
}
SequenceMetadata sequenceToCheckpoint = null;
for (ConsumerRecord<byte[], byte[]> record : records) {
- if (log.isTraceEnabled()) {
- log.trace(
- "Got topic[%s] partition[%d] offset[%,d].",
- record.topic(),
- record.partition(),
- record.offset()
- );
- }
+ log.trace(
+ "Got topic[%s] partition[%d] offset[%,d].",
+ record.topic(),
+ record.partition(),
+ record.offset()
+ );
if (record.offset() < endOffsets.get(record.partition())) {
if (record.offset() != nextOffsets.get(record.partition())) {
@@ -680,24 +590,23 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
: parser.parseBatch(ByteBuffer.wrap(valueBytes));
boolean isPersistRequired = false;
- for (InputRow row : rows) {
- if (row != null && withinMinMaxRecordTime(row)) {
- SequenceMetadata sequenceToUse = null;
- for (SequenceMetadata sequence : sequences) {
- if (sequence.canHandle(record)) {
- sequenceToUse = sequence;
- }
- }
+ final SequenceMetadata sequenceToUse = sequences
+ .stream()
+ .filter(sequenceMetadata -> sequenceMetadata.canHandle(record))
+ .findFirst()
+ .orElse(null);
- if (sequenceToUse == null) {
- throw new ISE(
- "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
- record.partition(),
- record.offset(),
- sequences
- );
- }
+ if (sequenceToUse == null) {
+ throw new ISE(
+ "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
+ record.partition(),
+ record.offset(),
+ sequences
+ );
+ }
+ for (InputRow row : rows) {
+ if (row != null && withinMinMaxRecordTime(row)) {
final AppenderatorDriverAddResult addResult = driver.add(
row,
sequenceToUse.getSequenceName(),
@@ -751,7 +660,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
public void onFailure(Throwable t)
{
log.error("Persist failed, dying");
- throwableAtomicReference.set(t);
+ backgroundThreadException = t;
}
}
);
@@ -779,11 +688,11 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
&& assignment.remove(record.partition())) {
log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
assignPartitions(consumer, topic, assignment);
- stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+ stillReading = !assignment.isEmpty();
}
}
- if (sequenceToCheckpoint != null && !ioConfig.isPauseAfterRead()) {
+ if (sequenceToCheckpoint != null && stillReading) {
Preconditions.checkArgument(
sequences.get(sequences.size() - 1)
.getSequenceName()
@@ -792,7 +701,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
sequenceToCheckpoint,
sequences
);
- requestPause(PAUSE_FOREVER);
+ requestPause();
if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(
getDataSource(),
ioConfig.getBaseSequenceName(),
@@ -804,6 +713,11 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
}
}
+ catch (Exception e) {
+ // (1) catch all exceptions while reading from kafka
+ log.error(e, "Encountered exception in run() before persisting.");
+ throw e;
+ }
finally {
log.info("Persisting all pending data");
driver.persist(committerSupplier.get()); // persist pending data
@@ -824,16 +738,23 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
sequenceMetadata.updateAssignments(nextOffsets);
publishingSequences.add(sequenceMetadata.getSequenceName());
// persist already done in finally, so directly add to publishQueue
- publishQueue.add(sequenceMetadata);
+ publishAndRegisterHandoff(sequenceMetadata);
}
}
- // add Sentinel SequenceMetadata to indicate end of all sequences
- publishQueue.add(SequenceMetadata.getSentinelSequenceMetadata());
- waitForPublishes.await();
- checkAndMaybeThrowException();
+ if (backgroundThreadException != null) {
+ throw new RuntimeException(backgroundThreadException);
+ }
+
+ // Wait for publish futures to complete.
+ Futures.allAsList(publishWaitList).get();
- List<SegmentsAndMetadata> handedOffList = Lists.newArrayList();
+ // Wait for handoff futures to complete.
+ // Note that every publishing task (created by calling AppenderatorDriver.publish()) has a corresponding
+ // handoffFuture. handoffFuture can throw an exception if 1) the corresponding publishFuture failed or 2) it
+ // failed to persist sequences. It might also return null if the handoff failed but was recoverable.
+ // See publishAndRegisterHandoff() for details.
+ List<SegmentsAndMetadata> handedOffList = Collections.emptyList();
if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOffList = Futures.allAsList(handOffWaitList).get();
} else {
@@ -842,6 +763,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
+ // A handoff timeout is not an indexing failure but a coordination failure, so we simply ignore the
+ // timeout exception here.
log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout())
.addData("TaskId", this.getId())
.emit();
@@ -861,8 +784,14 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
);
}
}
+
+ appenderator.close();
}
catch (InterruptedException | RejectedExecutionException e) {
+ // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including
+ // the final publishing.
+ Futures.allAsList(publishWaitList).cancel(true);
+ Futures.allAsList(handOffWaitList).cancel(true);
appenderator.closeNow();
// handle the InterruptedException that gets wrapped in a RejectedExecutionException
if (e instanceof RejectedExecutionException
@@ -878,14 +807,14 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
log.info("The task was asked to stop before completing");
}
+ catch (Exception e) {
+ // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing.
+ Futures.allAsList(publishWaitList).cancel(true);
+ Futures.allAsList(handOffWaitList).cancel(true);
+ appenderator.closeNow();
+ throw e;
+ }
finally {
- if (appenderator != null) {
- if (throwableAtomicReference.get() != null) {
- appenderator.closeNow();
- } else {
- appenderator.close();
- }
- }
if (driver != null) {
driver.close();
}
@@ -893,10 +822,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
chatHandlerProvider.get().unregister(getId());
}
- if (publishExecService != null) {
- publishExecService.shutdownNow();
- }
-
toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
toolbox.getDataSegmentServerAnnouncer().unannounce();
}
@@ -947,9 +872,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
);
try (
- final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
- final StreamAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
- final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
+ final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
+ final StreamAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
+ final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
) {
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
@@ -1032,7 +957,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
status = Status.READING;
try {
while (stillReading) {
- if (possiblyPause(assignment)) {
+ if (possiblyPause()) {
// The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
// partitions upon resuming. This is safe even if the end offsets have not been modified.
assignment = assignPartitionsAndSeekToNext(consumer, topic);
@@ -1058,7 +983,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
- stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+ stillReading = !assignment.isEmpty();
}
for (ConsumerRecord<byte[], byte[]> record : records) {
@@ -1158,7 +1083,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
&& assignment.remove(record.partition())) {
log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
assignPartitions(consumer, topic, assignment);
- stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+ stillReading = !assignment.isEmpty();
}
}
}
@@ -1211,32 +1136,37 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
sequenceNames.values()
).get();
+ final List<String> publishedSegments = published.getSegments()
+ .stream()
+ .map(DataSegment::getIdentifier)
+ .collect(Collectors.toList());
+ log.info(
+ "Published segments[%s] with metadata[%s].",
+ publishedSegments,
+ Preconditions.checkNotNull(published.getCommitMetadata(), "commitMetadata")
+ );
+
final Future<SegmentsAndMetadata> handoffFuture = driver.registerHandoff(published);
- final SegmentsAndMetadata handedOff;
+ SegmentsAndMetadata handedOff = null;
if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOff = handoffFuture.get();
} else {
- handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
+ try {
+ handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e) {
+ log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout())
+ .addData("TaskId", getId())
+ .emit();
+ }
}
if (handedOff == null) {
- throw new ISE("Transaction failure publishing segments, aborting");
+ log.warn("Failed to handoff segments[%s]", publishedSegments);
} else {
log.info(
- "Published segments[%s] with metadata[%s].",
- Joiner.on(", ").join(
- Iterables.transform(
- handedOff.getSegments(),
- new Function<DataSegment, String>()
- {
- @Override
- public String apply(DataSegment input)
- {
- return input.getIdentifier();
- }
- }
- )
- ),
+ "Handoff completed for segments[%s] with metadata[%s]",
+ handedOff.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()),
Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata")
);
}
@@ -1268,13 +1198,156 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
return success();
}
- private void checkAndMaybeThrowException()
+ private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException
+ {
+ // Check if any publishFuture failed.
+ final List<ListenableFuture<SegmentsAndMetadata>> publishFinished = publishWaitList
+ .stream()
+ .filter(Future::isDone)
+ .collect(Collectors.toList());
+
+ for (ListenableFuture<SegmentsAndMetadata> publishFuture : publishFinished) {
+ // If publishFuture failed, the line below will throw an exception that is caught by (1), and then by (2) or (3).
+ publishFuture.get();
+ }
+
+ publishWaitList.removeAll(publishFinished);
+
+ // Check if any handoffFuture failed.
+ final List<ListenableFuture<SegmentsAndMetadata>> handoffFinished = handOffWaitList
+ .stream()
+ .filter(Future::isDone)
+ .collect(Collectors.toList());
+
+ for (ListenableFuture<SegmentsAndMetadata> handoffFuture : handoffFinished) {
+ // If handoffFuture failed, the line below will throw an exception that is caught by (1), and then by (2) or (3).
+ handoffFuture.get();
+ }
+
+ handOffWaitList.removeAll(handoffFinished);
+ }
+
+ private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata)
+ {
+ log.info("Publishing segments for sequence [%s]", sequenceMetadata);
+
+ final ListenableFuture<SegmentsAndMetadata> publishFuture = Futures.transform(
+ driver.publish(
+ sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()),
+ sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(),
+ Collections.singletonList(sequenceMetadata.getSequenceName())
+ ),
+ (Function<SegmentsAndMetadata, SegmentsAndMetadata>) publishedSegmentsAndMetadata -> {
+ if (publishedSegmentsAndMetadata == null) {
+ throw new ISE(
+ "Transaction failure publishing segments for sequence [%s]",
+ sequenceMetadata
+ );
+ } else {
+ return publishedSegmentsAndMetadata;
+ }
+ }
+ );
+ publishWaitList.add(publishFuture);
+
+ // Create a handoffFuture for every publishFuture. The created handoffFuture must fail if publishFuture fails.
+ final SettableFuture<SegmentsAndMetadata> handoffFuture = SettableFuture.create();
+ handOffWaitList.add(handoffFuture);
+
+ Futures.addCallback(
+ publishFuture,
+ new FutureCallback<SegmentsAndMetadata>()
+ {
+ @Override
+ public void onSuccess(SegmentsAndMetadata publishedSegmentsAndMetadata)
+ {
+ log.info(
+ "Published segments[%s] with metadata[%s].",
+ publishedSegmentsAndMetadata.getSegments()
+ .stream()
+ .map(DataSegment::getIdentifier)
+ .collect(Collectors.toList()),
+ Preconditions.checkNotNull(publishedSegmentsAndMetadata.getCommitMetadata(), "commitMetadata")
+ );
+
+ sequences.remove(sequenceMetadata);
+ publishingSequences.remove(sequenceMetadata.getSequenceName());
+ try {
+ persistSequences();
+ }
+ catch (IOException e) {
+ log.error(e, "Unable to persist state, dying");
+ handoffFuture.setException(e);
+ throw new RuntimeException(e);
+ }
+
+ Futures.transform(
+ driver.registerHandoff(publishedSegmentsAndMetadata),
+ new Function<SegmentsAndMetadata, Void>()
+ {
+ @Nullable
+ @Override
+ public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata)
+ {
+ if (handoffSegmentsAndMetadata == null) {
+ log.warn(
+ "Failed to handoff segments[%s]",
+ publishedSegmentsAndMetadata.getSegments()
+ .stream()
+ .map(DataSegment::getIdentifier)
+ .collect(Collectors.toList())
+ );
+ }
+ handoffFuture.set(handoffSegmentsAndMetadata);
+ return null;
+ }
+ }
+ );
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata);
+ handoffFuture.setException(t);
+ }
+ }
+ );
+ }
+
+ private static File getSequencesPersistFile(TaskToolbox toolbox)
+ {
+ return new File(toolbox.getPersistDir(), "sequences.json");
+ }
+
+ private boolean restoreSequences() throws IOException
{
- if (throwableAtomicReference.get() != null) {
- Throwables.propagate(throwableAtomicReference.get());
+ final File sequencesPersistFile = getSequencesPersistFile(toolbox);
+ if (sequencesPersistFile.exists()) {
+ sequences = new CopyOnWriteArrayList<>(
+ toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
+ sequencesPersistFile,
+ new TypeReference<List<SequenceMetadata>>()
+ {
+ }
+ )
+ );
+ return true;
+ } else {
+ return false;
}
}
+ private synchronized void persistSequences() throws IOException
+ {
+ log.info("Persisting Sequences Metadata [%s]", sequences);
+ toolbox.getObjectMapper().writerWithType(
+ new TypeReference<List<SequenceMetadata>>()
+ {
+ }
+ ).writeValue(getSequencesPersistFile(toolbox), sequences);
+ }
+
private void maybePersistAndPublishSequences(Supplier<Committer> committerSupplier)
throws InterruptedException
{
@@ -1289,7 +1362,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
result,
sequenceMetadata
);
- publishQueue.add(sequenceMetadata);
+ publishAndRegisterHandoff(sequenceMetadata);
}
catch (InterruptedException e) {
log.warn("Interrupted while persisting sequence [%s]", sequenceMetadata);
@@ -1299,73 +1372,194 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
}
- private void restoreSequences() throws IOException
+ private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic)
{
- Preconditions.checkNotNull(sequencesPersistFile);
- if (sequencesPersistFile.exists()) {
- sequences = new CopyOnWriteArrayList<>(toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
- sequencesPersistFile, new TypeReference<List<SequenceMetadata>>()
- {
- }));
+ // Initialize consumer assignment.
+ final Set<Integer> assignment = Sets.newHashSet();
+ for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
+ final long endOffset = endOffsets.get(entry.getKey());
+ if (entry.getValue() < endOffset) {
+ assignment.add(entry.getKey());
+ } else if (entry.getValue() == endOffset) {
+ log.info("Finished reading partition[%d].", entry.getKey());
+ } else {
+ throw new ISE(
+ "WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
+ entry.getValue(),
+ endOffset
+ );
+ }
}
- }
- private synchronized void persistSequences() throws IOException
- {
- log.info("Persisting Sequences Metadata [%s]", sequences);
- toolbox.getObjectMapper().writerWithType(
- new TypeReference<List<SequenceMetadata>>()
- {
- }
- ).writeValue(sequencesPersistFile, sequences);
- }
+ KafkaIndexTask.assignPartitions(consumer, topic, assignment);
- @Override
- public boolean canRestore()
- {
- return true;
+ // Seek to starting offsets.
+ for (final int partition : assignment) {
+ final long offset = nextOffsets.get(partition);
+ log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
+ consumer.seek(new TopicPartition(topic, partition), offset);
+ }
+
+ return assignment;
}
/**
- * Authorizes action to be performed on this task's datasource
+ * Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared.
+ * <p/>
+ * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted.
+ * <p/>
*
- * @return authorization result
+ * @return true if a pause request was handled, false otherwise
*/
- private Access authorizationCheck(final HttpServletRequest req, Action action)
+ private boolean possiblyPause() throws InterruptedException
{
- ResourceAction resourceAction = new ResourceAction(
- new Resource(dataSchema.getDataSource(), ResourceType.DATASOURCE),
- action
- );
+ pauseLock.lockInterruptibly();
+ try {
+ if (pauseRequested) {
+ status = Status.PAUSED;
+ hasPaused.signalAll();
- Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper);
- if (!access.isAllowed()) {
- throw new ForbiddenException(access.toString());
- }
+ while (pauseRequested) {
+ log.info("Pausing ingestion until resumed");
+ shouldResume.await();
+ }
- return access;
- }
+ status = Status.READING;
+ shouldResume.signalAll();
+ log.info("Ingestion loop resumed");
+ return true;
+ }
+ }
+ finally {
+ pauseLock.unlock();
+ }
- @VisibleForTesting
- Appenderator getAppenderator()
- {
- return appenderator;
+ return false;
}
- @Override
- public void stopGracefully()
+ private void possiblyResetOffsetsOrWait(
+ Map<TopicPartition, Long> outOfRangePartitions,
+ KafkaConsumer<byte[], byte[]> consumer,
+ TaskToolbox taskToolbox
+ ) throws InterruptedException, IOException
{
- log.info("Stopping gracefully (status: [%s])", status);
- stopRequested.set(true);
-
- synchronized (statusLock) {
- if (status == Status.PUBLISHING) {
- runThread.interrupt();
- return;
+ final Map<TopicPartition, Long> resetPartitions = Maps.newHashMap();
+ boolean doReset = false;
+ if (tuningConfig.isResetOffsetAutomatically()) {
+ for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
+ final TopicPartition topicPartition = outOfRangePartition.getKey();
+ final long nextOffset = outOfRangePartition.getValue();
+ // seek to the beginning to get the least available offset
+ consumer.seekToBeginning(Collections.singletonList(topicPartition));
+ final long leastAvailableOffset = consumer.position(topicPartition);
+ // reset the seek
+ consumer.seek(topicPartition, nextOffset);
+ // Reset consumer offset if resetOffsetAutomatically is set to true
+ // and the current message offset in the kafka partition is more than the
+ // next message offset that we are trying to fetch
+ if (leastAvailableOffset > nextOffset) {
+ doReset = true;
+ resetPartitions.put(topicPartition, nextOffset);
+ }
}
}
- try {
+ if (doReset) {
+ sendResetRequestAndWait(resetPartitions, taskToolbox);
+ } else {
+ log.warn("Retrying in %dms", pollRetryMs);
+ pollRetryLock.lockInterruptibly();
+ try {
+ long nanos = TimeUnit.MILLISECONDS.toNanos(pollRetryMs);
+ while (nanos > 0L && !pauseRequested && !stopRequested.get()) {
+ nanos = isAwaitingRetry.awaitNanos(nanos);
+ }
+ }
+ finally {
+ pollRetryLock.unlock();
+ }
+ }
+ }
+
+ private void requestPause()
+ {
+ pauseRequested = true;
+ }
+
+ private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox)
+ throws IOException
+ {
+ Map<Integer, Long> partitionOffsetMap = Maps.newHashMap();
+ for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
+ partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
+ }
+ boolean result = taskToolbox.getTaskActionClient()
+ .submit(new ResetDataSourceMetadataAction(
+ getDataSource(),
+ new KafkaDataSourceMetadata(new KafkaPartitions(
+ ioConfig.getStartPartitions()
+ .getTopic(),
+ partitionOffsetMap
+ ))
+ ));
+
+ if (result) {
+ log.makeAlert("Resetting Kafka offsets for datasource [%s]", getDataSource())
+ .addData("partitions", partitionOffsetMap.keySet())
+ .emit();
+ // wait for being killed by supervisor
+ requestPause();
+ } else {
+ log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
+ }
+ }
+
+ @Override
+ public boolean canRestore()
+ {
+ return true;
+ }
+
+ /**
+ * Authorizes action to be performed on this task's datasource
+ *
+ * @return authorization result
+ */
+ private Access authorizationCheck(final HttpServletRequest req, Action action)
+ {
+ ResourceAction resourceAction = new ResourceAction(
+ new Resource(dataSchema.getDataSource(), ResourceType.DATASOURCE),
+ action
+ );
+
+ Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper);
+ if (!access.isAllowed()) {
+ throw new ForbiddenException(access.toString());
+ }
+
+ return access;
+ }
+
+ @VisibleForTesting
+ Appenderator getAppenderator()
+ {
+ return appenderator;
+ }
+
+ @Override
+ public void stopGracefully()
+ {
+ log.info("Stopping gracefully (status: [%s])", status);
+ stopRequested.set(true);
+
+ synchronized (statusLock) {
+ if (status == Status.PUBLISHING) {
+ runThread.interrupt();
+ return;
+ }
+ }
+
+ try {
if (pauseLock.tryLock(LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
if (pauseRequested) {
@@ -1474,25 +1668,23 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@Produces(MediaType.APPLICATION_JSON)
public Response setEndOffsetsHTTP(
Map<Integer, Long> offsets,
- @QueryParam("resume") @DefaultValue("false") final boolean resume,
@QueryParam("finish") @DefaultValue("true") final boolean finish,
// this field is only for internal purposes, shouldn't be usually set by users
@Context final HttpServletRequest req
) throws InterruptedException
{
authorizationCheck(req, Action.WRITE);
- return setEndOffsets(offsets, resume, finish);
+ return setEndOffsets(offsets, finish);
}
public Response setEndOffsets(
Map<Integer, Long> offsets,
- final boolean resume,
final boolean finish // this field is only for internal purposes, shouldn't be usually set by users
) throws InterruptedException
{
// for backwards compatibility, should be removed from versions greater than 0.12.x
if (useLegacy) {
- return setEndOffsetsLegacy(offsets, resume);
+ return setEndOffsetsLegacy(offsets);
}
if (offsets == null) {
@@ -1520,7 +1712,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
(latestSequence.getEndOffsets().equals(offsets) && finish)) {
log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", sequences);
return Response.ok(offsets).build();
- } else if (latestSequence.isCheckpointed() && !ioConfig.isPauseAfterRead()) {
+ } else if (latestSequence.isCheckpointed()) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(StringUtils.format(
"WTH?! Sequence [%s] has already endOffsets set, cannot set to [%s]",
@@ -1553,13 +1745,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
log.info("Updating endOffsets from [%s] to [%s]", endOffsets, offsets);
endOffsets.putAll(offsets);
} else {
- Preconditions.checkState(!ioConfig.isPauseAfterRead());
// create new sequence
final SequenceMetadata newSequence = new SequenceMetadata(
latestSequence.getSequenceId() + 1,
StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1),
offsets,
- maxEndOffsets,
+ endOffsets,
false
);
sequences.add(newSequence);
@@ -1569,77 +1760,23 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
catch (Exception e) {
log.error(e, "Unable to set end offsets, dying");
- throwableAtomicReference.set(e);
- Throwables.propagate(e);
+ backgroundThreadException = e;
+ // should resume to immediately finish kafka index task as failed
+ resume();
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(Throwables.getStackTraceAsString(e))
+ .build();
}
finally {
pauseLock.unlock();
}
}
- if (resume) {
- resume();
- }
+ resume();
return Response.ok(offsets).build();
}
- private Response setEndOffsetsLegacy(
- Map<Integer, Long> offsets,
- final boolean resume
- ) throws InterruptedException
- {
- if (offsets == null) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity("Request body must contain a map of { partition:endOffset }")
- .build();
- } else if (!endOffsets.keySet().containsAll(offsets.keySet())) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity(
- StringUtils.format(
- "Request contains partitions not being handled by this task, my partitions: %s",
- endOffsets.keySet()
- )
- )
- .build();
- }
-
- pauseLock.lockInterruptibly();
- try {
- if (!isPaused()) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity("Task must be paused before changing the end offsets")
- .build();
- }
-
- for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
- if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity(
- StringUtils.format(
- "End offset must be >= current offset for partition [%s] (current: %s)",
- entry.getKey(),
- nextOffsets.get(entry.getKey())
- )
- )
- .build();
- }
- }
-
- endOffsets.putAll(offsets);
- log.info("endOffsets changed to %s", endOffsets);
- }
- finally {
- pauseLock.unlock();
- }
-
- if (resume) {
- resume();
- }
-
- return Response.ok(endOffsets).build();
- }
-
@GET
@Path("/checkpoints")
@Produces(MediaType.APPLICATION_JSON)
@@ -1649,7 +1786,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
return getCheckpoints();
}
- public Map<Integer, Map<Integer, Long>> getCheckpoints()
+ private Map<Integer, Map<Integer, Long>> getCheckpoints()
{
TreeMap<Integer, Map<Integer, Long>> result = new TreeMap<>();
result.putAll(
@@ -1661,8 +1798,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
/**
* Signals the ingestion loop to pause.
*
- * @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely
- *
* @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the
* method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets
* in the response body if the task successfully paused
@@ -1671,15 +1806,14 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@Path("/pause")
@Produces(MediaType.APPLICATION_JSON)
public Response pauseHTTP(
- @QueryParam("timeout") @DefaultValue("0") final long timeout,
@Context final HttpServletRequest req
) throws InterruptedException
{
authorizationCheck(req, Action.WRITE);
- return pause(timeout);
+ return pause();
}
- public Response pause(final long timeout) throws InterruptedException
+ public Response pause() throws InterruptedException
{
if (!(status == Status.PAUSED || status == Status.READING)) {
return Response.status(Response.Status.BAD_REQUEST)
@@ -1689,7 +1823,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
pauseLock.lockInterruptibly();
try {
- pauseMillis = timeout <= 0 ? PAUSE_FOREVER : timeout;
pauseRequested = true;
pollRetryLock.lockInterruptibly();
@@ -1764,6 +1897,79 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
return startTime;
}
+ @Nullable
+ private static TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(
+ TaskToolbox toolbox,
+ KafkaIndexTask task
+ ) throws IOException
+ {
+ final String checkpointsString = task.getContextValue("checkpoints");
+ if (checkpointsString != null) {
+ log.info("Checkpoints [%s]", checkpointsString);
+ return toolbox.getObjectMapper().readValue(
+ checkpointsString,
+ new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
+ {
+ }
+ );
+ } else {
+ return null;
+ }
+ }
+
+ private Response setEndOffsetsLegacy(
+ Map<Integer, Long> offsets
+ ) throws InterruptedException
+ {
+ if (offsets == null) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity("Request body must contain a map of { partition:endOffset }")
+ .build();
+ } else if (!endOffsets.keySet().containsAll(offsets.keySet())) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(
+ StringUtils.format(
+ "Request contains partitions not being handled by this task, my partitions: %s",
+ endOffsets.keySet()
+ )
+ )
+ .build();
+ }
+
+ pauseLock.lockInterruptibly();
+ try {
+ if (!isPaused()) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity("Task must be paused before changing the end offsets")
+ .build();
+ }
+
+ for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
+ if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(
+ StringUtils.format(
+ "End offset must be >= current offset for partition [%s] (current: %s)",
+ entry.getKey(),
+ nextOffsets.get(entry.getKey())
+ )
+ )
+ .build();
+ }
+ }
+
+ endOffsets.putAll(offsets);
+ log.info("endOffsets changed to %s", endOffsets);
+ }
+ finally {
+ pauseLock.unlock();
+ }
+
+ resume();
+
+ return Response.ok(endOffsets).build();
+ }
+
@VisibleForTesting
FireDepartmentMetrics getFireDepartmentMetrics()
{
@@ -1775,12 +1981,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
return status == Status.PAUSED;
}
- private void requestPause(long pauseMillis)
- {
- this.pauseMillis = pauseMillis;
- pauseRequested = true;
- }
-
private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
{
return Appenderators.createRealtime(
@@ -1854,169 +2054,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
);
}
- private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic)
- {
- // Initialize consumer assignment.
- final Set<Integer> assignment = Sets.newHashSet();
- for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
- final long endOffset = endOffsets.get(entry.getKey());
- if (entry.getValue() < endOffset) {
- assignment.add(entry.getKey());
- } else if (entry.getValue() == endOffset) {
- log.info("Finished reading partition[%d].", entry.getKey());
- } else {
- throw new ISE(
- "WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
- entry.getValue(),
- endOffset
- );
- }
- }
-
- assignPartitions(consumer, topic, assignment);
-
- // Seek to starting offsets.
- for (final int partition : assignment) {
- final long offset = nextOffsets.get(partition);
- log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
- consumer.seek(new TopicPartition(topic, partition), offset);
- }
-
- return assignment;
- }
-
- /**
- * Checks if the pauseRequested flag was set and if so blocks:
- * a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared
- * b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared
- * <p/>
- * If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the
- * pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume
- * and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal
- * shouldResume after adjusting pauseMillis for the new value to take effect.
- * <p/>
- * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted.
- * <p/>
- * Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set.
- *
- * @return true if a pause request was handled, false otherwise
- */
- private boolean possiblyPause(Set<Integer> assignment) throws InterruptedException
- {
- pauseLock.lockInterruptibly();
- try {
- if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) {
- pauseMillis = PAUSE_FOREVER;
- pauseRequested = true;
- }
-
- if (pauseRequested) {
- status = Status.PAUSED;
- long nanos = 0;
- hasPaused.signalAll();
-
- while (pauseRequested) {
- if (pauseMillis == PAUSE_FOREVER) {
- log.info("Pausing ingestion until resumed");
- shouldResume.await();
- } else {
- if (pauseMillis > 0) {
- log.info("Pausing ingestion for [%,d] ms", pauseMillis);
- nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis);
- pauseMillis = 0;
- }
- if (nanos <= 0L) {
- pauseRequested = false; // timeout elapsed
- }
- nanos = shouldResume.awaitNanos(nanos);
- }
- }
-
- status = Status.READING;
- shouldResume.signalAll();
- log.info("Ingestion loop resumed");
- return true;
- }
- }
- finally {
- pauseLock.unlock();
- }
-
- return false;
- }
-
- private void possiblyResetOffsetsOrWait(
- Map<TopicPartition, Long> outOfRangePartitions,
- KafkaConsumer<byte[], byte[]> consumer,
- TaskToolbox taskToolbox
- ) throws InterruptedException, IOException
- {
- final Map<TopicPartition, Long> resetPartitions = Maps.newHashMap();
- boolean doReset = false;
- if (tuningConfig.isResetOffsetAutomatically()) {
- for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
- final TopicPartition topicPartition = outOfRangePartition.getKey();
- final long nextOffset = outOfRangePartition.getValue();
- // seek to the beginning to get the least available offset
- consumer.seekToBeginning(Collections.singletonList(topicPartition));
- final long leastAvailableOffset = consumer.position(topicPartition);
- // reset the seek
- consumer.seek(topicPartition, nextOffset);
- // Reset consumer offset if resetOffsetAutomatically is set to true
- // and the current message offset in the kafka partition is more than the
- // next message offset that we are trying to fetch
- if (leastAvailableOffset > nextOffset) {
- doReset = true;
- resetPartitions.put(topicPartition, nextOffset);
- }
- }
- }
-
- if (doReset) {
- sendResetRequestAndWait(resetPartitions, taskToolbox);
- } else {
- log.warn("Retrying in %dms", pollRetryMs);
- pollRetryLock.lockInterruptibly();
- try {
- long nanos = TimeUnit.MILLISECONDS.toNanos(pollRetryMs);
- while (nanos > 0L && !pauseRequested && !stopRequested.get()) {
- nanos = isAwaitingRetry.awaitNanos(nanos);
- }
- }
- finally {
- pollRetryLock.unlock();
- }
- }
- }
-
- private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox)
- throws IOException
- {
- Map<Integer, Long> partitionOffsetMap = Maps.newHashMap();
- for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
- partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
- }
- boolean result = taskToolbox.getTaskActionClient()
- .submit(new ResetDataSourceMetadataAction(
- getDataSource(),
- new KafkaDataSourceMetadata(new KafkaPartitions(
- ioConfig.getStartPartitions()
- .getTopic(),
- partitionOffsetMap
- ))
- ));
-
- if (result) {
- log.makeAlert("Resetting Kafka offsets for datasource [%s]", getDataSource())
- .addData("partitions", partitionOffsetMap.keySet())
- .emit();
- // wait for being killed by supervisor
- requestPause(PAUSE_FOREVER);
- } else {
- log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
- }
- }
-
private boolean withinMinMaxRecordTime(final InputRow row)
{
final boolean beforeMinimumMessageTime = ioConfig.getMinimumMessageTime().isPresent()
@@ -2142,7 +2179,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
return sentinel;
}
- public void setEndOffsets(Map<Integer, Long> newEndOffsets)
+ void setEndOffsets(Map<Integer, Long> newEndOffsets)
{
lock.lock();
try {
@@ -2154,15 +2191,14 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
}
- public void updateAssignments(Map<Integer, Long> nextPartitionOffset)
+ void updateAssignments(Map<Integer, Long> nextPartitionOffset)
{
lock.lock();
try {
assignments.clear();
- nextPartitionOffset.entrySet().forEach(partitionOffset -> {
- if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
- > 0) {
- assignments.add(partitionOffset.getKey());
+ nextPartitionOffset.forEach((key, value) -> {
+ if (Longs.compare(endOffsets.get(key), nextPartitionOffset.get(key)) > 0) {
+ assignments.add(key);
}
});
}
@@ -2171,7 +2207,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
}
- public boolean isOpen()
+ boolean isOpen()
{
return !assignments.isEmpty();
}
@@ -2180,32 +2216,17 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
{
lock.lock();
try {
+ final Long partitionEndOffset = endOffsets.get(record.partition());
return isOpen()
- && endOffsets.get(record.partition()) != null
+ && partitionEndOffset != null
&& record.offset() >= startOffsets.get(record.partition())
- && record.offset() < endOffsets.get(record.partition());
+ && record.offset() < partitionEndOffset;
}
finally {
lock.unlock();
}
}
- private SequenceMetadata()
- {
- this.sequenceId = -1;
- this.sequenceName = null;
- this.startOffsets = null;
- this.endOffsets = null;
- this.assignments = null;
- this.checkpointed = true;
- this.sentinel = true;
- }
-
- public static SequenceMetadata getSentinelSequenceMetadata()
- {
- return new SequenceMetadata();
- }
-
@Override
public String toString()
{
@@ -2226,8 +2247,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
}
-
- public Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets)
+ Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets)
{
// Set up committer.
return () ->
@@ -2280,7 +2300,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
};
}
- public TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean useTransaction)
+ TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTransaction)
{
return (segments, commitMetadata) -> {
final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue(
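The task-side HTTP handlers above drop the pause timeout and the resume flag on /offsets/end, and the KafkaIndexTaskClient diff below mirrors that. As a hypothetical illustration of the simplified signatures (the helper method and task id are made up; only pause(id) and setEndOffsets(id, offsets, finalize) come from this commit):

import java.util.Map;

// Pause the task, then set its end offsets; the task now resumes itself once
// the new end offsets are accepted, so there is no resume flag to pass.
static Map<Integer, Long> pauseAndFinish(KafkaIndexTaskClient client, String taskId)
{
  final Map<Integer, Long> currentOffsets = client.pause(taskId); // no timeout parameter anymore
  client.setEndOffsets(taskId, currentOffsets, true /* finalize */);
  return currentOffsets;
}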
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
index 6525d12..1c20980 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
@@ -29,22 +29,22 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.FullResponseHandler;
-import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.indexer.TaskLocation;
import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskInfoProvider;
-import io.druid.indexer.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
import io.druid.segment.realtime.firehose.ChatHandlerResource;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -166,19 +166,14 @@ public class KafkaIndexTaskClient
public Map<Integer, Long> pause(final String id)
{
- return pause(id, 0);
- }
-
- public Map<Integer, Long> pause(final String id, final long timeout)
- {
- log.debug("Pause task[%s] timeout[%d]", id, timeout);
+ log.debug("Pause task[%s]", id);
try {
final FullResponseHolder response = submitRequest(
id,
HttpMethod.POST,
"pause",
- timeout > 0 ? StringUtils.format("timeout=%d", timeout) : null,
+ null,
true
);
@@ -320,18 +315,17 @@ public class KafkaIndexTaskClient
public boolean setEndOffsets(
final String id,
final Map<Integer, Long> endOffsets,
- final boolean resume,
final boolean finalize
)
{
- log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s] finalize[%s]", id, endOffsets, resume, finalize);
+ log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize);
try {
final FullResponseHolder response = submitRequest(
id,
HttpMethod.POST,
"offsets/end",
- StringUtils.format("resume=%s&finish=%s", resume, finalize),
+ StringUtils.format("finish=%s", finalize),
jsonMapper.writeValueAsBytes(endOffsets),
true
);
@@ -375,18 +369,13 @@ public class KafkaIndexTaskClient
public ListenableFuture<Map<Integer, Long>> pauseAsync(final String id)
{
- return pauseAsync(id, 0);
- }
-
- public ListenableFuture<Map<Integer, Long>> pauseAsync(final String id, final long timeout)
- {
return executorService.submit(
new Callable<Map<Integer, Long>>()
{
@Override
public Map<Integer, Long> call() throws Exception
{
- return pause(id, timeout);
+ return pause(id);
}
}
);
@@ -449,7 +438,7 @@ public class KafkaIndexTaskClient
}
public ListenableFuture<Boolean> setEndOffsetsAsync(
- final String id, final Map<Integer, Long> endOffsets, final boolean resume, final boolean finalize
+ final String id, final Map<Integer, Long> endOffsets, final boolean finalize
)
{
return executorService.submit(
@@ -458,7 +447,7 @@ public class KafkaIndexTaskClient
@Override
public Boolean call() throws Exception
{
- return setEndOffsets(id, endOffsets, resume, finalize);
+ return setEndOffsets(id, endOffsets, finalize);
}
}
);
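With the pause timeout and the resume flag gone, callers of KafkaIndexTaskClient use the narrower signatures shown above. A minimal caller sketch under the new API (the task id and offsets are hypothetical; client is an already-constructed KafkaIndexTaskClient):

import com.google.common.collect.ImmutableMap;
import java.util.Map;

class ClientUsageSketch
{
  static void example(KafkaIndexTaskClient client)
  {
    // pause() no longer accepts a timeout; the task stays paused until resumed.
    Map<Integer, Long> pausedOffsets = client.pause("index_kafka_mytask_0");

    // setEndOffsets() no longer takes a resume flag, only the finalize flag.
    boolean accepted = client.setEndOffsets(
        "index_kafka_mytask_0",
        ImmutableMap.of(0, 100L, 1, 200L),
        true // finalize
    );
  }
}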
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index f79b297..454deff 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -1517,7 +1517,7 @@ public class KafkaSupervisor implements Supervisor
log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
for (final String taskId : setEndOffsetTaskIds) {
- setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, true, finalize));
+ setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
}
List<Boolean> results = Futures.successfulAsList(setEndOffsetFutures)
@@ -1764,7 +1764,6 @@ public class KafkaSupervisor implements Supervisor
new KafkaPartitions(ioConfig.getTopic(), endPartitions),
consumerProperties,
true,
- false,
minimumMessageTime,
maximumMessageTime,
ioConfig.isSkipOffsetGaps()
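The constructor call above drops the pauseAfterRead argument that previously sat between useTransaction and minimumMessageTime. For reference, a hedged sketch of constructing the revised KafkaIOConfig directly (topic, offsets, and broker address are hypothetical; KafkaIOConfig and KafkaPartitions come from io.druid.indexing.kafka):

import com.google.common.collect.ImmutableMap;

class IOConfigSketch
{
  static KafkaIOConfig newIoConfig()
  {
    return new KafkaIOConfig(
        "sequence0",
        new KafkaPartitions("mytopic", ImmutableMap.of(0, 0L)),   // startPartitions
        new KafkaPartitions("mytopic", ImmutableMap.of(0, 100L)), // endPartitions
        ImmutableMap.of("bootstrap.servers", "localhost:9092"),   // consumerProperties
        true,  // useTransaction (the pauseAfterRead argument is gone)
        null,  // minimumMessageTime
        null,  // maximumMessageTime
        false  // skipOffsetGaps
    );
  }
}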
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
index 49a9b90..22a792f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -71,8 +71,7 @@ public class KafkaIOConfigTest
Assert.assertEquals("mytopic", config.getEndPartitions().getTopic());
Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
- Assert.assertEquals(true, config.isUseTransaction());
- Assert.assertEquals(false, config.isPauseAfterRead());
+ Assert.assertTrue(config.isUseTransaction());
Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent());
Assert.assertFalse("maximumMessageTime", config.getMaximumMessageTime().isPresent());
Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
@@ -88,7 +87,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
- + " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n"
+ " \"skipOffsetGaps\": true\n"
@@ -109,8 +107,7 @@ public class KafkaIOConfigTest
Assert.assertEquals("mytopic", config.getEndPartitions().getTopic());
Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
- Assert.assertEquals(false, config.isUseTransaction());
- Assert.assertEquals(true, config.isPauseAfterRead());
+ Assert.assertFalse(config.isUseTransaction());
Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getMinimumMessageTime().get());
Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get());
Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
@@ -125,7 +122,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
- + " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@@ -145,7 +141,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
- + " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@@ -165,7 +160,6 @@ public class KafkaIOConfigTest
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
- + " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@@ -185,7 +179,6 @@ public class KafkaIOConfigTest
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"useTransaction\": false,\n"
- + " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@@ -206,7 +199,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
- + " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@@ -227,7 +219,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
- + " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@@ -248,7 +239,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
- + " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java
index 2834cc8..b63563b 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java
@@ -27,17 +27,17 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.FullResponseHandler;
-import io.druid.java.util.http.client.response.FullResponseHolder;
-import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexer.TaskLocation;
+import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskStatus;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMockSupport;
@@ -56,6 +56,7 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
import java.net.URL;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -139,13 +140,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Assert.assertEquals(false, client.stop(TEST_ID, true));
Assert.assertEquals(false, client.resume(TEST_ID));
Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID));
- Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID, 10));
+ Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID));
Assert.assertEquals(KafkaIndexTask.Status.NOT_STARTED, client.getStatus(TEST_ID));
Assert.assertEquals(null, client.getStartTime(TEST_ID));
Assert.assertEquals(ImmutableMap.of(), client.getCurrentOffsets(TEST_ID, true));
Assert.assertEquals(ImmutableMap.of(), client.getEndOffsets(TEST_ID));
- Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), false, true));
- Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), true, true));
+ Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true));
+ Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true));
verifyAll();
}
@@ -432,33 +433,6 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
}
@Test
- public void testPauseWithTimeout() throws Exception
- {
- Capture<Request> captured = Capture.newInstance();
- expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2);
- expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes();
- expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
- Futures.immediateFuture(responseHolder)
- );
- replayAll();
-
- Map<Integer, Long> results = client.pause(TEST_ID, 101);
- verifyAll();
-
- Request request = captured.getValue();
- Assert.assertEquals(HttpMethod.POST, request.getMethod());
- Assert.assertEquals(
- new URL("http://test-host:1234/druid/worker/v1/chat/test-id/pause?timeout=101"),
- request.getUrl()
- );
- Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
-
- Assert.assertEquals(2, results.size());
- Assert.assertEquals(1, (long) results.get(0));
- Assert.assertEquals(10, (long) results.get(1));
- }
-
- @Test
public void testPauseWithSubsequentGetOffsets() throws Exception
{
Capture<Request> captured = Capture.newInstance();
@@ -544,13 +518,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
);
replayAll();
- client.setEndOffsets(TEST_ID, endOffsets, false, true);
+ client.setEndOffsets(TEST_ID, endOffsets, true);
verifyAll();
Request request = captured.getValue();
Assert.assertEquals(HttpMethod.POST, request.getMethod());
Assert.assertEquals(
- new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=false&finish=true"),
+ new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"),
request.getUrl()
);
Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@@ -569,13 +543,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
);
replayAll();
- client.setEndOffsets(TEST_ID, endOffsets, true, true);
+ client.setEndOffsets(TEST_ID, endOffsets, true);
verifyAll();
Request request = captured.getValue();
Assert.assertEquals(HttpMethod.POST, request.getMethod());
Assert.assertEquals(
- new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=true&finish=true"),
+ new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"),
request.getUrl()
);
Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@@ -724,39 +698,6 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
}
@Test
- public void testPauseAsyncWithTimeout() throws Exception
- {
- final int numRequests = TEST_IDS.size();
- Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
- expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
- expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
- expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
- Futures.immediateFuture(responseHolder)
- ).times(numRequests);
- replayAll();
-
- List<URL> expectedUrls = Lists.newArrayList();
- List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
- for (String testId : TEST_IDS) {
- expectedUrls.add(new URL(StringUtils.format(URL_FORMATTER, TEST_HOST, TEST_PORT, testId, "pause?timeout=9")));
- futures.add(client.pauseAsync(testId, 9));
- }
-
- List<Map<Integer, Long>> responses = Futures.allAsList(futures).get();
-
- verifyAll();
- List<Request> requests = captured.getValues();
-
- Assert.assertEquals(numRequests, requests.size());
- Assert.assertEquals(numRequests, responses.size());
- for (int i = 0; i < numRequests; i++) {
- Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod());
- Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
- Assert.assertEquals(Maps.newLinkedHashMap(ImmutableMap.of(0, 1L)), responses.get(i));
- }
- }
-
- @Test
public void testGetStatusAsync() throws Exception
{
final int numRequests = TEST_IDS.size();
@@ -909,9 +850,9 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
TEST_HOST,
TEST_PORT,
testId,
- StringUtils.format("offsets/end?resume=%s&finish=%s", false, true)
+ StringUtils.format("offsets/end?finish=%s", true)
)));
- futures.add(client.setEndOffsetsAsync(testId, endOffsets, false, true));
+ futures.add(client.setEndOffsetsAsync(testId, endOffsets, true));
}
List<Boolean> responses = Futures.allAsList(futures).get();
@@ -950,11 +891,11 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
TEST_HOST,
TEST_PORT,
testId,
- "offsets/end?resume=true&finish=true"
+ "offsets/end?finish=true"
)
)
);
- futures.add(client.setEndOffsetsAsync(testId, endOffsets, true, true));
+ futures.add(client.setEndOffsetsAsync(testId, endOffsets, true));
}
List<Boolean> responses = Futures.allAsList(futures).get();
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 45f3003..4232b58 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -38,10 +38,6 @@ import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.core.NoopEmitter;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.metrics.MonitorScheduler;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.data.input.impl.DimensionsSpec;
@@ -85,6 +81,10 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.JSONPathFieldSpec;
import io.druid.java.util.common.parsers.JSONPathSpec;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.core.NoopEmitter;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.metrics.MonitorScheduler;
import io.druid.math.expr.ExprMacroTable;
import io.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import io.druid.metadata.EntryExistsException;
@@ -350,7 +350,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -392,7 +391,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -468,7 +466,6 @@ public class KafkaIndexTaskTest
endPartitions,
consumerProps,
true,
- false,
null,
null,
false
@@ -481,7 +478,7 @@ public class KafkaIndexTaskTest
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getCurrentOffsets());
Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets) || checkpoint2.getPartitionOffsetMap()
.equals(currentOffsets));
- task.setEndOffsets(currentOffsets, true, false);
+ task.setEndOffsets(currentOffsets, false);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
@@ -536,7 +533,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
DateTimes.of("2010"),
null,
false
@@ -590,7 +586,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
DateTimes.of("2010"),
false
@@ -654,7 +649,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -714,7 +708,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -755,7 +748,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -807,7 +799,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -817,10 +808,7 @@ public class KafkaIndexTaskTest
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for task to exit
- Assert.assertEquals(
- isIncrementalHandoffSupported ? TaskState.SUCCESS : TaskState.FAILED,
- future.get().getStatusCode()
- );
+ Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
@@ -861,7 +849,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -894,7 +881,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -908,7 +894,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -962,7 +947,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -976,7 +960,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -1031,7 +1014,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
false,
- false,
null,
null,
false
@@ -1045,7 +1027,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
kafkaServer.consumerProperties(),
false,
- false,
null,
null,
false
@@ -1105,7 +1086,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -1170,7 +1150,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -1184,7 +1163,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -1240,7 +1218,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -1277,7 +1254,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -1329,7 +1305,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -1354,7 +1329,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(KafkaIndexTask.Status.READING, task.getStatus());
Map<Integer, Long> currentOffsets = objectMapper.readValue(
- task.pause(0).getEntity().toString(),
+ task.pause().getEntity().toString(),
new TypeReference<Map<Integer, Long>>()
{
}
@@ -1402,93 +1377,6 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
- @Test(timeout = 60_000L)
- public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
- {
- final KafkaIndexTask task = createTask(
- null,
- new KafkaIOConfig(
- "sequence0",
- new KafkaPartitions(topic, ImmutableMap.of(0, 1L)),
- new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
- kafkaServer.consumerProperties(),
- true,
- true,
- null,
- null,
- false
- )
- );
-
- final ListenableFuture<TaskStatus> future = runTask(task);
-
- try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
- for (ProducerRecord<byte[], byte[]> record : records) {
- kafkaProducer.send(record).get();
- }
- }
-
- while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
- Thread.sleep(25);
- }
-
- // reached the end of the assigned offsets and paused instead of publishing
- Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets());
- Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
- Assert.assertEquals(ImmutableMap.of(0, 3L), task.getEndOffsets());
- Map<Integer, Long> newEndOffsets = ImmutableMap.of(0, 4L);
- task.setEndOffsets(newEndOffsets, false, true);
- Assert.assertEquals(newEndOffsets, task.getEndOffsets());
- Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
- task.resume();
-
- while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
- Thread.sleep(25);
- }
-
- // reached the end of the updated offsets and paused
- Assert.assertEquals(newEndOffsets, task.getCurrentOffsets());
- Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
- // try again but with resume flag == true
- newEndOffsets = ImmutableMap.of(0, 7L);
- task.setEndOffsets(newEndOffsets, true, true);
- Assert.assertEquals(newEndOffsets, task.getEndOffsets());
- Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
- while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
- Thread.sleep(25);
- }
-
- Assert.assertEquals(newEndOffsets, task.getCurrentOffsets());
- Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
- task.resume();
-
- Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
-
- // Check metrics
- Assert.assertEquals(4, task.getFireDepartmentMetrics().processed());
- Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable());
- Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway());
-
- // Check published metadata
- SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
- SegmentDescriptor desc2 = SD(task, "2010/P1D", 0);
- SegmentDescriptor desc3 = SD(task, "2011/P1D", 0);
- Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
- Assert.assertEquals(
- new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 7L))),
- metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
- );
-
- // Check segments in deep storage
- Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc1));
- Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc2));
- Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc3));
- }
-
@Test(timeout = 30_000L)
public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
{
@@ -1500,7 +1388,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -1513,7 +1400,7 @@ public class KafkaIndexTaskTest
Thread.sleep(2000);
}
- task.pause(0);
+ task.pause();
while (!task.getStatus().equals(KafkaIndexTask.Status.PAUSED)) {
Thread.sleep(25);
@@ -1539,7 +1426,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
@@ -1593,7 +1479,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
- false,
null,
null,
false
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c429265..60a47f0 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -254,7 +254,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
- Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());
Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
@@ -1046,7 +1045,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskClient.setEndOffsetsAsync(
EasyMock.contains("sequenceName-0"),
EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
- EasyMock.eq(true),
EasyMock.eq(true)
)
).andReturn(Futures.immediateFuture(true)).times(2);
@@ -1074,7 +1072,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaIOConfig taskConfig = kafkaIndexTask.getIOConfig();
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
- Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());
Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic());
Assert.assertEquals(10L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
@@ -1164,7 +1161,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction());
- Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead());
// check that the new task was created with starting offsets matching where the publishing task finished
Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic());
@@ -1255,7 +1251,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction());
- Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead());
// check that the new task was created with starting offsets matching where the publishing task finished
Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic());
@@ -1570,7 +1565,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskClient.setEndOffsetsAsync(
EasyMock.contains("sequenceName-0"),
EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
- EasyMock.eq(true),
EasyMock.eq(true)
)
).andReturn(Futures.<Boolean>immediateFailedFuture(new RuntimeException())).times(2);
@@ -1695,7 +1689,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
expect(taskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
- expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true, true))
+ expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true))
.andReturn(Futures.immediateFuture(true));
taskQueue.shutdown("id3");
expectLastCall().times(2);
@@ -2125,7 +2119,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
endPartitions,
ImmutableMap.<String, String>of(),
true,
- false,
minimumMessageTime,
maximumMessageTime,
false
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index 8e18d72..aeb0578 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -179,6 +179,7 @@ public interface Task
*/
TaskStatus run(TaskToolbox toolbox) throws Exception;
+ @Nullable
Map<String, Object> getContext();
@Nullable
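Annotating getContext() as @Nullable documents that a task may have no context map at all, so callers should guard the return value. A brief caller sketch (the "priority" key is hypothetical):

import io.druid.indexing.common.task.Task;
import java.util.Map;

class ContextSketch
{
  static Object readPriority(Task task)
  {
    // getContext() may now legitimately return null; null-check before use.
    final Map<String, Object> context = task.getContext();
    return context == null ? null : context.get("priority");
  }
}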
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
index 72a7f8b..74a1011 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
@@ -23,12 +23,13 @@ import com.google.common.collect.ImmutableList;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class SegmentsAndMetadata
{
- private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(ImmutableList.<DataSegment>of(), null);
+ private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(Collections.emptyList(), null);
private final Object commitMetadata;
private final ImmutableList<DataSegment> segments;
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 6d11092..33b823d 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -221,7 +220,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
throw e;
}
catch (Exception e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
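Guava's Throwables.propagate(e) has been deprecated upstream; wrapping checked exceptions in a RuntimeException directly, as the hunk above does, is the documented replacement and leaves unchecked exceptions flowing through the earlier catch unchanged. A condensed sketch of the pattern (doWork is a stand-in for the real call):

import java.util.concurrent.Callable;

class PropagateSketch
{
  static void runSafely(Callable<Void> doWork)
  {
    try {
      doWork.call();
    }
    catch (RuntimeException e) {
      throw e; // unchecked exceptions are rethrown as-is
    }
    catch (Exception e) {
      throw new RuntimeException(e); // wrap checked exceptions instead of Throwables.propagate(e)
    }
  }
}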