You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/15 05:22:02 UTC
[kafka] branch 0.11.0 updated: KAFKA-7021: checkpoint offsets from committed (#5232)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push:
new 8852900 KAFKA-7021: checkpoint offsets from committed (#5232)
8852900 is described below
commit 88529006b425bbc683da9fb8132fe6af823c953f
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jun 14 22:21:49 2018 -0700
KAFKA-7021: checkpoint offsets from committed (#5232)
This is a cherry-pick of PR #5207:
1. Add the committed offsets to the checkpointable offset map.
2. Add a restoration integration test for the source-KTable case.
---
.../streams/processor/internals/AbstractTask.java | 4 +-
.../processor/internals/StateDirectory.java | 2 +-
.../streams/processor/internals/StreamTask.java | 15 +-
.../integration/RestoreIntegrationTest.java | 193 +++++++++++++++++++++
4 files changed, 208 insertions(+), 6 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 364fbe8..7f6ac7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -163,7 +163,7 @@ public abstract class AbstractTask {
return sb.toString();
}
- protected Map<TopicPartition, Long> recordCollectorOffsets() {
+ protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() { // offsets handed to stateMgr.close() when a checkpoint is written; empty unless a subclass overrides
return Collections.emptyMap();
}
@@ -234,7 +234,7 @@ public abstract class AbstractTask {
ProcessorStateException exception = null;
log.trace("{} Closing state manager", logPrefix);
try {
- stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+ stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() : null);
} catch (final ProcessorStateException e) {
exception = e;
} finally {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 8d46da1..a18175a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -81,7 +81,7 @@ public class StateDirectory {
* @param taskId
* @return directory for the {@link TaskId}
*/
- File directoryForTask(final TaskId taskId) {
+ public File directoryForTask(final TaskId taskId) {
final File taskDir = new File(stateDir, taskId.toString());
if (!taskDir.exists() && !taskDir.mkdir()) {
throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4b24aab..86855f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -286,7 +286,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
public void run() {
flushState();
if (!eosEnabled) {
- stateMgr.checkpoint(recordCollectorOffsets());
+ stateMgr.checkpoint(activeTaskCheckpointableOffsets());
}
commitOffsets(startNewTransaction);
}
@@ -297,8 +297,17 @@ public class StreamTask extends AbstractTask implements Punctuator {
}
@Override
- protected Map<TopicPartition, Long> recordCollectorOffsets() {
- return recordCollector.offsets();
+    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
+        // put both producer acked offsets and consumer committed offsets as checkpointable offsets;
+        // copy first so merging never mutates the collector's map (if offsets() returns its internal
+        // map, source partitions would otherwise stay pinned at their first consumed offset)
+        final Map<TopicPartition, Long> checkpointableOffsets = new java.util.HashMap<>(recordCollector.offsets());
+        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+            if (!checkpointableOffsets.containsKey(entry.getKey())) {
+                checkpointableOffsets.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return checkpointableOffsets;
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
new file mode 100644
index 0000000..54c2bd7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertTrue;
+
+/** Integration tests for restoring local state stores when a Kafka Streams application starts. */
+@Category({IntegrationTest.class})
+public class RestoreIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+
+    private static final String APPID = "restore-test";
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private static final String INPUT_STREAM = "input-stream";
+    private static final String INPUT_STREAM_2 = "input-stream-2";
+    private final int numberOfKeys = 10000;
+    private KafkaStreams kafkaStreams;
+
+    @BeforeClass
+    public static void createTopics() throws InterruptedException {
+        CLUSTER.createTopic(INPUT_STREAM, 2, 1);
+        CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
+        CLUSTER.createTopic(APPID + "-store-changelog", 2, 1);
+    }
+
+    private Properties props(final String applicationId) {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @After
+    public void shutdown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(30, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void shouldRestoreStateFromSourceTopic() throws Exception {
+        final AtomicInteger numReceived = new AtomicInteger(0);
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final Properties props = props(APPID);
+
+        // per partition: restore from checkpoint (1000) to committed offset (end - 1000), then process the last 1000
+        final int offsetLimitDelta = 1000;
+        final int offsetCheckpointed = 1000;
+        createStateForRestoration(INPUT_STREAM);
+        setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
+
+        final StateDirectory stateDirectory = new StateDirectory(APPID, props.getProperty(StreamsConfig.STATE_DIR_CONFIG), new MockTime());
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint"))
+            .write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 0), (long) offsetCheckpointed));
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint"))
+            .write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 1), (long) offsetCheckpointed));
+
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+        builder.table(Serdes.Integer(), Serdes.Integer(), INPUT_STREAM, "store")
+            .toStream()
+            .foreach(new ForeachAction<Integer, Integer>() {
+                @Override
+                public void apply(final Integer key, final Integer value) {
+                    if (numReceived.incrementAndGet() == 2 * offsetLimitDelta) {
+                        shutdownLatch.countDown();
+                    }
+                }
+            });
+
+        kafkaStreams = new KafkaStreams(builder, props);
+        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                    startupLatch.countDown();
+                }
+            }
+        });
+
+        kafkaStreams.start();
+
+        assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
+        final ReadOnlyKeyValueStore<Integer, Integer> store = kafkaStreams.store("store", QueryableStoreTypes.<Integer, Integer>keyValueStore());
+        // NOTE(review): processing may already have started once RUNNING is reached, so this exact count can race
+        assertThat(store.approximateNumEntries(), equalTo((long) numberOfKeys - offsetLimitDelta * 2 - offsetCheckpointed * 2));
+
+        assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+        assertThat(numReceived.get(), equalTo(offsetLimitDelta * 2));
+        assertThat(store.approximateNumEntries(), equalTo((long) numberOfKeys - offsetCheckpointed * 2));
+    }
+
+    private void createStateForRestoration(final String changelogTopic) {
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        try (final KafkaProducer<Integer, Integer> producer =
+                 new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer())) {
+
+            for (int i = 0; i < numberOfKeys; i++) {
+                producer.send(new ProducerRecord<>(changelogTopic, i, i));
+            }
+        }
+    }
+
+    private void setCommittedOffset(final String topic, final int limitDelta) {
+        final Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPID);
+        consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "commit-consumer");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+
+        final List<TopicPartition> partitions = Arrays.asList(
+            new TopicPartition(topic, 0),
+            new TopicPartition(topic, 1));
+
+        // try-with-resources closes the consumer even if seeking or committing throws
+        try (final Consumer<Integer, Integer> consumer = new KafkaConsumer<>(consumerConfig)) {
+            consumer.assign(partitions);
+            consumer.seekToEnd(partitions);
+            for (final TopicPartition partition : partitions) {
+                final long position = consumer.position(partition);
+                consumer.seek(partition, position - limitDelta);
+            }
+            consumer.commitSync();
+        }
+    }
+}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.