You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2016/10/05 20:31:18 UTC

nifi git commit: NIFI-2398 - GetIgnite processor

Repository: nifi
Updated Branches:
  refs/heads/master e46fea920 -> da33e2859


NIFI-2398 - GetIgnite processor

This closes #721.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da33e285
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da33e285
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da33e285

Branch: refs/heads/master
Commit: da33e2859ce45321d28901e5820c38a37dcfc709
Parents: e46fea9
Author: mans2singh <ma...@yahoo.com>
Authored: Sun Jul 24 16:18:55 2016 -0700
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed Oct 5 22:31:01 2016 +0200

----------------------------------------------------------------------
 .../ignite/AbstractIgniteProcessor.java         |  30 +-
 .../cache/AbstractIgniteCacheProcessor.java     |  35 +-
 .../processors/ignite/cache/GetIgniteCache.java | 117 ++++++
 .../processors/ignite/cache/PutIgniteCache.java |  41 +-
 .../org.apache.nifi.processor.Processor         |   1 +
 .../ignite/cache/ITGetIgniteCache.java          | 184 +++++++++
 .../ignite/cache/ITPutIgniteCache.java          |  67 +++-
 .../ignite/cache/TestGetIgniteCache.java        | 383 +++++++++++++++++++
 .../ignite/cache/TestPutIgniteCache.java        |   9 +-
 9 files changed, 817 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java
index 1feaf83..28b7b52 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.nifi.processors.ignite;
 
+import java.util.List;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.Ignition;
@@ -88,20 +90,28 @@ public abstract class AbstractIgniteProcessor extends AbstractProcessor  {
     public void initializeIgnite(ProcessContext context) {
 
         if ( getIgnite() != null ) {
-            getLogger().warn("Ignite already initialized");
+            getLogger().info("Ignite already initialized");
             return;
         }
 
-        Ignition.setClientMode(true);
 
-        String configuration = context.getProperty(IGNITE_CONFIGURATION_FILE).getValue();
-        getLogger().info("Initializing ignite with configuration {} ", new Object[] { configuration });
-        if ( StringUtils.isEmpty(configuration) ) {
-            ignite = Ignition.start();
-        } else {
-            ignite = Ignition.start(configuration);
-        }
+        synchronized(Ignition.class) {
+            List<Ignite> grids = Ignition.allGrids();
 
-    }
+            if ( grids.size() == 1 ) {
+                getLogger().info("Ignite grid already available");
+                ignite = grids.get(0);
+                return;
+            }
+            Ignition.setClientMode(true);
 
+            String configuration = context.getProperty(IGNITE_CONFIGURATION_FILE).getValue();
+            getLogger().info("Initializing ignite with configuration {} ", new Object[] { configuration });
+            if ( StringUtils.isEmpty(configuration) ) {
+                ignite = Ignition.start();
+            } else {
+                ignite = Ignition.start(configuration);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
index 8e1a7cb..ca6136c 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
@@ -18,12 +18,12 @@ package org.apache.nifi.processors.ignite.cache;
 
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.ignite.IgniteCache;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -47,9 +47,17 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
             .build();
 
     /**
-     * Property descriptors
+     * The Ignite cache key attribute
      */
-    protected static List<PropertyDescriptor> descriptors;
+    public static final PropertyDescriptor IGNITE_CACHE_ENTRY_KEY = new PropertyDescriptor.Builder()
+            .displayName("Ignite Cache Entry Identifier")
+            .name("ignite-cache-entry-identifier")
+            .description("A FlowFile attribute, or attribute expression used " +
+                "for determining Ignite cache key for the Flow File content")
+            .required(true)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+            .expressionLanguageSupported(true)
+            .build();
 
     /**
      * Relations
@@ -57,16 +65,19 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
     protected static Set<Relationship> relationships;
 
     /**
-     * Ignite cache instance
+     * Ignite cache name
      */
-    private transient IgniteCache<String,byte[]> igniteCache;
+    private String cacheName;
 
     /**
      * Get ignite cache instance
      * @return ignite cache instance
      */
     protected IgniteCache<String, byte[]> getIgniteCache() {
-        return igniteCache;
+         if ( getIgnite() == null )
+            return null;
+         else
+            return getIgnite().getOrCreateCache(cacheName);
     }
 
     static {
@@ -96,8 +107,7 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
                 super.initializeIgnite(context);
             }
 
-            String cacheName = context.getProperty(CACHE_NAME).getValue();
-            igniteCache = getIgnite().getOrCreateCache(cacheName);
+            cacheName = context.getProperty(CACHE_NAME).getValue();
 
         } catch (Exception e) {
             getLogger().error("Failed to initialize ignite cache due to {}", new Object[] { e }, e);
@@ -108,12 +118,11 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
     /**
      * Close Ignite cache instance and calls base class closeIgnite
      */
-    @OnStopped
+    @OnShutdown
     public void closeIgniteCache() {
-        if (igniteCache != null) {
+        if (getIgniteCache() != null) {
             getLogger().info("Closing ignite cache");
-            igniteCache.close();
-            igniteCache = null;
+            getIgniteCache().close();
         }
         super.closeIgnite();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java
new file mode 100644
index 0000000..77fc055
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.ignite.cache;
+
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Get cache processor which gets the byte array for the key from the Ignite cache and sets the array
+ * as the FlowFile content.
+ */
+@EventDriven
+@SupportsBatching
+@Tags({ "Ignite", "get", "read", "cache", "key" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Get the byte array from Ignite Cache and adds it as the content of a FlowFile." +
+    "The processor uses the value of FlowFile attribute (Ignite cache entry key) as the cache key lookup. " +
+    "If the entry corresponding to the key is not found in the cache an error message is associated with the FlowFile " +
+    "Note - The Ignite Kernel periodically outputs node performance statistics to the logs. This message " +
+    " can be turned off by setting the log level for logger 'org.apache.ignite' to WARN in the logback.xml configuration file.")
+@WritesAttributes({
+    @WritesAttribute(attribute = GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, description = "The reason for getting entry from cache"),
+    })
+@SeeAlso({PutIgniteCache.class})
+public class GetIgniteCache extends AbstractIgniteCacheProcessor {
+
+    /** Flow file attribute keys and messages */
+    public static final String IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY = "ignite.cache.get.failed.reason";
+    public static final String IGNITE_GET_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing";
+    public static final String IGNITE_GET_FAILED_MISSING_ENTRY_MESSAGE = "The cache byte array entry was null or zero length";
+    public static final String IGNITE_GET_FAILED_MESSAGE_PREFIX = "The cache request failed because of ";
+
+    /**
+     * Property descriptors
+     */
+    protected static final List<PropertyDescriptor> descriptors =
+        Arrays.asList(IGNITE_CONFIGURATION_FILE, CACHE_NAME, IGNITE_CACHE_ENTRY_KEY);
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnScheduled
+    public final void initialize(ProcessContext context) throws ProcessException {
+        super.initializeIgniteCache(context);
+    }
+
+    /**
+     * Handles the flow file and gets the entry from the cache based on the key attribute
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+
+        if (flowFile == null) {
+            return;
+        }
+
+        String key = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        if ( StringUtils.isEmpty(key) ) {
+            flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+            session.transfer(flowFile, REL_FAILURE);
+        } else {
+            try {
+                byte [] value = getIgniteCache().get(key);
+                if ( value == null || value.length == 0 ) {
+                    flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY,
+                        IGNITE_GET_FAILED_MISSING_ENTRY_MESSAGE);
+                    session.transfer(flowFile, REL_FAILURE);
+                } else {
+                    ByteArrayInputStream bais = new ByteArrayInputStream(value);
+                    flowFile = session.importFrom(bais, flowFile);
+                    session.transfer(flowFile,REL_SUCCESS);
+                }
+            } catch(Exception exception) {
+                flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY,
+                     IGNITE_GET_FAILED_MESSAGE_PREFIX + exception);
+                getLogger().error("Failed to get value for key {} from IgniteDB due to {}", new Object[] { key, exception }, exception);
+                session.transfer(flowFile, REL_FAILURE);
+                context.yield();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java
index 316ed8f..2d1471e 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,12 +35,13 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -71,6 +73,7 @@ import org.apache.nifi.stream.io.StreamUtils;
     @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, description = "The total number of failed FlowFiles in the batch"),
     @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, description = "The failed reason attribute key")
     })
+@SeeAlso({GetIgniteCache.class})
 public class PutIgniteCache extends AbstractIgniteCacheProcessor {
 
     /**
@@ -138,16 +141,6 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor {
             .sensitive(false)
             .build();
 
-    public static final PropertyDescriptor IGNITE_CACHE_ENTRY_KEY = new PropertyDescriptor.Builder()
-            .displayName("Ignite Cache Entry Identifier")
-            .name("ignite-cache-entry-identifier")
-            .description("A FlowFile attribute, or attribute expression used " +
-                "for determining Ignite cache key for the Flow File content")
-            .required(true)
-            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
-            .expressionLanguageSupported(true)
-            .build();
-
     /** Flow file attribute keys and messages */
     public static final String IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT = "ignite.cache.batch.flow.file.total.count";
     public static final String IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER = "ignite.cache.batch.flow.file.item.number";
@@ -160,17 +153,15 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor {
     public static final String IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing";
     public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE = "The FlowFile size was zero";
 
-    static {
-        descriptors = new ArrayList<>();
-        descriptors.add(IGNITE_CONFIGURATION_FILE);
-        descriptors.add(CACHE_NAME);
-        descriptors.add(BATCH_SIZE);
-        descriptors.add(IGNITE_CACHE_ENTRY_KEY);
-        descriptors.add(DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS);
-        descriptors.add(DATA_STREAMER_PER_NODE_BUFFER_SIZE);
-        descriptors.add(DATA_STREAMER_AUTO_FLUSH_FREQUENCY);
-        descriptors.add(DATA_STREAMER_ALLOW_OVERRIDE);
-    }
+    /**
+     * Property descriptors
+     */
+    protected static final List<PropertyDescriptor> descriptors =
+        Arrays.asList(IGNITE_CONFIGURATION_FILE,CACHE_NAME,BATCH_SIZE,
+            IGNITE_CACHE_ENTRY_KEY,
+            DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS,
+            DATA_STREAMER_PER_NODE_BUFFER_SIZE,
+            DATA_STREAMER_AUTO_FLUSH_FREQUENCY,DATA_STREAMER_ALLOW_OVERRIDE);
 
     /**
      * Data streamer instance
@@ -190,9 +181,13 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor {
         if (igniteDataStreamer != null) {
             getLogger().info("Closing ignite data streamer");
             igniteDataStreamer.flush();
-            igniteDataStreamer.close();
             igniteDataStreamer = null;
         }
+    }
+
+    @OnShutdown
+    public final void closeIgniteDataStreamerAndCache() {
+        closeIgniteDataStreamer();
         super.closeIgniteCache();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d1da921..931f94f 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,3 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.processors.ignite.cache.PutIgniteCache
+org.apache.nifi.processors.ignite.cache.GetIgniteCache

http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java
new file mode 100644
index 0000000..3adf407
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.ignite.cache;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ITGetIgniteCache {
+
+    private static final String CACHE_NAME = "testCache";
+    private static TestRunner runner;
+    private static GetIgniteCache getIgniteCache;
+    private static Map<String,String> properties1;
+    private static HashMap<String, String> properties2;
+
+    @BeforeClass
+    public static void setUp() throws IOException {
+        getIgniteCache = new GetIgniteCache();
+        properties1 = new HashMap<String,String>();
+        properties2 = new HashMap<String,String>();
+    }
+
+    @AfterClass
+    public static void teardown() {
+        runner = null;
+        IgniteCache<String, byte[]> cache = getIgniteCache.getIgniteCache();
+        if (cache != null )
+            cache.destroy();
+        getIgniteCache = null;
+    }
+
+    @Test
+    public void testgetIgniteCacheOnTriggerFileConfigurationOneFlowFile() throws IOException, InterruptedException {
+        runner = TestRunners.newTestRunner(getIgniteCache);
+        runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+        runner.setProperty(GetIgniteCache.IGNITE_CONFIGURATION_FILE,
+                "file:///" + new File(".").getAbsolutePath() + "/src/test/resources/test-ignite.xml");
+        runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+        runner.assertValid();
+        properties1.put("igniteKey", "key5");
+        runner.enqueue("test5".getBytes(),properties1);
+
+        getIgniteCache.initialize(runner.getProcessContext());
+
+        getIgniteCache.getIgniteCache().put("key5", "test5".getBytes());
+        runner.run(1, false, true);
+
+        runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1);
+        List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+        assertEquals(1, sucessfulFlowFiles.size());
+        List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+        assertEquals(0, failureFlowFiles.size());
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
+
+        out.assertContentEquals("test5".getBytes());
+        Assert.assertArrayEquals("test5".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key5"));
+        runner.shutdown();
+    }
+
+    @Test
+    public void testgetIgniteCacheOnTriggerNoConfigurationTwoFlowFile() throws IOException, InterruptedException {
+        runner = TestRunners.newTestRunner(getIgniteCache);
+        runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+        runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+        runner.assertValid();
+        properties1.put("igniteKey", "key51");
+        runner.enqueue("test1".getBytes(),properties1);
+        properties2.put("igniteKey", "key52");
+        runner.enqueue("test2".getBytes(),properties2);
+        getIgniteCache.initialize(runner.getProcessContext());
+
+        getIgniteCache.getIgniteCache().put("key51", "test51".getBytes());
+        getIgniteCache.getIgniteCache().put("key52", "test52".getBytes());
+        runner.run(2, false, true);
+
+        runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
+        List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+        assertEquals(2, sucessfulFlowFiles.size());
+        List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+        assertEquals(0, failureFlowFiles.size());
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
+
+        out.assertContentEquals("test51".getBytes());
+        Assert.assertArrayEquals("test51".getBytes(),
+                (byte[])getIgniteCache.getIgniteCache().get("key51"));
+
+        final MockFlowFile out2 = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(1);
+
+        out2.assertContentEquals("test52".getBytes());
+        Assert.assertArrayEquals("test52".getBytes(),
+                (byte[])getIgniteCache.getIgniteCache().get("key52"));
+
+        runner.shutdown();
+    }
+
+    @Test
+    public void testgetIgniteCacheOnTriggerNoConfigurationTwoFlowFileStopStart2Times() throws IOException, InterruptedException {
+        runner = TestRunners.newTestRunner(getIgniteCache);
+        runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+        runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+        runner.assertValid();
+        properties1.put("igniteKey", "key51");
+        runner.enqueue("test1".getBytes(),properties1);
+        properties2.put("igniteKey", "key52");
+        runner.enqueue("test2".getBytes(),properties2);
+        getIgniteCache.initialize(runner.getProcessContext());
+
+        getIgniteCache.getIgniteCache().put("key51", "test51".getBytes());
+        getIgniteCache.getIgniteCache().put("key52", "test52".getBytes());
+        runner.run(2, false, true);
+
+        runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
+
+        getIgniteCache.closeIgniteCache();
+
+        runner.clearTransferState();
+
+        // reinit and check first time
+        runner.assertValid();
+        properties1.put("igniteKey", "key51");
+        runner.enqueue("test1".getBytes(),properties1);
+        properties2.put("igniteKey", "key52");
+        runner.enqueue("test2".getBytes(),properties2);
+        getIgniteCache.initialize(runner.getProcessContext());
+
+        runner.run(2, false, true);
+
+        runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
+
+        getIgniteCache.closeIgniteCache();
+
+        runner.clearTransferState();
+
+        // reinit and check second time
+        runner.assertValid();
+        properties1.put("igniteKey", "key51");
+        runner.enqueue("test1".getBytes(),properties1);
+        properties2.put("igniteKey", "key52");
+        runner.enqueue("test2".getBytes(),properties2);
+        getIgniteCache.initialize(runner.getProcessContext());
+
+        runner.run(2, false, true);
+
+        runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
+
+        getIgniteCache.closeIgniteCache();
+
+        runner.clearTransferState();
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java
index e8a469e..7c67536 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java
@@ -50,6 +50,7 @@ public class ITPutIgniteCache {
     @AfterClass
     public static void teardown() {
         runner = null;
+        putIgniteCache.getIgniteCache().destroy();
         putIgniteCache = null;
     }
 
@@ -82,8 +83,9 @@ public class ITPutIgniteCache {
         out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0");
 
         out.assertContentEquals("test".getBytes());
+        System.out.println("Value was: " + new String((byte[])putIgniteCache.getIgniteCache().get("key5")));
         Assert.assertArrayEquals("test".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key5"));
-        runner.shutdown();
+        putIgniteCache.getIgniteCache().remove("key5");
     }
 
     @Test
@@ -115,6 +117,7 @@ public class ITPutIgniteCache {
         out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0");
 
         out.assertContentEquals("test1".getBytes());
+        System.out.println("value was " + new String(putIgniteCache.getIgniteCache().get("key51")));
         Assert.assertArrayEquals("test1".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key51"));
 
         final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(1);
@@ -126,7 +129,67 @@ public class ITPutIgniteCache {
 
         out2.assertContentEquals("test2".getBytes());
         Assert.assertArrayEquals("test2".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key52"));
+        putIgniteCache.getIgniteCache().remove("key52");
+        putIgniteCache.getIgniteCache().remove("key51");
+
+    }
+
+    @Test
+    public void testPutIgniteCacheOnTriggerNoConfigurationTwoFlowFileStopAndStart2Times() throws IOException, InterruptedException {
+        runner = TestRunners.newTestRunner(putIgniteCache);
+        runner.setProperty(PutIgniteCache.BATCH_SIZE, "5");
+        runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME);
+        runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1");
+        runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+        // The same two-flow-file put is run three times, closing and re-initializing the data streamer in between.
+        runner.assertValid();
+        properties1.put("igniteKey", "key51");
+        runner.enqueue("test1".getBytes(),properties1);
+        properties2.put("igniteKey", "key52");
+        runner.enqueue("test2".getBytes(),properties2);
+        runner.run(1, false, true);
+        putIgniteCache.getIgniteCache().remove("key51");
+        putIgniteCache.getIgniteCache().remove("key52");
+
+        runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
+        putIgniteCache.getIgniteCache().remove("key51"); // both keys are removed after the assertion, as in the later rounds
+        putIgniteCache.getIgniteCache().remove("key52");
+
+        // Close the data streamer and re-initialize it (first restart)
+        putIgniteCache.closeIgniteDataStreamer();
+
+        runner.clearTransferState();
+
+        putIgniteCache.initilizeIgniteDataStreamer(runner.getProcessContext());
+
+        runner.assertValid();
+        properties1.put("igniteKey", "key51");
+        runner.enqueue("test1".getBytes(),properties1);
+        properties2.put("igniteKey", "key52");
+        runner.enqueue("test2".getBytes(),properties2);
+        runner.run(1, false, true);
+
+        runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
+        putIgniteCache.getIgniteCache().remove("key51");
+        putIgniteCache.getIgniteCache().remove("key52");
+
+        // Close the data streamer and re-initialize it (second restart)
+        putIgniteCache.closeIgniteDataStreamer();
+
+        runner.clearTransferState();
+
+        putIgniteCache.initilizeIgniteDataStreamer(runner.getProcessContext());
+
+        runner.assertValid();
+        properties1.put("igniteKey", "key51");
+        runner.enqueue("test1".getBytes(),properties1);
+        properties2.put("igniteKey", "key52");
+        runner.enqueue("test2".getBytes(),properties2);
+        runner.run(1, false, true);
+
+        runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
+        putIgniteCache.getIgniteCache().remove("key52");
+        putIgniteCache.getIgniteCache().remove("key51");
 
-        runner.shutdown();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java
new file mode 100644
index 0000000..8692383
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.ignite.cache;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestGetIgniteCache {
+
+    private static final String CACHE_NAME = "testCache";
+    private TestRunner getRunner;
+    private GetIgniteCache getIgniteCache;
+    private Map<String,String> properties1;
+    private Map<String,String> properties2;
+    private static Ignite ignite;
+
+    @BeforeClass
+    public static void setUpClass() {
+        // Start an Ignite node from test-ignite.xml; the static field makes it available to every test.
+        TestGetIgniteCache.ignite = Ignition.start("test-ignite.xml");
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        TestGetIgniteCache.ignite.close(); // release the node started in setUpClass
+        Ignition.stop(true); // per the Ignition API, true = cancel jobs that are still running
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        // Attribute maps the tests attach to flow files; each holds one "igniteKey" entry.
+        properties1 = new HashMap<>();
+        properties1.put("igniteKey", "key1");
+        properties2 = new HashMap<>();
+        properties2.put("igniteKey", "key2");
+
+        // Processor under test, wired to the Ignite node held in the static field of this class.
+        getIgniteCache = new GetIgniteCache() {
+            @Override
+            protected Ignite getIgnite() {
+                return TestGetIgniteCache.ignite;
+            }
+        };
+    }
+
+    @After
+    public void teardown() {
+        this.getRunner = null; // drop the runner used by the test that just finished
+        TestGetIgniteCache.ignite.destroyCache(CACHE_NAME); // remove the CACHE_NAME cache after every test
+    }
+
+    @Test
+    public void testGetIgniteCacheDefaultConfOneFlowFileWithPlainKey() throws IOException, InterruptedException {
+        // Scenario: literal entry key "mykey", one empty flow file, and "test" stored under that key.
+        getRunner = TestRunners.newTestRunner(getIgniteCache);
+        getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "mykey");
+        getRunner.assertValid();
+
+        getRunner.enqueue(new byte[] {});
+        // Initialize by hand so the entry can be stored before the processor is triggered.
+        getIgniteCache.initialize(getRunner.getProcessContext());
+        final byte[] storedValue = "test".getBytes();
+        getIgniteCache.getIgniteCache().put("mykey", storedValue);
+
+        getRunner.run(1, false, true);
+
+        // Exactly one flow file is expected on success, none on failure.
+        getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1);
+        final List<MockFlowFile> successFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+        assertEquals(1, successFlowFiles.size());
+        final List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+        assertEquals(0, failureFlowFiles.size());
+
+        // The outgoing content is expected to be the stored value.
+        successFlowFiles.get(0).assertContentEquals("test".getBytes());
+        getRunner.shutdown();
+    }
+
+    @Test
+    public void testGetIgniteCacheNullGetCacheThrowsException() throws IOException, InterruptedException {
+        // Scenario: the processor's cache accessor yields null, so the lookup cannot succeed.
+        getIgniteCache = new GetIgniteCache() {
+            @Override
+            protected IgniteCache<String, byte[]> getIgniteCache() {
+                // Deliberately no cache.
+                return null;
+            }
+
+            @Override
+            protected Ignite getIgnite() {
+                return TestGetIgniteCache.ignite;
+            }
+        };
+
+        getRunner = TestRunners.newTestRunner(getIgniteCache);
+        getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "mykey");
+        getRunner.assertValid();
+
+        getRunner.enqueue(new byte[] {});
+        getIgniteCache.initialize(getRunner.getProcessContext());
+        getRunner.run(1, false, true);
+
+        // The flow file is expected on failure, with nothing on success.
+        getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 1);
+        final List<MockFlowFile> successFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+        assertEquals(0, successFlowFiles.size());
+        final List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+        assertEquals(1, failureFlowFiles.size());
+
+        // The failure reason attribute is expected to be the message prefix followed by the exception class name.
+        final String expectedReason = GetIgniteCache.IGNITE_GET_FAILED_MESSAGE_PREFIX + "java.lang.NullPointerException";
+        failureFlowFiles.get(0).assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, expectedReason);
+
+        getRunner.shutdown();
+    }
+
+    @Test
+    public void testGetIgniteCacheDefaultConfOneFlowFileWithKeyExpression() throws IOException, InterruptedException {
+        // Scenario: the entry key comes from ${igniteKey}; the flow file carries igniteKey=key1 (set in setUp).
+        getRunner = TestRunners.newTestRunner(getIgniteCache);
+        getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+        getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+        getRunner.assertValid();
+
+        getRunner.enqueue("".getBytes(), properties1);
+
+        // Initialize by hand so "key1" can be stored before the processor is triggered.
+        getIgniteCache.initialize(getRunner.getProcessContext());
+        getIgniteCache.getIgniteCache().put("key1", "test".getBytes());
+
+        getRunner.run(1, false, true);
+
+        // Exactly one flow file is expected on success, none on failure.
+        getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1);
+        final List<MockFlowFile> successFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+        assertEquals(1, successFlowFiles.size());
+        final List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+        assertEquals(0, failureFlowFiles.size());
+
+        // The outgoing content is expected to be the value stored under "key1".
+        successFlowFiles.get(0).assertContentEquals("test".getBytes());
+        getRunner.shutdown();
+    }
+
+    @Test
+    public void testGetIgniteCacheDefaultConfTwoFlowFilesWithExpressionKeys() throws IOException, InterruptedException {
+        // Scenario: two flow files with igniteKey=key1 and igniteKey=key2, both keys stored in the cache.
+        getRunner = TestRunners.newTestRunner(getIgniteCache);
+        getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+        getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+        getRunner.assertValid();
+
+        getRunner.enqueue("".getBytes(), properties1);
+        getRunner.enqueue("".getBytes(), properties2);
+
+        // Initialize by hand so both entries can be stored before the processor is triggered.
+        getIgniteCache.initialize(getRunner.getProcessContext());
+        getIgniteCache.getIgniteCache().put("key1", "test1".getBytes());
+        getIgniteCache.getIgniteCache().put("key2", "test2".getBytes());
+
+        getRunner.run(2, false, true);
+
+        // Both flow files are expected on success, none on failure.
+        getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
+        final List<MockFlowFile> successFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+        assertEquals(2, successFlowFiles.size());
+        final List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+        assertEquals(0, failureFlowFiles.size());
+
+        // First output: content "test1", and "key1" is still readable from the cache.
+        final MockFlowFile firstOut = successFlowFiles.get(0);
+        firstOut.assertContentEquals("test1".getBytes());
+        Assert.assertEquals("test1", new String(getIgniteCache.getIgniteCache().get("key1")));
+
+        // Second output: content "test2", and "key2" is still readable from the cache.
+        final MockFlowFile secondOut = successFlowFiles.get(1);
+        secondOut.assertContentEquals("test2".getBytes());
+        final byte[] cachedSecond = getIgniteCache.getIgniteCache().get("key2");
+        Assert.assertArrayEquals("test2".getBytes(), cachedSecond);
+
+        getRunner.shutdown();
+    }
+
+    @Test
+    public void testGetIgniteCacheDefaultConfOneFlowFileNoKey() throws IOException, InterruptedException {
+        // Scenario: the entry key comes from ${igniteKey}, but the flow file has no such attribute.
+        getRunner = TestRunners.newTestRunner(getIgniteCache);
+        getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+        getRunner.assertValid();
+
+        // Drop the igniteKey attribute that setUp put into properties1.
+        properties1.clear();
+        getRunner.enqueue("".getBytes(), properties1);
+        getIgniteCache.initialize(getRunner.getProcessContext());
+
+        getRunner.run(1, false, true);
+
+        // The flow file is expected on failure, with nothing on success.
+        getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 1);
+        final List<MockFlowFile> successFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+        assertEquals(0, successFlowFiles.size());
+        final List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+        assertEquals(1, failureFlowFiles.size());
+
+        // The failure reason attribute is expected to hold the missing-key message.
+        failureFlowFiles.get(0).assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+        getRunner.shutdown();
+    }
+
+
+
+    @Test
+    public void testGetIgniteCacheDefaultConfTwoFlowFilesNoKey() throws IOException, InterruptedException {
+        // Scenario: two flow files, neither carrying the igniteKey attribute the key expression refers to.
+        getRunner = TestRunners.newTestRunner(getIgniteCache);
+        getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+        getRunner.assertValid();
+
+        // Drop the igniteKey attribute that setUp put into properties1.
+        properties1.clear();
+        getRunner.enqueue("".getBytes(), properties1);
+        getRunner.enqueue("".getBytes(), properties1);
+
+        getIgniteCache.initialize(getRunner.getProcessContext());
+
+        getRunner.run(2, false, true);
+
+        // Both flow files are expected on failure, none on success.
+        getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 2);
+        final List<MockFlowFile> successFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+        assertEquals(0, successFlowFiles.size());
+        final List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+        assertEquals(2, failureFlowFiles.size());
+
+        // Each of the two is expected to carry the missing-key message as its failure reason.
+        for (final MockFlowFile failed : failureFlowFiles) {
+            failed.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+        }
+
+        getRunner.shutdown();
+    }
+
+    @Test
+    public void testGetIgniteCacheDefaultConfTwoFlowFileFirstNoKey() throws IOException, InterruptedException {
+        // Scenario: the first flow file has no igniteKey attribute, the second has igniteKey=key2.
+        getRunner = TestRunners.newTestRunner(getIgniteCache);
+        getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+        getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+        getRunner.assertValid();
+        getRunner.enqueue("".getBytes());
+        getRunner.enqueue("".getBytes(), properties2);
+        // Initialize by hand so "key2" can be stored before the processor is triggered.
+        getIgniteCache.initialize(getRunner.getProcessContext());
+        getIgniteCache.getIgniteCache().put("key2", "test2".getBytes());
+
+        getRunner.run(2, false, true);
+
+        // One flow file is expected on each relationship.
+        final List<MockFlowFile> successFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+        assertEquals(1, successFlowFiles.size());
+        final List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+        assertEquals(1, failureFlowFiles.size());
+
+        // The keyless flow file is expected to keep its empty content and carry the missing-key reason.
+        final MockFlowFile failed = failureFlowFiles.get(0);
+        failed.assertContentEquals("".getBytes());
+        failed.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+
+        // The keyed flow file is expected to carry the cached value, which is still readable from the cache.
+        final MockFlowFile fetched = successFlowFiles.get(0);
+        fetched.assertContentEquals("test2".getBytes());
+        Assert.assertArrayEquals("test2".getBytes(), getIgniteCache.getIgniteCache().get("key2"));
+        getRunner.shutdown();
+    }
+
+    @Test
+    public void testGetIgniteCacheDefaultConfTwoFlowFileSecondNoKey() throws IOException, InterruptedException {
+        // Scenario: the first flow file has igniteKey=key1, the second has no igniteKey attribute.
+        getRunner = TestRunners.newTestRunner(getIgniteCache);
+        getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+        getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+        getRunner.assertValid();
+        getRunner.enqueue("".getBytes(), properties1);
+        getRunner.enqueue("".getBytes());
+
+        // Initialize by hand so "key1" can be stored before the processor is triggered.
+        getIgniteCache.initialize(getRunner.getProcessContext());
+        getIgniteCache.getIgniteCache().put("key1", "test1".getBytes());
+
+        getRunner.run(2, false, true);
+
+        // One flow file is expected on each relationship.
+        final List<MockFlowFile> successFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+        assertEquals(1, successFlowFiles.size());
+        final List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+        assertEquals(1, failureFlowFiles.size());
+
+        // The keyless flow file is expected to keep its empty content and carry the missing-key reason.
+        final MockFlowFile failed = failureFlowFiles.get(0);
+        failed.assertContentEquals("".getBytes());
+        failed.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+
+        // The keyed flow file is expected to carry the cached value, which is still readable from the cache.
+        final MockFlowFile fetched = successFlowFiles.get(0);
+        fetched.assertContentEquals("test1".getBytes());
+        Assert.assertArrayEquals("test1".getBytes(), getIgniteCache.getIgniteCache().get("key1"));
+        getRunner.shutdown();
+    }
+
+
+    @Test
+    public void testGetIgniteCacheDefaultConfThreeFlowFilesOneOkSecondOkThirdNoExpressionKey() throws IOException, InterruptedException {
+        // Scenario: flow files with igniteKey=key1 and igniteKey=key2, followed by one without the attribute.
+        getRunner = TestRunners.newTestRunner(getIgniteCache);
+        getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+        getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+        getRunner.assertValid();
+        getRunner.enqueue("".getBytes(), properties1);
+        getRunner.enqueue("".getBytes(), properties2);
+        getRunner.enqueue("".getBytes());
+
+        // Initialize by hand so both entries can be stored before the processor is triggered.
+        getIgniteCache.initialize(getRunner.getProcessContext());
+        getIgniteCache.getIgniteCache().put("key1", "test1".getBytes());
+        getIgniteCache.getIgniteCache().put("key2", "test2".getBytes());
+
+        getRunner.run(3, false, true);
+
+        // Two flow files are expected on success and one on failure.
+        final List<MockFlowFile> successFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+        assertEquals(2, successFlowFiles.size());
+        final List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+        assertEquals(1, failureFlowFiles.size());
+
+        // The keyless flow file is expected to keep its empty content and carry the missing-key reason.
+        final MockFlowFile failed = failureFlowFiles.get(0);
+        failed.assertContentEquals("".getBytes());
+        failed.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+
+        // First success: content "test1", and "key1" is still readable from the cache.
+        final MockFlowFile firstFetched = successFlowFiles.get(0);
+        firstFetched.assertContentEquals("test1".getBytes());
+        Assert.assertArrayEquals("test1".getBytes(), getIgniteCache.getIgniteCache().get("key1"));
+
+        // Second success: content "test2", and "key2" is still readable from the cache.
+        final MockFlowFile secondFetched = successFlowFiles.get(1);
+        secondFetched.assertContentEquals("test2".getBytes());
+        Assert.assertArrayEquals("test2".getBytes(), getIgniteCache.getIgniteCache().get("key2"));
+        getRunner.shutdown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java
index 5639795..4d7e5c6 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java
@@ -47,13 +47,18 @@ public class TestPutIgniteCache {
 
     @BeforeClass
     public static void setUpClass() {
-        ignite = Ignition.start("test-ignite.xml");
+        // Reuse the grid reported by Ignition.allGrids() when there is exactly one;
+        // in every other case start a node from test-ignite.xml.
+        final List<Ignite> runningGrids = Ignition.allGrids();
+        ignite = runningGrids.size() == 1
+            ? runningGrids.get(0) : Ignition.start("test-ignite.xml");
 
     }
 
     @AfterClass
     public static void tearDownClass() {
-        ignite.close();
+        if ( ignite != null )
+            ignite.close();
         Ignition.stop(true);
     }