You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/01/12 09:33:27 UTC

[3/3] camel git commit: CAMEL-10696 - Allow Kafka to resume from any offset using a new "standard" API to hold its state

CAMEL-10696 - Allow Kafka to resume from any offset using a new "standard" API to hold its state


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92df581c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92df581c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92df581c

Branch: refs/heads/master
Commit: 92df581c37918a656339f77761f9631ef15fc3e6
Parents: 1053a43
Author: Antoine DESSAIGNE <an...@gmail.com>
Authored: Wed Jan 11 16:48:02 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 12 10:27:41 2017 +0100

----------------------------------------------------------------------
 .../apache/camel/impl/FileStateRepository.java  | 295 +++++++++++++++++++
 .../camel/impl/MemoryStateRepository.java       |  56 ++++
 .../org/apache/camel/spi/StateRepository.java   |  46 +++
 .../camel/impl/FileStateRepositoryTest.java     | 136 +++++++++
 .../camel/impl/MemoryStateRepositoryTest.java   |  48 +++
 .../src/main/docs/kafka-component.adoc          |   3 +-
 .../component/kafka/KafkaConfiguration.java     |  19 +-
 .../camel/component/kafka/KafkaConsumer.java    |  47 ++-
 .../KafkaConsumerOffsetRepositoryEmptyTest.java | 107 +++++++
 ...KafkaConsumerOffsetRepositoryResumeTest.java | 109 +++++++
 .../kafka/embedded/EmbeddedKafkaCluster.java    |   4 +
 11 files changed, 863 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/92df581c/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java b/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java
new file mode 100644
index 0000000..efbd068
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.spi.StateRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This {@link FileStateRepository} class is a file-based implementation of a {@link StateRepository}.
+ */
+@ManagedResource(description = "File based state repository")
+public class FileStateRepository extends ServiceSupport implements StateRepository<String, String> {
+    private static final Logger LOG = LoggerFactory.getLogger(FileStateRepository.class);
+    private static final String STORE_DELIMITER = "\n";
+    private static final String KEY_VALUE_DELIMITER = "=";
+    private final AtomicBoolean init = new AtomicBoolean();
+    private Map<String, String> cache;
+    private File fileStore;
+    private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
+
+    public FileStateRepository() {
+        // default use a 1st level cache
+        this.cache = new HashMap<>();
+    }
+
+    public FileStateRepository(File fileStore, Map<String, String> cache) {
+        this.fileStore = fileStore;
+        this.cache = cache;
+    }
+
+    /**
+     * Creates a new file based repository using a {@link HashMap} as 1st level cache.
+     *
+     * @param fileStore the file store
+     */
+    public static FileStateRepository fileStateRepository(File fileStore) {
+        return fileStateRepository(fileStore, new HashMap<>());
+    }
+
+    /**
+     * Creates a new file based repository using a {@link HashMap} as 1st level cache.
+     *
+     * @param fileStore the file store
+     * @param maxFileStoreSize the max size in bytes for the fileStore file
+     */
+    public static FileStateRepository fileStateRepository(File fileStore, long maxFileStoreSize) {
+        FileStateRepository repository = new FileStateRepository(fileStore, new HashMap<>());
+        repository.setMaxFileStoreSize(maxFileStoreSize);
+        return repository;
+    }
+
+    /**
+     * Creates a new file based repository using the given {@link java.util.Map} as 1st level cache.
+     * <p/>
+     * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
+     * memory leak.
+     *
+     * @param store the file store
+     * @param cache the cache to use as 1st level cache
+     */
+    public static FileStateRepository fileStateRepository(File store, Map<String, String> cache) {
+        return new FileStateRepository(store, cache);
+    }
+
+    @Override
+    @ManagedOperation(description = "Adds the value of the given key to the store")
+    public void setState(String key, String value) {
+        if (key.contains(KEY_VALUE_DELIMITER)) {
+            throw new IllegalArgumentException("Key " + key + " contains illegal character: " + KEY_VALUE_DELIMITER);
+        }
+        if (key.contains(STORE_DELIMITER)) {
+            throw new IllegalArgumentException("Key " + key + " contains illegal character: <newline>");
+        }
+        if (value.contains(STORE_DELIMITER)) {
+            throw new IllegalArgumentException("Value " + value + " contains illegal character: <newline>");
+        }
+        synchronized (cache) {
+            cache.put(key, value);
+            if (fileStore.length() < maxFileStoreSize) {
+                // just append to store
+                appendToStore(key, value);
+            } else {
+                // trunk store and flush the cache
+                trunkStore();
+            }
+        }
+    }
+
+    @Override
+    @ManagedOperation(description = "Gets the value of the given key from store")
+    public String getState(String key) {
+        synchronized (cache) {
+            return cache.get(key);
+        }
+    }
+
+    /**
+     * Resets and clears the store to force it to reload from file
+     */
+    @ManagedOperation(description = "Reset and reloads the file store")
+    public synchronized void reset() throws IOException {
+        synchronized (cache) {
+            // trunk and clear, before we reload the store
+            trunkStore();
+            cache.clear();
+            loadStore();
+        }
+    }
+
+    /**
+     * Appends the {@code <key,value>} pair to the file store
+     *
+     * @param key the state key
+     */
+    private void appendToStore(String key, String value) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Appending {}={} to state filestore: {}", new Object[]{key, value, fileStore});
+        }
+        FileOutputStream fos = null;
+        try {
+            // create store parent directory if missing
+            File storeParentDirectory = fileStore.getParentFile();
+            if (storeParentDirectory != null && !storeParentDirectory.exists()) {
+                LOG.info("Parent directory of file store {} doesn't exist. Creating.", fileStore);
+                if (fileStore.getParentFile().mkdirs()) {
+                    LOG.info("Parent directory of file store {} successfully created.", fileStore);
+                } else {
+                    LOG.warn("Parent directory of file store {} cannot be created.", fileStore);
+                }
+            }
+            // create store if missing
+            if (!fileStore.exists()) {
+                FileUtil.createNewFile(fileStore);
+            }
+            // append to store
+            fos = new FileOutputStream(fileStore, true);
+            fos.write(key.getBytes());
+            fos.write(KEY_VALUE_DELIMITER.getBytes());
+            fos.write(value.getBytes());
+            fos.write(STORE_DELIMITER.getBytes());
+        } catch (IOException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            IOHelper.close(fos, "Appending to file state repository", LOG);
+        }
+    }
+
+    /**
+     * Trunks the file store when the max store size is hit by rewriting the 1st level cache
+     * to the file store.
+     */
+    protected void trunkStore() {
+        LOG.info("Trunking state filestore: {}", fileStore);
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(fileStore);
+            for (Map.Entry<String, String> entry : cache.entrySet()) {
+                fos.write(entry.getKey().getBytes());
+                fos.write(KEY_VALUE_DELIMITER.getBytes());
+                fos.write(entry.getValue().getBytes());
+                fos.write(STORE_DELIMITER.getBytes());
+            }
+        } catch (IOException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            IOHelper.close(fos, "Trunking file state repository", LOG);
+        }
+    }
+
+    /**
+     * Loads the given file store into the 1st level cache
+     */
+    protected void loadStore() throws IOException {
+        // auto create starting directory if needed
+        if (!fileStore.exists()) {
+            LOG.debug("Creating filestore: {}", fileStore);
+            File parent = fileStore.getParentFile();
+            if (parent != null) {
+                parent.mkdirs();
+            }
+            boolean created = FileUtil.createNewFile(fileStore);
+            if (!created) {
+                throw new IOException("Cannot create filestore: " + fileStore);
+            }
+        }
+
+        LOG.trace("Loading to 1st level cache from state filestore: {}", fileStore);
+
+        cache.clear();
+        Scanner scanner = null;
+        try {
+            scanner = new Scanner(fileStore);
+            scanner.useDelimiter(STORE_DELIMITER);
+            while (scanner.hasNextLine()) {
+                String line = scanner.nextLine();
+                int separatorIndex = line.indexOf(KEY_VALUE_DELIMITER);
+                String key = line.substring(0, separatorIndex);
+                String value = line.substring(separatorIndex + KEY_VALUE_DELIMITER.length());
+                cache.put(key, value);
+            }
+        } catch (IOException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            if (scanner != null) {
+                scanner.close();
+            }
+        }
+
+        LOG.debug("Loaded {} to the 1st level cache from state filestore: {}", cache.size(), fileStore);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(fileStore, "fileStore", this);
+
+        // init store if not loaded before
+        if (init.compareAndSet(false, true)) {
+            loadStore();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // trunk the store and clear the cache
+        trunkStore();
+        cache.clear();
+        init.set(false);
+    }
+
+    public File getFileStore() {
+        return fileStore;
+    }
+
+    public void setFileStore(File fileStore) {
+        this.fileStore = fileStore;
+    }
+
+    @ManagedAttribute(description = "The file path for the store")
+    public String getFilePath() {
+        return fileStore.getPath();
+    }
+
+    public Map<String, String> getCache() {
+        return cache;
+    }
+
+    public void setCache(Map<String, String> cache) {
+        this.cache = cache;
+    }
+
+    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
+    public long getMaxFileStoreSize() {
+        return maxFileStoreSize;
+    }
+
+    /**
+     * Sets the maximum file size for the file store in bytes.
+     * <p/>
+     * The default is 1mb.
+     */
+    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
+    public void setMaxFileStoreSize(long maxFileStoreSize) {
+        this.maxFileStoreSize = maxFileStoreSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/92df581c/camel-core/src/main/java/org/apache/camel/impl/MemoryStateRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/MemoryStateRepository.java b/camel-core/src/main/java/org/apache/camel/impl/MemoryStateRepository.java
new file mode 100644
index 0000000..816c6f0
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/MemoryStateRepository.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.spi.StateRepository;
+import org.apache.camel.support.ServiceSupport;
+
+/**
+ * This {@link MemoryStateRepository} class is a memory-based implementation of a {@link StateRepository}.
+ */
+@ManagedResource(description = "Memory based state repository")
+public class MemoryStateRepository extends ServiceSupport implements StateRepository<String, String> {
+    private ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
+
+    @Override
+    @ManagedOperation(description = "Adds the value of the given key to the store")
+    public void setState(String key, String value) {
+        cache.put(key, value);
+    }
+
+    @Override
+    @ManagedOperation(description = "Gets the value of the given key from store")
+    public String getState(String key) {
+        return cache.get(key);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        cache = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        cache.clear();
+        cache = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/92df581c/camel-core/src/main/java/org/apache/camel/spi/StateRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/StateRepository.java b/camel-core/src/main/java/org/apache/camel/spi/StateRepository.java
new file mode 100644
index 0000000..5b9cc10
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/StateRepository.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Service;
+
+/**
+ * This {@link StateRepository} holds a set of key/value pairs for defining a particular <em>state</em> of a component. For instance it can be a set of indexes.
+ * <p/>
+ * An {@link IdempotentRepository} behaves more or less like a {@code Set} whereas this {@link StateRepository} behaves like a {@code Map}.
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ */
+public interface StateRepository<K, V> extends Service {
+    /**
+     * Sets the state value for the given key.
+     *
+     * @param key State key
+     * @param value State value
+     */
+    void setState(K key, V value);
+
+    /**
+     * Gets the state value for the given key. It returns {@code null} if the key is unknown.
+     *
+     * @param key State key
+     * @return State value or {@code null} if the key is unknown
+     */
+    V getState(K key);
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/92df581c/camel-core/src/test/java/org/apache/camel/impl/FileStateRepositoryTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/FileStateRepositoryTest.java b/camel-core/src/test/java/org/apache/camel/impl/FileStateRepositoryTest.java
new file mode 100644
index 0000000..b5885a7
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/FileStateRepositoryTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.io.File;
+import java.nio.file.Files;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.camel.impl.FileStateRepository.fileStateRepository;
+import static org.junit.Assert.*;
+
+public class FileStateRepositoryTest {
+    private final File repositoryStore = new File("target/file-state-repository.dat");
+
+    @Before
+    public void setUp() throws Exception {
+        // Remove the repository file if needed
+        Files.deleteIfExists(repositoryStore.toPath());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldPreventUsingDelimiterInKey() throws Exception {
+        // Given a FileStateRepository
+        FileStateRepository repository = fileStateRepository(repositoryStore);
+
+        // When trying to use the key delimiter in a key
+        repository.setState("=", "value");
+
+        // Then an exception is thrown
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldPreventUsingNewLineInKey() throws Exception {
+        // Given a FileStateRepository
+        FileStateRepository repository = createRepository();
+
+        // When trying to use new line in a key
+        repository.setState("\n", "value");
+
+        // Then an exception is thrown
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldPreventUsingNewLineInValue() throws Exception {
+        // Given a FileStateRepository
+        FileStateRepository repository = createRepository();
+
+        // When trying to use new line in a value
+        repository.setState("key", "\n");
+
+        // Then an exception is thrown
+    }
+
+    @Test
+    public void shouldSaveState() throws Exception {
+        // Given an empty FileStateRepository
+        FileStateRepository repository = createRepository();
+
+        // When saving a state
+        repository.setState("key", "value");
+
+        // Then it should be retrieved afterwards
+        assertEquals("value", repository.getState("key"));
+    }
+
+    @Test
+    public void shouldUpdateState() throws Exception {
+        // Given a FileStateRepository with a state in it
+        FileStateRepository repository = createRepository();
+        repository.setState("key", "value");
+
+        // When updating the state
+        repository.setState("key", "value2");
+
+        // Then the new value should be retrieved afterwards
+        assertEquals("value2", repository.getState("key"));
+    }
+
+    @Test
+    public void shouldSynchronizeInFile() throws Exception {
+        // Given a FileStateRepository with some content
+        FileStateRepository repository = createRepository();
+        repository.setState("key1", "value1");
+        repository.setState("key2", "value2");
+        repository.setState("key3", "value3");
+
+        // When creating a new FileStateRepository with same file
+        FileStateRepository newRepository = createRepository();
+
+        // Then the new one should have the same content
+        assertEquals("value1", newRepository.getState("key1"));
+        assertEquals("value2", newRepository.getState("key2"));
+        assertEquals("value3", newRepository.getState("key3"));
+    }
+
+    @Test
+    public void shouldPreventRepositoryFileFromGrowingInfinitely() throws Exception {
+        // Given a FileStateRepository with a maximum size of 100 bytes
+        FileStateRepository repository = createRepository();
+        repository.setMaxFileStoreSize(100);
+
+        // And content just to this limit (10x10 bytes)
+        for (int i = 0; i < 10; i++) {
+            repository.setState("key", "xxxxx".replace('x', (char) ('0' + i)));
+        }
+        long previousSize = repositoryStore.length();
+
+        // When updating the state
+        repository.setState("key", "value");
+
+        // Then it should be truncated
+        assertTrue(repositoryStore.length() < previousSize);
+    }
+
+    private FileStateRepository createRepository() throws Exception {
+        FileStateRepository repository = fileStateRepository(repositoryStore);
+        repository.start();
+        return repository;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/92df581c/camel-core/src/test/java/org/apache/camel/impl/MemoryStateRepositoryTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/MemoryStateRepositoryTest.java b/camel-core/src/test/java/org/apache/camel/impl/MemoryStateRepositoryTest.java
new file mode 100644
index 0000000..52e2f42
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/MemoryStateRepositoryTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MemoryStateRepositoryTest {
+    @Test
+    public void shouldSaveState() throws Exception {
+        // Given an empty MemoryStateRepository
+        MemoryStateRepository repository = new MemoryStateRepository();
+
+        // When saving a state
+        repository.setState("key", "value");
+
+        // Then it should be retrieved afterwards
+        assertEquals("value", repository.getState("key"));
+    }
+
+    @Test
+    public void shouldUpdateState() throws Exception {
+        // Given a MemoryStateRepository with a state in it
+        MemoryStateRepository repository = new MemoryStateRepository();
+        repository.setState("key", "value");
+
+        // When updating the state
+        repository.setState("key", "value2");
+
+        // Then the new value should be retrieved afterwards
+        assertEquals("value2", repository.getState("key"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/92df581c/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 9abc23c..ea1a199 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -49,7 +49,7 @@ The Kafka component supports 1 options which are listed below.
 
 
 // endpoint options: START
-The Kafka component supports 77 endpoint options which are listed below:
+The Kafka component supports 78 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -75,6 +75,7 @@ The Kafka component supports 77 endpoint options which are listed below:
 | keyDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for key that implements the Deserializer interface.
 | maxPartitionFetchBytes | consumer | 1048576 | Integer | The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be partitions max.partition.fetch.bytes. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens the consumer can get stuck trying to fetch a large message on a certain partition.
 | maxPollRecords | consumer | 500 | Integer | The maximum number of records returned in a single call to poll()
+| offsetRepository | consumer |  | StateRepository<String, String> | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit.
 | partitionAssignor | consumer | org.apache.kafka.clients.consumer.RangeAssignor | String | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used
 | pollTimeoutMs | consumer | 5000 | Long | The timeout used when polling the KafkaConsumer.
 | seekToBeginning | consumer | false | boolean | If the option is true then KafkaConsumer will read from beginning on startup.

http://git-wip-us.apache.org/repos/asf/camel/blob/92df581c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 98758c5..214fd2f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.StateRepository;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
@@ -102,6 +103,8 @@ public class KafkaConfiguration {
     private String consumerId;
     @UriParam(label = "consumer", defaultValue = "true")
     private Boolean autoCommitEnable = true;
+    @UriParam(label = "consumer")
+    private StateRepository<String, String> offsetRepository;
 
     //Producer configuration properties
     @UriParam(label = "producer", defaultValue = "100")
@@ -461,7 +464,7 @@ public class KafkaConfiguration {
     }
 
     public Boolean isAutoCommitEnable() {
-        return autoCommitEnable;
+        return offsetRepository == null ? autoCommitEnable : false;
     }
 
     /**
@@ -472,6 +475,18 @@ public class KafkaConfiguration {
         this.autoCommitEnable = autoCommitEnable;
     }
 
+    public StateRepository<String, String> getOffsetRepository() {
+        return offsetRepository;
+    }
+
+    /**
+     * The offset repository to use in order to locally store the offset of each partition of the topic.
+     * Defining one will disable the autocommit.
+     */
+    public void setOffsetRepository(StateRepository<String, String> offsetRepository) {
+        this.offsetRepository = offsetRepository;
+    }
+
     public Integer getAutoCommitIntervalMs() {
         return autoCommitIntervalMs;
     }
@@ -804,7 +819,7 @@ public class KafkaConfiguration {
     }
 
     /**
-     * The Simple Authentication and Security Layer (SASL) Mechanism used. 
+     * The Simple Authentication and Security Layer (SASL) Mechanism used.
      * For the valid values see <a href="http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml">http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml</a>
      */
     public void setSaslMechanism(String saslMechanism) {

http://git-wip-us.apache.org/repos/asf/camel/blob/92df581c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 66c4335..31a4180 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -20,11 +20,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.spi.StateRepository;
 import org.apache.camel.util.IOHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -104,7 +106,7 @@ public class KafkaConsumer extends DefaultConsumer {
             this.topicName = topicName;
             this.threadId = topicName + "-" + "Thread " + id;
             this.kafkaProps = kafkaProps;
-            
+
             ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
             try {
                 //Fix for running camel-kafka in OSGI see KAFKA-3218
@@ -122,7 +124,30 @@ public class KafkaConsumer extends DefaultConsumer {
                 LOG.info("Subscribing {} to topic {}", threadId, topicName);
                 consumer.subscribe(Arrays.asList(topicName.split(",")));
 
-                if (endpoint.getConfiguration().isSeekToBeginning()) {
+                StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository();
+                if (offsetRepository != null) {
+                    // This poll ensures we have an assigned partition, otherwise seek won't work
+                    ConsumerRecords poll = consumer.poll(100);
+
+                    for (TopicPartition topicPartition : (Set<TopicPartition>) consumer.assignment()) {
+                        String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition));
+                        if (offsetState != null && !offsetState.isEmpty()) {
+                            // The state contains the last read offset so you need to seek from the next one
+                            long offset = deserializeOffsetValue(offsetState) + 1;
+                            LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
+                            consumer.seek(topicPartition, offset);
+                        } else {
+                            // If the initial poll returned records for a topic/partition that has no saved state,
+                            // resume from the first returned offset in order to avoid losing that data
+                            List<ConsumerRecord<Object, Object>> partitionRecords = poll.records(topicPartition);
+                            if (!partitionRecords.isEmpty()) {
+                                long offset = partitionRecords.get(0).offset();
+                                LOG.debug("Resuming partition {} from offset {}", topicPartition.partition(), offset);
+                                consumer.seek(topicPartition, offset);
+                            }
+                        }
+                    }
+                } else if (endpoint.getConfiguration().isSeekToBeginning()) {
                     LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
                     // This poll to ensures we have an assigned partition otherwise seek won't work
                     consumer.poll(100);
@@ -144,9 +169,12 @@ public class KafkaConsumer extends DefaultConsumer {
                                 getExceptionHandler().handleException("Error during processing", exchange, e);
                             }
                         }
-                        // if autocommit is false
-                        if (endpoint.getConfiguration().isAutoCommitEnable() != null
+                        if (offsetRepository != null) {
+                            long partitionLastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+                            offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset));
+                        } else if (endpoint.getConfiguration().isAutoCommitEnable() != null
                             && !endpoint.getConfiguration().isAutoCommitEnable()) {
+                            // if autocommit is false
                             long partitionLastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                             consumer.commitSync(Collections.singletonMap(
                                 partition, new OffsetAndMetadata(partitionLastoffset + 1)));
@@ -169,5 +197,16 @@ public class KafkaConsumer extends DefaultConsumer {
         }
     }
 
+    protected String serializeOffsetKey(TopicPartition topicPartition) {
+        return topicPartition.topic() + '/' + topicPartition.partition();
+    }
+
+    protected String serializeOffsetValue(long offset) {
+        return String.valueOf(offset);
+    }
+
+    protected long deserializeOffsetValue(String offset) {
+        return Long.parseLong(offset);
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/92df581c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
new file mode 100644
index 0000000..620504f
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Properties;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.MemoryStateRepository;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Test;
+
+public class KafkaConsumerOffsetRepositoryEmptyTest extends BaseEmbeddedKafkaTest {
+    private static final String TOPIC = "offset-initialize";
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint result;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    private MemoryStateRepository stateRepository;
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
+
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+
+        // Create the topic with 2 partitions and send 10 messages (5 in each partition)
+        embeddedKafkaCluster.createTopic(TOPIC, 2);
+        for (int i = 0; i < 10; i++) {
+            producer.send(new ProducerRecord<>(TOPIC, i % 2, "key", "message-" + i));
+        }
+
+        stateRepository = new MemoryStateRepository();
+    }
+
+    @After
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        stateRepository = null;
+    }
+
+    /**
+     * Given an empty offset repository
+     * When consuming with this repository
+     * Then we consume according to the {@code autoOffsetReset} setting
+     */
+    @Test
+    public void shouldStartFromBeginningWithEmptyOffsetRepository() throws InterruptedException {
+        result.expectedMessageCount(10);
+        result.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4", "message-5", "message-6", "message-7",
+                                                "message-8", "message-9");
+
+        result.assertIsSatisfied(3000);
+
+        assertEquals("partition-0", "4", stateRepository.getState(TOPIC + "/0"));
+        assertEquals("partition-1", "4", stateRepository.getState(TOPIC + "/1"));
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        registry.bind("offset", stateRepository);
+        return registry;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +  //
+                             "&groupId=A" +                            //
+                             "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
+                             "&consumersCount=2" +                     // We have 2 partitions, we want 1 consumer per partition
+                             "&offsetRepository=#offset")              // Keep the offset in our repository
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/92df581c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
new file mode 100644
index 0000000..7e71c3e
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Properties;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.MemoryStateRepository;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Test;
+
+public class KafkaConsumerOffsetRepositoryResumeTest extends BaseEmbeddedKafkaTest {
+    private static final String TOPIC = "offset-resume";
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint result;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    private MemoryStateRepository stateRepository;
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
+
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+
+        // Create the topic with 2 partitions and send 10 messages (5 in each partition)
+        embeddedKafkaCluster.createTopic(TOPIC, 2);
+        for (int i = 0; i < 10; i++) {
+            producer.send(new ProducerRecord<>(TOPIC, i % 2, "key", "message-" + i));
+        }
+
+        // Create the state repository with some initial offsets
+        stateRepository = new MemoryStateRepository();
+        stateRepository.setState(TOPIC + "/0", "2");
+        stateRepository.setState(TOPIC + "/1", "3");
+    }
+
+    @After
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        stateRepository = null;
+    }
+
+    /**
+     * Given an offset repository with values
+     * When consuming with this repository
+     * Then we're consuming from the saved offsets
+     */
+    @Test
+    public void shouldResumeFromAnyParticularOffset() throws InterruptedException {
+        result.expectedMessageCount(3);
+        result.expectedBodiesReceivedInAnyOrder("message-6", "message-8", "message-9");
+
+        result.assertIsSatisfied(3000);
+
+        assertEquals("partition-0", "4", stateRepository.getState(TOPIC + "/0"));
+        assertEquals("partition-1", "4", stateRepository.getState(TOPIC + "/1"));
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        registry.bind("offset", stateRepository);
+        return registry;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +  //
+                             "&groupId=A" +                            //
+                             "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
+                             "&consumersCount=2" +                     // We have 2 partitions, we want 1 consumer per partition
+                             "&offsetRepository=#offset")              // Keep the offset in our repository
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/92df581c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
index 7162521..c8c74e7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
@@ -67,6 +67,10 @@ public class EmbeddedKafkaCluster {
         return null;
     }
 
+    public void createTopic(String topic, int partitionCount) {
+        AdminUtils.createTopic(getZkUtils(), topic, partitionCount, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+    }
+
     public void createTopics(String... topics) {
         for (String topic : topics) {
             AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);