You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sh...@apache.org on 2016/03/30 10:11:41 UTC

ignite git commit: IGNITE-2730: Ignite Events Source Streaming to Kafka. - Fixes #560.

Repository: ignite
Updated Branches:
  refs/heads/master a1a6bf25a -> 12c707c81


IGNITE-2730: Ignite Events Source Streaming to Kafka. - Fixes #560.

Signed-off-by: shtykh_roman <rs...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/12c707c8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/12c707c8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/12c707c8

Branch: refs/heads/master
Commit: 12c707c81792e20c10cece742512412a7f24dcfb
Parents: a1a6bf2
Author: shtykh_roman <rs...@yahoo.com>
Authored: Wed Mar 30 17:09:33 2016 +0900
Committer: shtykh_roman <rs...@yahoo.com>
Committed: Wed Mar 30 17:09:33 2016 +0900

----------------------------------------------------------------------
 modules/kafka/README.txt                        |  81 ++++-
 .../kafka/connect/IgniteSourceConnector.java    |  81 +++++
 .../kafka/connect/IgniteSourceConstants.java    |  44 +++
 .../stream/kafka/connect/IgniteSourceTask.java  | 328 +++++++++++++++++++
 .../serialization/CacheEventConverter.java      |  66 ++++
 .../serialization/CacheEventDeserializer.java   |  54 +++
 .../serialization/CacheEventSerializer.java     |  54 +++
 .../kafka/IgniteKafkaStreamerSelfTestSuite.java |   4 +-
 .../kafka/KafkaIgniteStreamerSelfTest.java      |  11 +-
 .../ignite/stream/kafka/TestKafkaBroker.java    |  27 +-
 .../kafka/connect/IgniteSinkConnectorTest.java  |  13 +-
 .../connect/IgniteSourceConnectorMock.java      |  31 ++
 .../connect/IgniteSourceConnectorTest.java      | 327 ++++++++++++++++++
 .../kafka/connect/IgniteSourceTaskMock.java     |  31 ++
 .../kafka/connect/TestCacheEventFilter.java     |  31 ++
 .../kafka/src/test/resources/example-ignite.xml |   4 +-
 16 files changed, 1156 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/README.txt
----------------------------------------------------------------------
diff --git a/modules/kafka/README.txt b/modules/kafka/README.txt
index f4e56bd..3a1a5aa 100644
--- a/modules/kafka/README.txt
+++ b/modules/kafka/README.txt
@@ -33,7 +33,7 @@ interested in):
 </project>
 
 
-## Streaming Data via Kafka Connect
+## Streaming Data to Ignite via Kafka Connect
 
 Sink Connector will help you export data from Kafka to Ignite cache. It polls data from Kafka topics and writes it to the user-specified cache.
 For more information on Kafka Connect, see [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
@@ -46,8 +46,8 @@ as described in the following subsection.
 1. Put the following jar files on Kafka's classpath
 - ignite-kafka-connect-x.x.x-SNAPSHOT.jar
 - ignite-core-x.x.x-SNAPSHOT.jar
+- ignite-spring-x.x.x-SNAPSHOT.jar
 - cache-api-1.0.0.jar
-- ignite-spring-1.5.0-SNAPSHOT.jar
 - spring-aop-4.1.0.RELEASE.jar
 - spring-beans-4.1.0.RELEASE.jar
 - spring-context-4.1.0.RELEASE.jar
@@ -127,3 +127,80 @@ k1,v1
 ```
 http://node1:8080/ignite?cmd=size&cacheName=cache1
 ```
+
+## Streaming Cache Event Data to Kafka via Kafka Connect
+
+Source connector enables listening to Ignite cache events and, upon filtering, stream them to Kafka.
+
+Connector can be found in 'optional/ignite-kafka.' It and its dependencies have to be on the classpath of a Kafka running instance,
+as described in the following subsection.
+
+### Setting up and Running
+
+1. Put the following jar files on Kafka's classpath
+- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
+- ignite-core-x.x.x-SNAPSHOT.jar
+- cache-api-1.0.0.jar
+- ignite-spring-1.5.0-SNAPSHOT.jar
+- spring-aop-4.1.0.RELEASE.jar
+- spring-beans-4.1.0.RELEASE.jar
+- spring-context-4.1.0.RELEASE.jar
+- spring-core-4.1.0.RELEASE.jar
+- spring-expression-4.1.0.RELEASE.jar
+- commons-logging-1.1.1.jar
+
+2. Prepare worker configurations, e.g.,
+```
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+internal.key.converter=org.apache.kafka.connect.storage.StringConverter
+internal.value.converter=org.apache.kafka.connect.storage.StringConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+```
+
+Note that the current implementation ignores key and schema of Kafka Connect, and stores marshalled cache events
+using org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter.
+
+3. Prepare connector configurations, e.g.,
+```
+# connector
+name=ignite-src-connector
+connector.class=IgniteSourceConnector
+tasks.max=2
+
+# cache
+topicNames=testTopic1,testTopic2
+cacheEvts=put,remove
+## if you decide to filter remotely (recommended)
+cacheFilterCls=MyFilter
+cacheName=cache1
+igniteCfg=/some-path/ignite.xml
+```
+where 'cacheName' is the name of the cache you specify in '/some-path/ignite.xml' and the data from 'testTopic1,testTopic2'
+will be pulled and stored. Also consider using 'evtBufferSize' and 'evtBatchSize' for tuning the internal queue
+used to safely transfer data from Ignite cache to Kafka.
+
+The following cache events can be specified in the connector configurations:
+- CREATED
+- DESTROYED
+- PUT
+- READ
+- REMOVED
+- LOCKED
+- UNLOCKED
+- SWAPPED
+- UNSWAPPED
+- EXPIRED
+
+For a simple cache configuration file example, see example-ignite.xml in tests.
+
+4. Start the connector, as described in [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
new file mode 100644
index 0000000..59e2ed0
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceConnector;
+
+/**
+ * Source connector to manage source tasks that listens to registered Ignite grid events and forward them to Kafka.
+ *
+ * Note that only cache events are enabled for streaming.
+ */
+public class IgniteSourceConnector extends SourceConnector {
+    /** Source properties. */
+    private Map<String, String> configProps;
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(Map<String, String> props) {
+        try {
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_NAME), "cache name");
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_CFG_PATH), "path to cache config file");
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_EVENTS), "Registered cache events");
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.TOPIC_NAMES), "Kafka topics");
+        }
+        catch (IllegalArgumentException e) {
+            throw new ConnectException("Cannot start IgniteSourceConnector due to configuration error", e);
+        }
+
+        configProps = props;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Class<? extends Task> taskClass() {
+        return IgniteSourceTask.class;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+        Map<String, String> taskProps = new HashMap<>();
+
+        taskProps.putAll(configProps);
+
+        for (int i = 0; i < maxTasks; i++)
+            taskConfigs.add(taskProps);
+
+        return taskConfigs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java
new file mode 100644
index 0000000..7d590e5
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink configuration strings.
+ */
+public class IgniteSourceConstants {
+    /** Ignite configuration file path. */
+    public static final String CACHE_CFG_PATH = "igniteCfg";
+
+    /** Cache name. */
+    public static final String CACHE_NAME = "cacheName";
+
+    /** Events to be listened to. Names corresponds to {@link IgniteSourceTask.CacheEvt}. */
+    public static final String CACHE_EVENTS = "cacheEvts";
+
+    /** Internal buffer size. */
+    public static final String INTL_BUF_SIZE = "evtBufferSize";
+
+    /** Size of one chunk drained from the internal buffer. */
+    public static final String INTL_BATCH_SIZE = "evtBatchSize";
+
+    /** User-defined filter class. */
+    public static final String CACHE_FILTER_CLASS = "cacheFilterCls";
+
+    /** Kafka topic. */
+    public static final String TOPIC_NAMES = "topicNames";
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
new file mode 100644
index 0000000..9eb183c
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task to consume remote cluster cache events from the grid and inject them into Kafka.
+ * <p>
+ * Note that a task will create a bounded queue in the grid for more reliable data transfer.
+ * Queue size can be changed by {@link IgniteSourceConstants#INTL_BUF_SIZE}.
+ */
+public class IgniteSourceTask extends SourceTask {
+    /** Logger. */
+    private static final Logger log = LoggerFactory.getLogger(IgniteSourceTask.class);
+
+    /** Event buffer size. */
+    private static int evtBufSize = 100000;
+
+    /** Event buffer. */
+    private static BlockingQueue<CacheEvent> evtBuf = new LinkedBlockingQueue<>(evtBufSize);
+
+    /** Max number of events taken from the buffer at once. */
+    private static int evtBatchSize = 100;
+
+    /** Flag for stopped state. */
+    private static volatile boolean stopped = true;
+
+    /** Ignite grid configuration file. */
+    private static String igniteConfigFile;
+
+    /** Cache name. */
+    private static String cacheName;
+
+    /** Remote Listener id. */
+    private static UUID rmtLsnrId;
+
+    /** Local listener. */
+    private static TaskLocalListener locLsnr = new TaskLocalListener();
+
+    /** Remote filter. */
+    private static TaskRemoteFilter rmtLsnr = new TaskRemoteFilter();
+
+    /** User-defined filter. */
+    private static IgnitePredicate<CacheEvent> filter;
+
+    /** Topic. */
+    private static String topics[];
+
+    /** Offset. */
+    private static final Map<String, Long> offset = Collections.singletonMap("offset", 0L);
+
+    /** Partition. */
+    private static final Map<String, String> srcPartition = Collections.singletonMap("cache", null);
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return new IgniteSinkConnector().version();
+    }
+
+    /**
+     * Filtering is done remotely. Local listener buffers data for injection into Kafka.
+     *
+     * @param props Task properties.
+     */
+    @Override public void start(Map<String, String> props) {
+        // Each task has the same parameters -- avoid setting more than once.
+        if (cacheName != null)
+            return;
+
+        cacheName = props.get(IgniteSourceConstants.CACHE_NAME);
+        igniteConfigFile = props.get(IgniteSourceConstants.CACHE_CFG_PATH);
+        topics = props.get(IgniteSourceConstants.TOPIC_NAMES).split("\\s*,\\s*");
+
+        if (props.containsKey(IgniteSourceConstants.INTL_BUF_SIZE))
+            evtBufSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BUF_SIZE));
+
+        if (props.containsKey(IgniteSourceConstants.INTL_BATCH_SIZE))
+            evtBatchSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BATCH_SIZE));
+
+        if (props.containsKey(IgniteSourceConstants.CACHE_FILTER_CLASS)) {
+            String filterCls = props.get(IgniteSourceConstants.CACHE_FILTER_CLASS);
+            if (filterCls != null && !filterCls.isEmpty()) {
+                try {
+                    Class<? extends IgnitePredicate<CacheEvent>> clazz =
+                        (Class<? extends IgnitePredicate<CacheEvent>>)Class.forName(filterCls);
+
+                    filter = clazz.newInstance();
+                }
+                catch (Exception e) {
+                    log.error("Failed to instantiate the provided filter! " +
+                        "User-enabled filtering is ignored!", e);
+                }
+            }
+        }
+
+        try {
+            int[] evts = cacheEvents(props.get(IgniteSourceConstants.CACHE_EVENTS));
+
+            rmtLsnrId = IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
+                .remoteListen(locLsnr, rmtLsnr, evts);
+        }
+        catch (Exception e) {
+            log.error("Failed to register event listener!", e);
+
+            throw new ConnectException(e);
+        }
+        finally {
+            stopped = false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<SourceRecord> poll() throws InterruptedException {
+        ArrayList<SourceRecord> records = new ArrayList<>(evtBatchSize);
+        ArrayList<CacheEvent> evts = new ArrayList<>(evtBatchSize);
+
+        if (stopped)
+            return records;
+
+        try {
+            if (evtBuf.drainTo(evts, evtBatchSize) > 0) {
+                for (CacheEvent evt : evts) {
+                    // schema and keys are ignored.
+                    for (String topic : topics)
+                        records.add(new SourceRecord(srcPartition, offset, topic, null, evt));
+                }
+
+                return records;
+            }
+        }
+        catch (IgniteException e) {
+            log.error("Error when polling event queue!", e);
+        }
+
+        // for shutdown.
+        return null;
+    }
+
+    /**
+     * Converts comma-delimited cache events strings to Ignite internal representation.
+     *
+     * @param evtPropsStr Comma-delimited cache event names.
+     * @return Ignite internal representation of cache events to be registered with the remote listener.
+     * @throws Exception If error.
+     */
+    private int[] cacheEvents(String evtPropsStr) throws Exception {
+        String[] evtStr = evtPropsStr.split("\\s*,\\s*");
+
+        if (evtStr.length == 0)
+            return EventType.EVTS_CACHE;
+
+        int[] evts = new int[evtStr.length];
+
+        try {
+            for (int i = 0; i < evtStr.length; i++)
+                evts[i] = CacheEvt.valueOf(evtStr[i].toUpperCase()).getId();
+        }
+        catch (Exception e) {
+            log.error("Failed to recognize the provided cache event!", e);
+
+            throw new Exception(e);
+        }
+        return evts;
+    }
+
+    /**
+     * Stops the grid client.
+     */
+    @Override public synchronized void stop() {
+        if (stopped)
+            return;
+
+        stopped = true;
+
+        stopRemoteListen();
+
+        IgniteGrid.getIgnite().close();
+    }
+
+    /**
+     * Stops the remote listener.
+     */
+    protected void stopRemoteListen() {
+        if (rmtLsnrId != null)
+            IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
+                .stopRemoteListen(rmtLsnrId);
+
+        rmtLsnrId = null;
+    }
+
+    /**
+     * Local listener buffering cache events to be further sent to Kafka.
+     */
+    private static class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> {
+
+        @Override public boolean apply(UUID id, CacheEvent evt) {
+            try {
+                if (!evtBuf.offer(evt, 10, TimeUnit.MILLISECONDS))
+                    log.error("Failed to buffer event {}", evt.name());
+            }
+            catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            return true;
+        }
+    }
+
+    /**
+     * Remote filter.
+     */
+    private static class TaskRemoteFilter implements IgnitePredicate<CacheEvent> {
+        @IgniteInstanceResource
+        Ignite ignite;
+
+        @Override public boolean apply(CacheEvent evt) {
+
+            Affinity affinity = ignite.affinity(cacheName);
+            ClusterNode evtNode = evt.eventNode();
+
+            if (affinity.isPrimary(evtNode, evt.key())) {
+                // Process this event. Ignored on backups.
+                if (filter != null && filter.apply(evt))
+                    return false;
+
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * Grid instance initialized on demand.
+     */
+    private static class IgniteGrid {
+        /** Constructor. */
+        private IgniteGrid() {
+        }
+
+        /** Instance holder. */
+        private static class Holder {
+            private static final Ignite IGNITE = Ignition.start(igniteConfigFile);
+        }
+
+        /**
+         * Obtains grid instance.
+         *
+         * @return Grid instance.
+         */
+        private static Ignite getIgnite() {
+            return Holder.IGNITE;
+        }
+    }
+
+    /** Cache events available for listening. */
+    private enum CacheEvt {
+        CREATED(EventType.EVT_CACHE_ENTRY_CREATED),
+        DESTROYED(EventType.EVT_CACHE_ENTRY_DESTROYED),
+        PUT(EventType.EVT_CACHE_OBJECT_PUT),
+        READ(EventType.EVT_CACHE_OBJECT_READ),
+        REMOVED(EventType.EVT_CACHE_OBJECT_REMOVED),
+        LOCKED(EventType.EVT_CACHE_OBJECT_LOCKED),
+        UNLOCKED(EventType.EVT_CACHE_OBJECT_UNLOCKED),
+        SWAPPED(EventType.EVT_CACHE_OBJECT_SWAPPED),
+        UNSWAPPED(EventType.EVT_CACHE_OBJECT_UNSWAPPED),
+        EXPIRED(EventType.EVT_CACHE_OBJECT_EXPIRED);
+
+        /** Internal Ignite event id. */
+        private final int id;
+
+        /**
+         * Constructor.
+         *
+         * @param id Internal Ignite event id.
+         */
+        CacheEvt(int id) {
+            this.id = id;
+        }
+
+        /**
+         * Gets Ignite event id.
+         *
+         * @return Ignite event id.
+         */
+        int getId() {
+            return id;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java
new file mode 100644
index 0000000..57eb7de
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * {@link CacheEvent} converter for Connect API.
+ */
+public class CacheEventConverter implements Converter {
+    private final CacheEventDeserializer deserializer = new CacheEventDeserializer();
+    private final CacheEventSerializer serializer = new CacheEventSerializer();
+
+    /** {@inheritDoc} */
+    @Override public void configure(Map<String, ?> map, boolean b) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] fromConnectData(String topic, Schema schema, Object o) {
+        try {
+            return serializer.serialize(topic, (CacheEvent)o);
+        }
+        catch (SerializationException e) {
+            throw new DataException("Failed to convert to byte[] due to a serialization error", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public SchemaAndValue toConnectData(String topic, byte[] bytes) {
+        CacheEvent evt;
+
+        try {
+            evt = deserializer.deserialize(topic, bytes);
+        }
+        catch (SerializationException e) {
+            throw new DataException("Failed to convert to Kafka Connect data due to a serialization error", e);
+        }
+
+        if (evt == null) {
+            return SchemaAndValue.NULL;
+        }
+        return new SchemaAndValue(null, evt);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java
new file mode 100644
index 0000000..47ce1ca
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * Deserializer based on {@link JdkMarshaller}.
+ */
+public class CacheEventDeserializer implements Deserializer<CacheEvent> {
+    /** Marshaller. */
+    private static final Marshaller marsh = new JdkMarshaller();
+
+    /** {@inheritDoc} */
+    @Override public void configure(Map<String, ?> map, boolean b) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheEvent deserialize(String topic, byte[] bytes) {
+        try {
+            return marsh.unmarshal(bytes, getClass().getClassLoader());
+        }
+        catch (IgniteCheckedException e) {
+            throw new SerializationException("Failed to deserialize cache event!", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java
new file mode 100644
index 0000000..2f2d668
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Serializer based on {@link JdkMarshaller}.
+ */
+public class CacheEventSerializer implements Serializer<CacheEvent> {
+    /** Marshaller. */
+    private static final Marshaller marsh = new JdkMarshaller();
+
+    /** {@inheritDoc} */
+    @Override public void configure(Map<String, ?> map, boolean b) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] serialize(String topic, CacheEvent event) {
+        try {
+            return marsh.marshal(event);
+        }
+        catch (IgniteCheckedException e) {
+            throw new SerializationException("Failed to serialize cache event!", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
index 731f540..c8d413a 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.stream.kafka;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest;
+import org.apache.ignite.stream.kafka.connect.IgniteSourceConnectorTest;
 
 /**
  * Apache Kafka streamers tests.
@@ -34,8 +35,9 @@ public class IgniteKafkaStreamerSelfTestSuite extends TestSuite {
         // Kafka streamer.
         suite.addTest(new TestSuite(KafkaIgniteStreamerSelfTest.class));
 
-        // Kafka streamer via Connect API.
+        // Kafka streamers via Connect API.
         suite.addTest(new TestSuite(IgniteSinkConnectorTest.class));
+        suite.addTest(new TestSuite(IgniteSourceConnectorTest.class));
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 829c877..4918f87 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -28,8 +28,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import kafka.consumer.ConsumerConfig;
-import kafka.producer.KeyedMessage;
-import kafka.producer.Producer;
 import kafka.serializer.StringDecoder;
 import kafka.utils.VerifiableProperties;
 import org.apache.ignite.Ignite;
@@ -39,6 +37,7 @@ import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
 
@@ -116,7 +115,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
 
         Collections.shuffle(subnet);
 
-        List<KeyedMessage<String, String>> messages = new ArrayList<>(CNT);
+        List<ProducerRecord<String, String>> messages = new ArrayList<>(CNT);
 
         Map<String, String> keyValMap = new HashMap<>();
 
@@ -127,14 +126,12 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
 
             String msg = runtime + VALUE_URL + ip;
 
-            messages.add(new KeyedMessage<>(topic, ip, msg));
+            messages.add(new ProducerRecord<>(topic, ip, msg));
 
             keyValMap.put(ip, msg);
         }
 
-        Producer<String, String> producer = embeddedBroker.sendMessages(messages);
-
-        producer.close();
+        embeddedBroker.sendMessages(messages);
 
         return keyValMap;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
index 70acb78..4c5dc51 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -25,9 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
-import kafka.producer.KeyedMessage;
-import kafka.producer.Producer;
-import kafka.producer.ProducerConfig;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
@@ -37,6 +34,9 @@ import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.curator.test.TestingServer;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import scala.Tuple2;
 
 /**
@@ -106,15 +106,17 @@ public class TestKafkaBroker {
     /**
      * Sends a message to Kafka broker.
      *
-     * @param keyedMessages List of keyed messages.
+     * @param records List of records.
      * @return Producer used to send the message.
      */
-    public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
-        Producer<String, String> producer = new Producer<>(getProducerConfig());
+    public void sendMessages(List<ProducerRecord<String, String>> records) {
+        Producer<String, String> producer = new KafkaProducer<>(getProducerConfig());
 
-        producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+        for (ProducerRecord<String, String> rec : records)
+            producer.send(rec);
 
-        return producer;
+        producer.flush();
+        producer.close();
     }
 
     /**
@@ -185,6 +187,7 @@ public class TestKafkaBroker {
         props.put("offsets.topic.replication.factor", "1");
         props.put("log.dir", createTmpDir("_cfg").getAbsolutePath());
         props.put("log.flush.interval.messages", "1");
+        props.put("log.flush.interval.ms", "10");
 
         return props;
     }
@@ -212,14 +215,14 @@ public class TestKafkaBroker {
      *
      * @return Kafka Producer config.
      */
-    private ProducerConfig getProducerConfig() {
+    private Properties getProducerConfig() {
         Properties props = new Properties();
 
-        props.put("metadata.broker.list", getBrokerAddress());
         props.put("bootstrap.servers", getBrokerAddress());
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
-        return new ProducerConfig(props);
+        return props;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
index a8583d0..6e6d65d 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import kafka.producer.KeyedMessage;
-import kafka.producer.Producer;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CachePeekMode;
@@ -33,6 +31,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.stream.kafka.TestKafkaBroker;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
@@ -183,7 +182,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
      * @return Map of key value messages.
      */
     private Map<String, String> produceStream(String topic) {
-        List<KeyedMessage<String, String>> messages = new ArrayList<>(EVENT_CNT);
+        List<ProducerRecord<String, String>> messages = new ArrayList<>(EVENT_CNT);
 
         Map<String, String> keyValMap = new HashMap<>();
 
@@ -193,14 +192,12 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
             String key = topic + "_" + String.valueOf(evt);
             String msg = runtime + String.valueOf(evt);
 
-            messages.add(new KeyedMessage<>(topic, key, msg));
+            messages.add(new ProducerRecord<>(topic, key, msg));
 
             keyValMap.put(key, msg);
         }
 
-        Producer<String, String> producer = kafkaBroker.sendMessages(messages);
-
-        producer.close();
+        kafkaBroker.sendMessages(messages);
 
         return keyValMap;
     }
@@ -216,7 +213,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
 
         props.put(ConnectorConfig.TOPICS_CONFIG, topics);
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
-        props.put(ConnectorConfig.NAME_CONFIG, "test-connector");
+        props.put(ConnectorConfig.NAME_CONFIG, "test-sink-connector");
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnector.class.getName());
         props.put(IgniteSinkConstants.CACHE_NAME, "testCache");
         props.put(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE, "true");

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
new file mode 100644
index 0000000..d983c67
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.kafka.connect.connector.Task;
+
+/**
+ * Source connector mock for tests for using the task mock.
+ */
+public class IgniteSourceConnectorMock extends IgniteSourceConnector {
+
+    /** {@inheritDoc} */
+    @Override public Class<? extends Task> taskClass() {
+        return IgniteSourceTaskMock.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
new file mode 100644
index 0000000..13b6887
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.kafka.TestKafkaBroker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.FutureCallback;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * Tests for {@link IgniteSourceConnector}.
+ */
+public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
+    /** Number of input messages. */
+    private static final int EVENT_CNT = 100;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "testCache";
+
+    /** Test topics created by connector. */
+    private static final String[] TOPICS = {"test1", "test2"};
+
+    /** Test Kafka broker. */
+    private TestKafkaBroker kafkaBroker;
+
+    /** Worker to run tasks. */
+    private Worker worker;
+
+    /** Workers' herder. */
+    private Herder herder;
+
+    /** Ignite server node shared among tests. */
+    private static Ignite grid;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTestsStarted() throws Exception {
+        IgniteConfiguration cfg = loadConfiguration("modules/kafka/src/test/resources/example-ignite.xml");
+
+        cfg.setClientMode(false);
+
+        grid = startGrid("igniteServerNode", cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        kafkaBroker = new TestKafkaBroker();
+
+        WorkerConfig workerConfig = new StandaloneConfig(makeWorkerProps());
+
+        MemoryOffsetBackingStore offsetBackingStore = new MemoryOffsetBackingStore();
+        offsetBackingStore.configure(workerConfig.originals());
+
+        worker = new Worker(workerConfig, offsetBackingStore);
+        worker.start();
+
+        herder = new StandaloneHerder(worker);
+        herder.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        herder.stop();
+
+        worker.stop();
+
+        kafkaBroker.shutdown();
+
+        grid.cache(CACHE_NAME).clear();
+
+        // reset cache name to overwrite task configurations.
+        Field field = IgniteSourceTask.class.getDeclaredField("cacheName");
+
+        field.setAccessible(true);
+        field.set(IgniteSourceTask.class, null);
+    }
+
+    /**
+     * Tests data flow from injecting data into grid and transferring it to Kafka cluster
+     * without user-specified filter.
+     *
+     * @throws Exception Thrown in case of the failure.
+     */
+    public void testEventsInjectedIntoKafkaWithoutFilter() throws Exception {
+        Map<String, String> srcProps = makeSourceProps(Utils.join(TOPICS, ","));
+
+        srcProps.remove(IgniteSourceConstants.CACHE_FILTER_CLASS);
+
+        doTest(srcProps, false);
+    }
+
+    /**
+     * Tests data flow from injecting data into grid and transferring it to Kafka cluster.
+     *
+     * @throws Exception Thrown in case of the failure.
+     */
+    public void testEventsInjectedIntoKafka() throws Exception {
+        doTest(makeSourceProps(Utils.join(TOPICS, ",")), true);
+    }
+
+    /**
+     * Tests the source with the specified source configurations.
+     *
+     * @param srcProps Source properties.
+     * @param conditioned Flag indicating whether filtering is enabled.
+     * @throws Exception Fails if error.
+     */
+    private void doTest(Map<String, String> srcProps, boolean conditioned) throws Exception {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+            @Override
+            public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+                if (error != null)
+                    throw new RuntimeException("Failed to create a job!", error);
+            }
+        });
+
+        herder.putConnectorConfig(
+            srcProps.get(ConnectorConfig.NAME_CONFIG),
+            srcProps, true, cb);
+
+        cb.get();
+
+        // Ugh! To be sure Kafka Connect's worker thread is properly started...
+        Thread.sleep(5000);
+
+        final CountDownLatch latch = new CountDownLatch(EVENT_CNT);
+
+        final IgnitePredicate<CacheEvent> locLsnr = new IgnitePredicate<CacheEvent>() {
+            @Override public boolean apply(CacheEvent evt) {
+                assert evt != null;
+
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(locLsnr, EVT_CACHE_OBJECT_PUT);
+
+        IgniteCache<String, String> cache = grid.cache(CACHE_NAME);
+
+        assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+        Map<String, String> keyValMap = new HashMap<>(EVENT_CNT);
+
+        keyValMap.putAll(sendData());
+
+        // Checks all events are processed.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(locLsnr);
+
+        assertEquals(EVENT_CNT, cache.size(CachePeekMode.PRIMARY));
+
+        // Checks the events are transferred to Kafka broker.
+        checkDataDelivered(conditioned);
+    }
+
+    /**
+     * Sends messages to the grid.
+     *
+     * @return Map of key value messages.
+     */
+    private Map<String, String> sendData() throws IOException {
+        Map<String, String> keyValMap = new HashMap<>();
+
+        for (int evt = 0; evt < EVENT_CNT; evt++) {
+            long runtime = System.currentTimeMillis();
+
+            String key = "test_" + String.valueOf(evt);
+            String msg = runtime + String.valueOf(evt);
+
+            if (evt >= EVENT_CNT / 2)
+                key = "conditioned_" + key;
+
+            grid.cache(CACHE_NAME).put(key, msg);
+
+            keyValMap.put(key, msg);
+        }
+
+        return keyValMap;
+    }
+
+    /**
+     * Checks if events were delivered to Kafka server.
+     *
+     * @param conditioned Flag indicating whether filtering is enabled.
+     */
+    private void checkDataDelivered(boolean conditioned) {
+        Properties props = new Properties();
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-grp");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.ignite.stream.kafka.connect.serialization.CacheEventDeserializer");
+
+        KafkaConsumer<String, CacheEvent> consumer = new KafkaConsumer<>(props);
+
+        consumer.subscribe(Arrays.asList(TOPICS));
+
+        int evtCnt = 0;
+        long start = System.currentTimeMillis();
+
+        try {
+            while (false || (System.currentTimeMillis() - start) < 10000) {
+                ConsumerRecords<String, CacheEvent> records = consumer.poll(10);
+                for (ConsumerRecord<String, CacheEvent> record : records) {
+                    System.out.println("Event: offset = " + record.offset() + ", key = " + record.key()
+                        + ", value = " + record.value().toString());
+
+                    evtCnt++;
+                }
+            }
+        }
+        catch (WakeupException e) {
+            // ignore for shutdown.
+        }
+        finally {
+            consumer.close();
+
+            if (conditioned)
+                assertTrue(evtCnt == (EVENT_CNT * TOPICS.length) / 2);
+            else
+                assertTrue(evtCnt == EVENT_CNT * TOPICS.length);
+        }
+    }
+
+    /**
+     * Creates properties for test source connector.
+     *
+     * @param topics Topics.
+     * @return Test source connector properties.
+     */
+    private Map<String, String> makeSourceProps(String topics) {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+        props.put(ConnectorConfig.NAME_CONFIG, "test-src-connector");
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSourceConnectorMock.class.getName());
+        props.put(IgniteSourceConstants.CACHE_NAME, "testCache");
+        props.put(IgniteSourceConstants.CACHE_CFG_PATH, "example-ignite.xml");
+        props.put(IgniteSourceConstants.TOPIC_NAMES, topics);
+        props.put(IgniteSourceConstants.CACHE_EVENTS, "put");
+        props.put(IgniteSourceConstants.CACHE_FILTER_CLASS, TestCacheEventFilter.class.getName());
+        props.put(IgniteSourceConstants.INTL_BUF_SIZE, "1000000");
+
+        return props;
+    }
+
+    /**
+     * Creates properties for Kafka Connect workers.
+     *
+     * @return Worker configurations.
+     */
+    private Map<String, String> makeWorkerProps() throws IOException {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put("internal.key.converter.schemas.enable", "false");
+        props.put("internal.value.converter.schemas.enable", "false");
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter");
+        props.put("key.converter.schemas.enable", "false");
+        props.put("value.converter.schemas.enable", "false");
+        props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+        // fast flushing for testing.
+        props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+        return props;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
new file mode 100644
index 0000000..5237e98
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Source task mock for tests. It avoids closing the grid from test to test.
+ */
+public class IgniteSourceTaskMock extends IgniteSourceTask {
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        stopRemoteListen();
+
+        // don't stop the grid for tests.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
new file mode 100644
index 0000000..8978db0
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Test user-defined filter.
+ */
+class TestCacheEventFilter implements IgnitePredicate<CacheEvent> {
+
+    @Override public boolean apply(CacheEvent event) {
+        return ((String)event.key()).startsWith("conditioned_");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/resources/example-ignite.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/resources/example-ignite.xml b/modules/kafka/src/test/resources/example-ignite.xml
index fbb05d3..f23a306 100644
--- a/modules/kafka/src/test/resources/example-ignite.xml
+++ b/modules/kafka/src/test/resources/example-ignite.xml
@@ -30,6 +30,8 @@
         http://www.springframework.org/schema/util
         http://www.springframework.org/schema/util/spring-util.xsd">
     <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <!-- Enable peer class loading for remote events. -->
+        <property name="peerClassLoadingEnabled" value="true"/>
         <!-- Enable client mode. -->
         <property name="clientMode" value="true"/>
 
@@ -48,7 +50,7 @@
         <!-- Enable cache events. -->
         <property name="includeEventTypes">
             <list>
-                <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). -->
+                <!-- Cache events. -->
                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
             </list>
         </property>