You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/18 09:36:17 UTC
[07/14] ignite git commit: IGNITE-529 Initial implementation
IGNITE-529 Initial implementation
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9a5fc056
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9a5fc056
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9a5fc056
Branch: refs/heads/master
Commit: 9a5fc056786494e748c982e0c766fa9842ba523e
Parents: b382062
Author: shtykh_roman <rs...@yahoo.com>
Authored: Tue Nov 17 17:38:12 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 17 17:38:12 2015 +0300
----------------------------------------------------------------------
modules/flume/README.md | 40 ++++
modules/flume/pom.xml | 77 ++++++++
.../ignite/stream/flume/EventTransformer.java | 36 ++++
.../apache/ignite/stream/flume/IgniteSink.java | 186 +++++++++++++++++++
.../stream/flume/IgniteSinkConstants.java | 35 ++++
.../ignite/stream/flume/IgniteSinkTest.java | 142 ++++++++++++++
.../stream/flume/IgniteSinkTestSuite.java | 37 ++++
.../stream/flume/TestEventTransformer.java | 66 +++++++
.../flume/src/test/resources/example-ignite.xml | 71 +++++++
pom.xml | 1 +
10 files changed, 691 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/README.md
----------------------------------------------------------------------
diff --git a/modules/flume/README.md b/modules/flume/README.md
new file mode 100644
index 0000000..2247cf3
--- /dev/null
+++ b/modules/flume/README.md
@@ -0,0 +1,40 @@
+# Flume NG sink
+
+## Setting up and running
+
+1. Create a transformer by implementing EventTransformer interface.
+2. Build it and copy to ${FLUME_HOME}/plugins.d/ignite-sink/lib.
+3. Copy other Ignite-related jar files to ${FLUME_HOME}/plugins.d/ignite-sink/libext to have them as shown below.
+
+```
+plugins.d/
+`-- ignite-sink
+ |-- lib
+ | `-- ignite-flume-transformer-x.x.x.jar <-- your jar
+ `-- libext
+ |-- cache-api-1.0.0.jar
+ |-- ignite-core-x.x.x.jar
+ |-- ignite-flume-x.x.x.jar
+ |-- ignite-spring-x.x.x.jar
+ |-- spring-aop-4.1.0.RELEASE.jar
+ |-- spring-beans-4.1.0.RELEASE.jar
+ |-- spring-context-4.1.0.RELEASE.jar
+ |-- spring-core-4.1.0.RELEASE.jar
+ `-- spring-expression-4.1.0.RELEASE.jar
+```
+
+4. In the Flume configuration file, specify the location of the Ignite configuration XML file, which must contain
+the cache properties, including the name of the cache to create (see [Apache Ignite](https://apacheignite.readme.io/)),
+the cache name (the same as in the Ignite configuration file), your EventTransformer implementation class,
+and, optionally, the batch size (default -- 100).
+
+```
+# Describe the sink
+a1.sinks.k1.type = org.apache.ignite.stream.flume.IgniteSink
+a1.sinks.k1.igniteCfg = /some-path/ignite.xml
+a1.sinks.k1.cacheName = testCache
+a1.sinks.k1.eventTransformer = my.company.MyEventTransformer
+a1.sinks.k1.batchSize = 100
+```
+
+After specifying your source and channel (see Flume's docs), you are ready to run a Flume agent.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/pom.xml
----------------------------------------------------------------------
diff --git a/modules/flume/pom.xml b/modules/flume/pom.xml
new file mode 100644
index 0000000..cd4ee98
--- /dev/null
+++ b/modules/flume/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file of the Apache Ignite Flume NG sink module.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-flume</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <url>http://ignite.apache.org</url>
+
+ <properties>
+ <flume-ng.version>1.6.0</flume-ng.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>${flume-ng.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java
----------------------------------------------------------------------
diff --git a/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java b/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java
new file mode 100644
index 0000000..e85a98b
--- /dev/null
+++ b/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+import java.util.List;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Flume event transformer to convert a list of Flume events to cache entries.
+ *
+ * @param <E> Flume event type.
+ * @param <K> Cache key type.
+ * @param <V> Cache value type.
+ */
+public interface EventTransformer<E, K, V> {
+    /**
+     * Transforms a list of Flume events to cache entries.
+     *
+     * @param events List of Flume events to transform.
+     * @return Cache entries to be written into the grid; may be {@code null}.
+     */
+    @Nullable Map<K, V> transform(List<E> events);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java
----------------------------------------------------------------------
diff --git a/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java
new file mode 100644
index 0000000..e6e7e90
--- /dev/null
+++ b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flume sink for Apache Ignite.
+ */
+public class IgniteSink extends AbstractSink implements Configurable {
+ /** Logger. */
+ private static final Logger log = LoggerFactory.getLogger(IgniteSink.class);
+
+ /** Default batch size. */
+ private static final int DFLT_BATCH_SIZE = 100;
+
+ /** Ignite configuration file. */
+ private String springCfgPath;
+
+ /** Cache name. */
+ private String cacheName;
+
+ /** Event transformer implementation class. */
+ private String eventTransformerCls;
+
+ /** Number of events to be written per Flume transaction. */
+ private int batchSize;
+
+ /** Monitoring counter. */
+ private SinkCounter sinkCounter;
+
+ /** Event transformer. */
+ private EventTransformer<Event, Object, Object> eventTransformer;
+
+ /** Ignite instance. */
+ private Ignite ignite;
+
+ /** Empty constructor. */
+ public IgniteSink() {
+ }
+
+ /**
+ * Reads the Ignite-specific sink settings from the Flume agent configuration: Ignite configuration file path,
+ * cache name, event transformer class and batch size (defaults to {@code DFLT_BATCH_SIZE} when not set).
+ *
+ * @param context Context for sink.
+ */
+ @Override public void configure(Context context) {
+ springCfgPath = context.getString(IgniteSinkConstants.CFG_PATH);
+ cacheName = context.getString(IgniteSinkConstants.CFG_CACHE_NAME);
+ eventTransformerCls = context.getString(IgniteSinkConstants.CFG_EVENT_TRANSFORMER);
+ batchSize = context.getInteger(IgniteSinkConstants.CFG_BATCH_SIZE, DFLT_BATCH_SIZE);
+
+ // The monitoring counter is created on the first call only and is kept on subsequent calls.
+ if (sinkCounter == null)
+ sinkCounter = new SinkCounter(getName());
+ }
+
+    /**
+     * Starts the Ignite node (unless one is already running) and instantiates the configured event transformer.
+     *
+     * @throws FlumeException If the node fails to start or the transformer cannot be loaded and instantiated.
+     */
+    @SuppressWarnings("unchecked")
+    @Override synchronized public void start() {
+        A.notNull(springCfgPath, "Ignite config file");
+        A.notNull(cacheName, "Cache name");
+        A.notNull(eventTransformerCls, "Event transformer class");
+
+        sinkCounter.start();
+
+        try {
+            if (ignite == null)
+                ignite = Ignition.start(springCfgPath);
+
+            // Always resolve the class: an empty name must fail here rather than leave the transformer unset.
+            Class<? extends EventTransformer> clazz =
+                (Class<? extends EventTransformer<Event, Object, Object>>)Class.forName(eventTransformerCls);
+            eventTransformer = clazz.newInstance();
+        }
+        catch (Exception e) {
+            log.error("Failed to start grid", e);
+
+            throw new FlumeException("Failed to start grid", e);
+        }
+
+        super.start();
+    }
+
+ /**
+ * Stops the grid.
+ */
+ @Override synchronized public void stop() {
+ if (ignite != null)
+ ignite.close();
+
+ sinkCounter.stop();
+
+ super.stop();
+ }
+
+    /**
+     * Drains up to {@code batchSize} events from the channel within a single Flume transaction, transforms them
+     * into cache entries and writes the entries into the configured Ignite cache.
+     *
+     * @return {@link Status#READY} if at least one event was taken from the channel, {@link Status#BACKOFF}
+     *      if the channel had no events.
+     * @throws EventDeliveryException If the batch could not be processed; the transaction is rolled back first.
+     */
+    @Override public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+
+        Transaction transaction = channel.getTransaction();
+
+        int eventCount = 0;
+
+        try {
+            transaction.begin();
+
+            List<Event> batch = new ArrayList<>(batchSize);
+
+            for (; eventCount < batchSize; ++eventCount) {
+                Event event = channel.take();
+
+                // Channel is drained.
+                if (event == null)
+                    break;
+
+                batch.add(event);
+            }
+
+            if (!batch.isEmpty()) {
+                // Count the attempt before writing, so that failed writes show up in the metrics as well.
+                sinkCounter.addToEventDrainAttemptCount(batch.size());
+
+                Map<Object, Object> entries = eventTransformer.transform(batch);
+
+                // transform() is declared @Nullable: a null result means there is nothing to write.
+                if (entries != null)
+                    ignite.cache(cacheName).putAll(entries);
+
+                if (batch.size() < batchSize)
+                    sinkCounter.incrementBatchUnderflowCount();
+                else
+                    sinkCounter.incrementBatchCompleteCount();
+            }
+            else
+                sinkCounter.incrementBatchEmptyCount();
+
+            transaction.commit();
+
+            sinkCounter.addToEventDrainSuccessCount(batch.size());
+        }
+        catch (Exception e) {
+            log.error("Failed to process events", e);
+
+            transaction.rollback();
+
+            throw new EventDeliveryException(e);
+        }
+        finally {
+            transaction.close();
+        }
+
+        return eventCount == 0 ? Status.BACKOFF : Status.READY;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java
----------------------------------------------------------------------
diff --git a/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java
new file mode 100644
index 0000000..ddefb24
--- /dev/null
+++ b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+/**
+ * Names of the sink properties that {@link IgniteSink} reads from the Flume agent configuration file.
+ */
+public class IgniteSinkConstants {
+ /** Property holding the path of the Ignite configuration file. */
+ public static final String CFG_PATH = "igniteCfg";
+
+ /** Property holding the name of the cache the events are written to. */
+ public static final String CFG_CACHE_NAME = "cacheName";
+
+ /** Property holding the class name of the {@link EventTransformer} implementation. */
+ public static final String CFG_EVENT_TRANSFORMER = "eventTransformer";
+
+ /** Property holding the number of events written per Flume transaction (optional). */
+ public static final String CFG_BATCH_SIZE = "batchSize";
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java
----------------------------------------------------------------------
diff --git a/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java
new file mode 100644
index 0000000..2f33ed4
--- /dev/null
+++ b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * {@link IgniteSink} test: pushes events through a Flume memory channel into the sink and checks the cache content.
+ */
+public class IgniteSinkTest extends GridCommonAbstractTest {
+ /** Number of events to be sent to memory channel. */
+ private static final int EVENT_CNT = 10000;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "testCache";
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Sends {@code EVENT_CNT} events through a memory channel to the sink and verifies that every event
+ * ends up in the cache of the server node.
+ *
+ * @throws Exception {@link Exception}.
+ */
+ public void testSink() throws Exception {
+ // Server node: same XML file the sink is configured with below, but with client mode switched off.
+ IgniteConfiguration cfg = loadConfiguration("modules/flume/src/test/resources/example-ignite.xml");
+
+ cfg.setClientMode(false);
+
+ final Ignite grid = startGrid("igniteServerNode", cfg);
+
+ // Memory channel sized so that all events fit into the channel and into one transaction.
+ Context channelContext = new Context();
+
+ channelContext.put("capacity", String.valueOf(EVENT_CNT));
+ channelContext.put("transactionCapacity", String.valueOf(EVENT_CNT));
+
+ Channel memoryChannel = new MemoryChannel();
+
+ Configurables.configure(memoryChannel, channelContext);
+
+ // Counted down once per cache put event seen by the listener.
+ final CountDownLatch latch = new CountDownLatch(EVENT_CNT);
+
+ final IgnitePredicate<Event> putLsnr = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ assert evt != null;
+
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ IgniteSink sink = new IgniteSink() {
+ // Setting the listener on cache before sink processing starts.
+ @Override synchronized public void start() {
+ super.start();
+
+ grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(putLsnr, EVT_CACHE_OBJECT_PUT);
+ }
+ };
+
+ sink.setName("IgniteSink");
+ sink.setChannel(memoryChannel);
+
+ // Sink settings; the batch size is not set, so the sink default applies.
+ Context ctx = new Context();
+
+ ctx.put(IgniteSinkConstants.CFG_CACHE_NAME, CACHE_NAME);
+ ctx.put(IgniteSinkConstants.CFG_PATH, "example-ignite.xml");
+ ctx.put(IgniteSinkConstants.CFG_EVENT_TRANSFORMER, "org.apache.ignite.stream.flume.TestEventTransformer");
+
+ Configurables.configure(sink, ctx);
+
+ sink.start();
+
+ try {
+ // Fill the channel with "<i>: <i>" events in a single transaction.
+ Transaction tx = memoryChannel.getTransaction();
+
+ tx.begin();
+
+ for (int i = 0; i < EVENT_CNT; i++)
+ memoryChannel.put(EventBuilder.withBody((String.valueOf(i) + ": " + i).getBytes()));
+
+ tx.commit();
+ tx.close();
+
+ Sink.Status status = Sink.Status.READY;
+
+ // Keep processing until the sink reports that it took no events from the channel.
+ while (status != Sink.Status.BACKOFF) {
+ status = sink.process();
+ }
+ }
+ finally {
+ sink.stop();
+ }
+
+ // Checks that 10000 events successfully processed in 10 seconds.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(putLsnr);
+
+ IgniteCache<String, Integer> cache = grid.cache(CACHE_NAME);
+
+ // Checks that each event was processed properly.
+ for (int i = 0; i < EVENT_CNT; i++) {
+ assertEquals(i, (int)cache.get(String.valueOf(i)));
+ }
+
+ assertEquals(EVENT_CNT, cache.size(CachePeekMode.PRIMARY));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java
new file mode 100644
index 0000000..ad6d162
--- /dev/null
+++ b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+import junit.framework.TestSuite;
+
+/**
+ * Test suite for the Flume NG sink for Ignite.
+ */
+public class IgniteSinkTestSuite extends TestSuite {
+    /**
+     * Assembles the suite out of the Flume sink test classes.
+     *
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite flumeSuite = new TestSuite("Apache Flume NG Sink Test Suite");
+
+        flumeSuite.addTestSuite(IgniteSinkTest.class);
+
+        return flumeSuite;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java
----------------------------------------------------------------------
diff --git a/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java b/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java
new file mode 100644
index 0000000..c15efbf
--- /dev/null
+++ b/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flume.Event;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test transformer that turns {@link org.apache.flume.Event} bodies of the form {@code key:value}
+ * into cacheable entries.
+ */
+public class TestEventTransformer implements EventTransformer<Event, String, Integer> {
+    /**
+     * Parses a single Flume event.
+     *
+     * @param event Flume event to transform.
+     * @return Map holding the parsed entry, or an empty map if the event body is empty.
+     */
+    private Map<String, Integer> transform(Event event) {
+        final Map<String, Integer> entry = new HashMap<>();
+
+        String body = new String(event.getBody());
+
+        if (body.isEmpty())
+            return entry;
+
+        // Expects a single colon-delimited line.
+        String[] parts = body.split(":");
+
+        String key = parts[0].trim();
+        Integer val = Integer.valueOf(parts[1].trim());
+
+        entry.put(key, val);
+
+        return entry;
+    }
+
+    /**
+     * Transforms every event of the list and merges the results into one map.
+     *
+     * @param events Flume events to transform.
+     * @return Map of cacheable entries.
+     */
+    @Nullable @Override public Map<String, Integer> transform(List<Event> events) {
+        final Map<String, Integer> res = new HashMap<>();
+
+        for (Event evt : events)
+            res.putAll(transform(evt));
+
+        return res;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/resources/example-ignite.xml
----------------------------------------------------------------------
diff --git a/modules/flume/src/test/resources/example-ignite.xml b/modules/flume/src/test/resources/example-ignite.xml
new file mode 100644
index 0000000..fbb05d3
--- /dev/null
+++ b/modules/flume/src/test/resources/example-ignite.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Ignite configuration with a single test cache, client mode, enabled cache put events and static TCP discovery.
+ Used for testing IgniteSink, which runs Ignite in client mode.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <!-- Enable client mode. -->
+ <property name="clientMode" value="true"/>
+
+ <!-- Cache accessed from IgniteSink. -->
+ <property name="cacheConfiguration">
+ <list>
+ <!-- Example partitioned cache configuration; the settings are aligned with those of the server nodes. -->
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="name" value="testCache"/>
+ </bean>
+ </list>
+ </property>
+
+ <!-- Enable cache events. -->
+ <property name="includeEventTypes">
+ <list>
+ <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). -->
+ <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
+ </list>
+ </property>
+
+ <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:47500</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5f06555..c40b551 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,7 @@
<module>modules/cloud</module>
<module>modules/mesos</module>
<module>modules/kafka</module>
+ <module>modules/flume</module>
<module>modules/yarn</module>
<module>modules/jms11</module>
<module>modules/mqtt</module>