You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/05 02:58:01 UTC

[2/4] storm git commit: STORM-2913: Add metadata to at-most-once and at-least-once commits

STORM-2913: Add metadata to at-most-once and at-least-once commits


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e756889a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e756889a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e756889a

Branch: refs/heads/master
Commit: e756889aa712aee22c216bc99ee17b972abf886d
Parents: eff32a3
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Jan 27 15:15:45 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Feb 4 23:18:54 2018 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 80 ++++++-----------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  4 +-
 .../spout/internal/CommitMetadataManager.java   | 91 ++++++++++++++++++++
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 45 ++++++----
 4 files changed, 146 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 84e7851..9d133a7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -23,8 +23,6 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
@@ -53,6 +51,7 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
 import org.apache.storm.kafka.spout.internal.CommitMetadata;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
@@ -72,7 +71,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     //Initial delay for the commit and subscription refresh timers
     public static final long TIMER_DELAY_MS = 500;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
 
     // Storm
     protected SpoutOutputCollector collector;
@@ -104,8 +102,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // Triggers when a subscription should be refreshed
     private transient Timer refreshSubscriptionTimer;
     private transient TopologyContext context;
-    // Metadata information to commit to Kafka. It is unique per spout per topology.
-    private transient String commitMetadata;
+    private transient CommitMetadataManager commitMetadataManager;
     private transient KafkaOffsetMetric kafkaOffsetMetric;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
@@ -142,7 +139,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         offsetManagers = new HashMap<>();
         emitted = new HashSet<>();
         waitingToEmit = new HashMap<>();
-        setCommitMetadata(context);
+        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
 
         tupleListener.open(conf, context);
         if (canRegisterMetrics()) {
@@ -154,7 +151,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void registerMetric() {
         LOG.info("Registering Spout Metrics");
-        kafkaOffsetMetric = new KafkaOffsetMetric(() -> offsetManagers, () -> kafkaConsumer);
+        kafkaOffsetMetric = new KafkaOffsetMetric(() -> Collections.unmodifiableMap(offsetManagers), () -> kafkaConsumer);
         context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
     }
 
@@ -168,16 +165,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return true;
     }
 
-    private void setCommitMetadata(TopologyContext context) {
-        try {
-            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
-                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
-        } catch (JsonProcessingException e) {
-            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
-            throw new RuntimeException(e);
-        }
-    }
-
     private boolean isAtLeastOnceProcessing() {
         return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
     }
@@ -215,8 +202,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 retryService.retainAll(partitions);
 
                 /*
-                 * Emitted messages for partitions that are no longer assigned to this spout can't
-                 * be acked and should not be retried, hence remove them from emitted collection.
+                 * Emitted messages for partitions that are no longer assigned to this spout can't be acked and should not be retried, hence
+                 * remove them from emitted collection.
                  */
                 emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition()));
             }
@@ -246,7 +233,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
             if (committedOffset != null) {
                 // offset was previously committed for this consumer group and topic-partition, either by this or another topology.
-                if (isOffsetCommittedByThisTopology(newTp, committedOffset)) {
+                if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
                     // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
                     kafkaConsumer.seek(newTp, committedOffset.offset());
                 } else {
@@ -274,31 +261,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
     }
 
-    /**
-     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. This info is used to decide if
-     * {@link FirstPollOffsetStrategy} should be applied
-     *
-     * @param tp topic-partition
-     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
-     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
-     */
-    private boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset) {
-        try {
-            if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).hasCommitted()) {
-                return true;
-            }
-
-            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
-            return committedMetadata.getTopologyId().equals(context.getStormId());
-        } catch (IOException e) {
-            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
-                + "for this topic-partition was done using an earlier version of Storm. "
-                + "Defaulting to behavior compatible with earlier version", committedOffset);
-            LOG.trace("", e);
-            return false;
-        }
-    }
-
     // ======== Next Tuple =======
     @Override
     public void nextTuple() {
@@ -311,7 +273,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 if (isAtLeastOnceProcessing()) {
                     commitOffsetsForAckedTuples(kafkaConsumer.assignment());
                 } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
-                    commitFetchedOffsetsAsync(kafkaConsumer.assignment());
+                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+                        createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+                    kafkaConsumer.commitAsync(offsetsToCommit, null);
+                    LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
                 }
             }
 
@@ -396,7 +361,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 numPolledRecords);
             if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                 //Commit polled records immediately to ensure delivery is at-most-once.
-                kafkaConsumer.commitSync();
+                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+                    createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+                kafkaConsumer.commitSync(offsetsToCommit);
+                LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
             }
             return consumerRecords;
         } finally {
@@ -469,11 +437,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
         } else {
             final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
-            if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
-                && committedOffset.offset() > record.offset()) {
+            if (isAtLeastOnceProcessing()
+                && committedOffset != null 
+                && committedOffset.offset() > record.offset()
+                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
                 // Ensures that after a topology with this id is started, the consumer fetch
                 // position never falls behind the committed offset (STORM-2844)
-                throw new IllegalStateException("Attempting to emit a message that has already been committed.");
+                throw new IllegalStateException("Attempting to emit a message that has already been committed."
+                    + " This should never occur when using the at-least-once processing guarantee.");
             }
 
             final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
@@ -519,13 +490,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
     }
 
-    private void commitFetchedOffsetsAsync(Set<TopicPartition> assignedPartitions) {
+    private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
         Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
         for (TopicPartition tp : assignedPartitions) {
-            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
+            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
         }
-        kafkaConsumer.commitAsync(offsetsToCommit, null);
-        LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
+        return offsetsToCommit;
     }
     
     private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
@@ -536,7 +506,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
         for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
-            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadata);
+            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
             if (nextCommitOffset != null) {
                 nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index c2305cb..40e449a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -474,7 +474,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                  * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
                  * requests an offset that was deleted.
                  */
-                LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
                     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
                 builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
@@ -488,7 +488,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                     + " Some messages may be processed more than once.");
             }
         }
-        LOG.info("Setting consumer property '{}' to 'false', because the spout does not support auto-commit",
+        LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit",
             ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
new file mode 100644
index 0000000..a63619c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public final class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
+    private final String commitMetadata;
+    private final ProcessingGuarantee processingGuarantee;
+    private final TopologyContext context;
+
+    /**
+     * Create a manager with the given context.
+     */
+    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+        this.context = context;
+        try {
+            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
+                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
+            this.processingGuarantee = processingGuarantee;
+        } catch (JsonProcessingException e) {
+            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
+     *
+     * @param tp The topic partition the commit metadata belongs to.
+     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+     * @param offsetManagers The offset managers.
+     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+     */
+    public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
+        Map<TopicPartition, OffsetManager> offsetManagers) {
+        try {
+            if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
+                && offsetManagers.containsKey(tp)
+                && offsetManagers.get(tp).hasCommitted()) {
+                return true;
+            }
+
+            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+            return committedMetadata.getTopologyId().equals(context.getStormId());
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize expected commit metadata [{}]."
+                + " This error is expected to occur once per partition, if the last commit to each partition"
+                + " was by an earlier version of the KafkaSpout, or by a process other than the KafkaSpout. "
+                + "Defaulting to behavior compatible with earlier version", committedOffset);
+            LOG.trace("", e);
+            return false;
+        }
+    }
+
+    public String getCommitMetadata() {
+        return commitMetadata;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e756889a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 12391c8..a9e7c6c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -21,11 +21,12 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -42,6 +43,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -84,12 +86,19 @@ public class KafkaSpoutMessagingGuaranteeTest {
             SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
 
         spout.nextTuple();
+        
+        when(consumerMock.position(partition)).thenReturn(1L);
 
         //The spout should have emitted the tuple, and must have committed it before emit
         InOrder inOrder = inOrder(consumerMock, collectorMock);
         inOrder.verify(consumerMock).poll(anyLong());
-        inOrder.verify(consumerMock).commitSync();
+        inOrder.verify(consumerMock).commitSync(commitCapture.capture());
         inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+        
+        CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+        assertThat(committedOffsets.get(partition).offset(), is(0L));
+        assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
     }
 
     private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
@@ -172,7 +181,13 @@ public class KafkaSpoutMessagingGuaranteeTest {
         doTestModeCannotReplayTuples(spoutConfig);
     }
 
-    private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+    @Test
+    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setTupleTrackingEnforced(true)
+            .build();
         try (SimulatedTime time = new SimulatedTime()) {
             KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
 
@@ -180,6 +195,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
                 SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
 
             spout.nextTuple();
+            clearInvocations(consumerMock);
 
             ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
             verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
@@ -189,21 +205,15 @@ public class KafkaSpoutMessagingGuaranteeTest {
             
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
             
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+            
             spout.nextTuple();
             
-            verify(consumerMock, never()).commitSync(any());
+            verify(consumerMock, never()).commitSync(argThat(arg -> {
+                return !arg.containsKey(partition);
+            }));
         }
     }
-
-    @Test
-    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
-        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
-            .setTupleTrackingEnforced(true)
-            .build();
-        doTestModeDoesNotCommitAckedTuples(spoutConfig);
-    }
     
     @Test
     public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
@@ -233,9 +243,10 @@ public class KafkaSpoutMessagingGuaranteeTest {
             
             verify(consumerMock).commitAsync(commitCapture.capture(), isNull());
             
-            Map<TopicPartition, OffsetAndMetadata> commit = commitCapture.getValue();
-            assertThat(commit.containsKey(partition), is(true));
-            assertThat(commit.get(partition).offset(), is(1L));
+            CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+            Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+            assertThat(committedOffsets.get(partition).offset(), is(1L));
+            assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
         }
     }