You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/05 02:58:01 UTC
[2/4] storm git commit: STORM-2913: Add metadata to at-most-once and at-least-once commits
STORM-2913: Add metadata to at-most-once and at-least-once commits
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e756889a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e756889a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e756889a
Branch: refs/heads/master
Commit: e756889aa712aee22c216bc99ee17b972abf886d
Parents: eff32a3
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Jan 27 15:15:45 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Feb 4 23:18:54 2018 +0100
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 80 ++++++-----------
.../storm/kafka/spout/KafkaSpoutConfig.java | 4 +-
.../spout/internal/CommitMetadataManager.java | 91 ++++++++++++++++++++
.../spout/KafkaSpoutMessagingGuaranteeTest.java | 45 ++++++----
4 files changed, 146 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 84e7851..9d133a7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -23,8 +23,6 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
@@ -53,6 +51,7 @@ import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
import org.apache.storm.kafka.spout.internal.CommitMetadata;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.storm.kafka.spout.internal.OffsetManager;
@@ -72,7 +71,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
//Initial delay for the commit and subscription refresh timers
public static final long TIMER_DELAY_MS = 500;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
- private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
// Storm
protected SpoutOutputCollector collector;
@@ -104,8 +102,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Triggers when a subscription should be refreshed
private transient Timer refreshSubscriptionTimer;
private transient TopologyContext context;
- // Metadata information to commit to Kafka. It is unique per spout per topology.
- private transient String commitMetadata;
+ private transient CommitMetadataManager commitMetadataManager;
private transient KafkaOffsetMetric kafkaOffsetMetric;
public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
@@ -142,7 +139,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
offsetManagers = new HashMap<>();
emitted = new HashSet<>();
waitingToEmit = new HashMap<>();
- setCommitMetadata(context);
+ commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
tupleListener.open(conf, context);
if (canRegisterMetrics()) {
@@ -154,7 +151,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private void registerMetric() {
LOG.info("Registering Spout Metrics");
- kafkaOffsetMetric = new KafkaOffsetMetric(() -> offsetManagers, () -> kafkaConsumer);
+ kafkaOffsetMetric = new KafkaOffsetMetric(() -> Collections.unmodifiableMap(offsetManagers), () -> kafkaConsumer);
context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
}
@@ -168,16 +165,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
return true;
}
- private void setCommitMetadata(TopologyContext context) {
- try {
- commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
- context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
- } catch (JsonProcessingException e) {
- LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
- throw new RuntimeException(e);
- }
- }
-
private boolean isAtLeastOnceProcessing() {
return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
}
@@ -215,8 +202,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
retryService.retainAll(partitions);
/*
- * Emitted messages for partitions that are no longer assigned to this spout can't
- * be acked and should not be retried, hence remove them from emitted collection.
+ * Emitted messages for partitions that are no longer assigned to this spout can't be acked and should not be retried, hence
+ * remove them from emitted collection.
*/
emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition()));
}
@@ -246,7 +233,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
if (committedOffset != null) {
// offset was previously committed for this consumer group and topic-partition, either by this or another topology.
- if (isOffsetCommittedByThisTopology(newTp, committedOffset)) {
+ if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
// Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
kafkaConsumer.seek(newTp, committedOffset.offset());
} else {
@@ -274,31 +261,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
}
- /**
- * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. This info is used to decide if
- * {@link FirstPollOffsetStrategy} should be applied
- *
- * @param tp topic-partition
- * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
- * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
- */
- private boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset) {
- try {
- if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).hasCommitted()) {
- return true;
- }
-
- final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
- return committedMetadata.getTopologyId().equals(context.getStormId());
- } catch (IOException e) {
- LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
- + "for this topic-partition was done using an earlier version of Storm. "
- + "Defaulting to behavior compatible with earlier version", committedOffset);
- LOG.trace("", e);
- return false;
- }
- }
-
// ======== Next Tuple =======
@Override
public void nextTuple() {
@@ -311,7 +273,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
if (isAtLeastOnceProcessing()) {
commitOffsetsForAckedTuples(kafkaConsumer.assignment());
} else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
- commitFetchedOffsetsAsync(kafkaConsumer.assignment());
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+ createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+ kafkaConsumer.commitAsync(offsetsToCommit, null);
+ LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
}
@@ -396,7 +361,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
- kafkaConsumer.commitSync();
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+ createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+ kafkaConsumer.commitSync(offsetsToCommit);
+ LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
return consumerRecords;
} finally {
@@ -469,11 +437,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
} else {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
- if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
- && committedOffset.offset() > record.offset()) {
+ if (isAtLeastOnceProcessing()
+ && committedOffset != null
+ && committedOffset.offset() > record.offset()
+ && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
// Ensures that after a topology with this id is started, the consumer fetch
// position never falls behind the committed offset (STORM-2844)
- throw new IllegalStateException("Attempting to emit a message that has already been committed.");
+ throw new IllegalStateException("Attempting to emit a message that has already been committed."
+ + " This should never occur when using the at-least-once processing guarantee.");
}
final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
@@ -519,13 +490,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
}
- private void commitFetchedOffsetsAsync(Set<TopicPartition> assignedPartitions) {
+ private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition tp : assignedPartitions) {
- offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
+ offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
}
- kafkaConsumer.commitAsync(offsetsToCommit, null);
- LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
+ return offsetsToCommit;
}
private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
@@ -536,7 +506,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
- final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadata);
+ final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
if (nextCommitOffset != null) {
nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index c2305cb..40e449a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -474,7 +474,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
* error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
* requests an offset that was deleted.
*/
- LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
+ LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
} else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
@@ -488,7 +488,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
+ " Some messages may be processed more than once.");
}
}
- LOG.info("Setting consumer property '{}' to 'false', because the spout does not support auto-commit",
+ LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
new file mode 100644
index 0000000..a63619c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates the commit metadata for a spout instance, and reads committed metadata back to tell whether this topology wrote it.
+ */
+public final class CommitMetadataManager {
+
+ private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); // Writes and reads the metadata JSON
+ private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class);
+ // JSON-serialized CommitMetadata to commit to Kafka, built once in the constructor. It is unique per spout instance.
+ private final String commitMetadata;
+ private final ProcessingGuarantee processingGuarantee; // Decides whether the offset managers are consulted
+ private final TopologyContext context; // Supplies the topology id that committed metadata is compared against
+
+ /**
+ * Create a manager, serializing the commit metadata from the context's topology id and task id and the calling thread's name.
+ */
+ public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+ this.context = context;
+ try {
+ commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
+ context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
+ this.processingGuarantee = processingGuarantee;
+ } catch (JsonProcessingException e) {
+ LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
+ throw new RuntimeException(e); // Unchecked, with the serialization failure kept as the cause
+ }
+ }
+
+ /**
+ * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
+ * Metadata that cannot be deserialized is logged and treated as not committed by this topology.
+ * @param tp The topic partition the commit metadata belongs to.
+ * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka; its metadata string is parsed as {@link CommitMetadata}
+ * @param offsetManagers The offset managers by topic partition. Only consulted when the guarantee is at-least-once.
+ * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+ */
+ public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
+ Map<TopicPartition, OffsetManager> offsetManagers) {
+ try {
+ if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
+ && offsetManagers.containsKey(tp)
+ && offsetManagers.get(tp).hasCommitted()) {
+ return true; // The offset manager for tp reports a commit, so the metadata is not parsed
+ }
+
+ final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+ return committedMetadata.getTopologyId().equals(context.getStormId()); // NOTE(review): NPE if getTopologyId() is null - confirm
+ } catch (IOException e) {
+ LOG.warn("Failed to deserialize expected commit metadata [{}]."
+ + " This error is expected to occur once per partition, if the last commit to each partition"
+ + " was by an earlier version of the KafkaSpout, or by a process other than the KafkaSpout. "
+ + "Defaulting to behavior compatible with earlier version", committedOffset);
+ LOG.trace("", e); // The stack trace is only logged at trace level
+ return false;
+ }
+ }
+
+ public String getCommitMetadata() { // Returns the JSON metadata string serialized in the constructor
+ return commitMetadata;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 12391c8..a9e7c6c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -21,11 +21,12 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -42,6 +43,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
import org.apache.storm.kafka.spout.subscription.Subscription;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -84,12 +86,19 @@ public class KafkaSpoutMessagingGuaranteeTest {
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
spout.nextTuple();
+
+ when(consumerMock.position(partition)).thenReturn(1L);
//The spout should have emitted the tuple, and must have committed it before emit
InOrder inOrder = inOrder(consumerMock, collectorMock);
inOrder.verify(consumerMock).poll(anyLong());
- inOrder.verify(consumerMock).commitSync();
+ inOrder.verify(consumerMock).commitSync(commitCapture.capture());
inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+
+ CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+ assertThat(committedOffsets.get(partition).offset(), is(0L));
+ assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
}
private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
@@ -172,7 +181,13 @@ public class KafkaSpoutMessagingGuaranteeTest {
doTestModeCannotReplayTuples(spoutConfig);
}
- private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+ @Test
+ public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
+ //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+ .setTupleTrackingEnforced(true)
+ .build();
try (SimulatedTime time = new SimulatedTime()) {
KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
@@ -180,6 +195,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
spout.nextTuple();
+ clearInvocations(consumerMock);
ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
@@ -189,21 +205,15 @@ public class KafkaSpoutMessagingGuaranteeTest {
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+
spout.nextTuple();
- verify(consumerMock, never()).commitSync(any());
+ verify(consumerMock, never()).commitSync(argThat(arg -> {
+ return !arg.containsKey(partition);
+ }));
}
}
-
- @Test
- public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
- //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
- KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
- .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
- .setTupleTrackingEnforced(true)
- .build();
- doTestModeDoesNotCommitAckedTuples(spoutConfig);
- }
@Test
public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
@@ -233,9 +243,10 @@ public class KafkaSpoutMessagingGuaranteeTest {
verify(consumerMock).commitAsync(commitCapture.capture(), isNull());
- Map<TopicPartition, OffsetAndMetadata> commit = commitCapture.getValue();
- assertThat(commit.containsKey(partition), is(true));
- assertThat(commit.get(partition).offset(), is(1L));
+ CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+ assertThat(committedOffsets.get(partition).offset(), is(1L));
+ assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
}
}