Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/15 15:27:43 UTC
[kafka] branch 0.10.2 updated: KAFKA-7021: checkpoint offsets from committed (#5232)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.10.2 by this push:
new 3af2cec KAFKA-7021: checkpoint offsets from committed (#5232)
3af2cec is described below
commit 3af2cecb9d84369021549946db95dbef9bfe68fa
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jun 14 22:21:49 2018 -0700
KAFKA-7021: checkpoint offsets from committed (#5232)
This is a cherry-pick of PR #5207:
1. Add the committed offsets to the checkpointable offset map (the merge is sketched just below).
2. Add a restoration integration test for the source KTable case.
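
For orientation, the core of change 1 is to merge consumer-committed offsets into the
map of producer-acked offsets before the checkpoint file is written. A minimal,
self-contained sketch of that merge follows; the partition name and offset value are
hypothetical, and the real code is in the StreamTask hunk further down:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    public class CheckpointableOffsetsSketch {
        public static void main(final String[] args) {
            // Offsets acked by the producer; empty for a task whose only store
            // is a source KTable, since nothing goes through the record collector.
            final Map<TopicPartition, Long> checkpointable = new HashMap<>();

            // Offsets the consumer has processed on the task's input partitions
            // (hypothetical values).
            final Map<TopicPartition, Long> consumed = new HashMap<>();
            consumed.put(new TopicPartition("input-stream", 0), 4000L);

            // The merge from the StreamTask hunk below: consumed offsets fill in
            // only the partitions the producer never acked.
            for (final Map.Entry<TopicPartition, Long> entry : consumed.entrySet()) {
                if (!checkpointable.containsKey(entry.getKey())) {
                    checkpointable.put(entry.getKey(), entry.getValue());
                }
            }

            System.out.println(checkpointable); // {input-stream-0=4000}
        }
    }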
---
.../streams/processor/internals/AbstractTask.java | 4 +-
.../streams/processor/internals/StreamTask.java | 15 +-
.../integration/RestoreIntegrationTest.java | 189 +++++++++++++++++++++
3 files changed, 203 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 8de5d23..9095fd0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -127,13 +127,13 @@ public abstract class AbstractTask {
void closeStateManager(final boolean writeCheckpoint) {
log.trace("task [{}] Closing", id());
try {
- stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+ stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() : null);
} catch (IOException e) {
throw new ProcessorStateException("Error while closing the state manager", e);
}
}
- protected Map<TopicPartition, Long> recordCollectorOffsets() {
+ protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
return Collections.emptyMap();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3270596..f16c9e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -75,7 +75,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
// 2) flush produced records in the downstream and change logs of local states
recordCollector.flush();
// 3) write checkpoints for any local state
- stateMgr.checkpoint(recordCollectorOffsets());
+ stateMgr.checkpoint(activeTaskCheckpointableOffsets());
// 4) commit consumed offsets if it is dirty already
commitOffsets();
}
@@ -379,10 +379,19 @@ public class StreamTask extends AbstractTask implements Punctuator {
}
@Override
- protected Map<TopicPartition, Long> recordCollectorOffsets() {
- return recordCollector.offsets();
+ protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
+ // put both producer acked offsets and consumer committed offsets as checkpointable offsets
+ final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
+ for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+ if (!checkpointableOffsets.containsKey(entry.getKey())) {
+ checkpointableOffsets.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ return checkpointableOffsets;
}
+
@SuppressWarnings("unchecked")
private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source, final TimestampExtractor timestampExtractor) {
return new RecordQueue(partition, source, timestampExtractor);
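
For context on the hunk above: a task whose only store is backed by its source topic (a
source KTable) sends nothing through the record collector, so recordCollector.offsets()
is empty for those partitions. Before this change no checkpoint entry was written for
them, and a restart presumably fell back to restoring the store from the earliest
offset. Note that the merge keeps the producer-acked offset whenever both maps contain
the same partition.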
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
new file mode 100644
index 0000000..5e6d57a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertTrue;
+
+public class RestoreIntegrationTest {
+ private static final int NUM_BROKERS = 1;
+
+ private static final String APPID = "restore-test";
+
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER =
+ new EmbeddedKafkaCluster(NUM_BROKERS);
+ private static final String INPUT_STREAM = "input-stream";
+ private static final String INPUT_STREAM_2 = "input-stream-2";
+ private final int numberOfKeys = 10000;
+ private KafkaStreams kafkaStreams;
+
+ @BeforeClass
+ public static void createTopics() throws InterruptedException {
+ CLUSTER.createTopic(INPUT_STREAM, 2, 1);
+ CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
+ CLUSTER.createTopic(APPID + "-store-changelog", 2, 1);
+ }
+
+ private Properties props(final String applicationId) {
+ Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+ streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+ streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return streamsConfiguration;
+ }
+
+ @After
+ public void shutdown() {
+ if (kafkaStreams != null) {
+ kafkaStreams.close(30, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void shouldRestoreStateFromSourceTopic() throws Exception {
+ final AtomicInteger numReceived = new AtomicInteger(0);
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ final Properties props = props(APPID);
+
+ // restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
+ final int offsetLimitDelta = 1000;
+ final int offsetCheckpointed = 1000;
+ createStateForRestoration(INPUT_STREAM);
+ setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
+
+ final StateDirectory stateDirectory = new StateDirectory(APPID, props.getProperty(StreamsConfig.STATE_DIR_CONFIG));
+ new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint"))
+ .write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 0), (long) offsetCheckpointed));
+ new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint"))
+ .write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 1), (long) offsetCheckpointed));
+
+ final CountDownLatch startupLatch = new CountDownLatch(1);
+ final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+ builder.table(Serdes.Integer(), Serdes.Integer(), INPUT_STREAM, "store")
+ .toStream()
+ .foreach(new ForeachAction<Integer, Integer>() {
+ @Override
+ public void apply(final Integer key, final Integer value) {
+ if (numReceived.incrementAndGet() == 2 * offsetLimitDelta)
+ shutdownLatch.countDown();
+ }
+ });
+
+ kafkaStreams = new KafkaStreams(builder, props);
+ kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
+ @Override
+ public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+ if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+ startupLatch.countDown();
+ }
+ }
+ });
+
+ kafkaStreams.start();
+
+ assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
+ ReadOnlyKeyValueStore<Integer, Integer> store = kafkaStreams.store("store", QueryableStoreTypes.<Integer, Integer>keyValueStore());
+ assertThat(store.approximateNumEntries(), equalTo((long) numberOfKeys - offsetLimitDelta * 2 - offsetCheckpointed * 2));
+
+ assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+ assertThat(numReceived.get(), equalTo(offsetLimitDelta * 2));
+ assertThat(store.approximateNumEntries(), equalTo((long) numberOfKeys - offsetCheckpointed * 2));
+ }
+
+ private void createStateForRestoration(final String changelogTopic) {
+ final Properties producerConfig = new Properties();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+ try (final KafkaProducer<Integer, Integer> producer =
+ new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer())) {
+
+ for (int i = 0; i < numberOfKeys; i++) {
+ producer.send(new ProducerRecord<>(changelogTopic, i, i));
+ }
+ }
+ }
+
+ private void setCommittedOffset(final String topic, final int limitDelta) {
+ final Properties consumerConfig = new Properties();
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPID);
+ consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "commit-consumer");
+ consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+
+ final Consumer consumer = new KafkaConsumer(consumerConfig);
+ final List<TopicPartition> partitions = Arrays.asList(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1));
+
+ consumer.assign(partitions);
+ consumer.seekToEnd(partitions);
+
+ for (TopicPartition partition : partitions) {
+ final long position = consumer.position(partition);
+ consumer.seek(partition, position - limitDelta);
+ }
+
+ consumer.commitSync();
+ consumer.close();
+ }
+
+}
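
The arithmetic in the test's comment works out as follows: 10000 keys over two
partitions gives 5000 records each; the checkpoint is seeded at offset 1000 and the
committed offset is set to end minus 1000 (i.e. 4000), so restoration covers 3000
records per partition and live processing handles the remaining 1000. After the run,
the per-task checkpoint files can be read back with the same OffsetCheckpoint class the
test uses to seed them. A minimal sketch, assuming a task directory path such as
<state.dir>/restore-test/0_0 is passed in (the class and file name come from the test
above; the path argument is hypothetical):

    import java.io.File;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

    public class CheckpointInspector {
        public static void main(final String[] args) throws Exception {
            // Hypothetical task directory, e.g. <state.dir>/restore-test/0_0
            final File taskDir = new File(args[0]);
            final OffsetCheckpoint checkpoint =
                new OffsetCheckpoint(new File(taskDir, ".checkpoint"));
            // read() returns the partition -> offset map persisted at the
            // last commit or close.
            final Map<TopicPartition, Long> offsets = checkpoint.read();
            System.out.println(offsets);
        }
    }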
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.