You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2016/10/05 20:31:18 UTC
nifi git commit: NIFI-2398 - GetIgnite processor
Repository: nifi
Updated Branches:
refs/heads/master e46fea920 -> da33e2859
NIFI-2398 - GetIgnite processor
This closes #721.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da33e285
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da33e285
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da33e285
Branch: refs/heads/master
Commit: da33e2859ce45321d28901e5820c38a37dcfc709
Parents: e46fea9
Author: mans2singh <ma...@yahoo.com>
Authored: Sun Jul 24 16:18:55 2016 -0700
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed Oct 5 22:31:01 2016 +0200
----------------------------------------------------------------------
.../ignite/AbstractIgniteProcessor.java | 30 +-
.../cache/AbstractIgniteCacheProcessor.java | 35 +-
.../processors/ignite/cache/GetIgniteCache.java | 117 ++++++
.../processors/ignite/cache/PutIgniteCache.java | 41 +-
.../org.apache.nifi.processor.Processor | 1 +
.../ignite/cache/ITGetIgniteCache.java | 184 +++++++++
.../ignite/cache/ITPutIgniteCache.java | 67 +++-
.../ignite/cache/TestGetIgniteCache.java | 383 +++++++++++++++++++
.../ignite/cache/TestPutIgniteCache.java | 9 +-
9 files changed, 817 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java
index 1feaf83..28b7b52 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java
@@ -18,6 +18,8 @@
*/
package org.apache.nifi.processors.ignite;
+import java.util.List;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
@@ -88,20 +90,28 @@ public abstract class AbstractIgniteProcessor extends AbstractProcessor {
public void initializeIgnite(ProcessContext context) {
if ( getIgnite() != null ) {
- getLogger().warn("Ignite already initialized");
+ getLogger().info("Ignite already initialized");
return;
}
- Ignition.setClientMode(true);
- String configuration = context.getProperty(IGNITE_CONFIGURATION_FILE).getValue();
- getLogger().info("Initializing ignite with configuration {} ", new Object[] { configuration });
- if ( StringUtils.isEmpty(configuration) ) {
- ignite = Ignition.start();
- } else {
- ignite = Ignition.start(configuration);
- }
+ synchronized(Ignition.class) {
+ List<Ignite> grids = Ignition.allGrids();
- }
+ if ( grids.size() == 1 ) {
+ getLogger().info("Ignite grid already available");
+ ignite = grids.get(0);
+ return;
+ }
+ Ignition.setClientMode(true);
+ String configuration = context.getProperty(IGNITE_CONFIGURATION_FILE).getValue();
+ getLogger().info("Initializing ignite with configuration {} ", new Object[] { configuration });
+ if ( StringUtils.isEmpty(configuration) ) {
+ ignite = Ignition.start();
+ } else {
+ ignite = Ignition.start(configuration);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
index 8e1a7cb..ca6136c 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
@@ -18,12 +18,12 @@ package org.apache.nifi.processors.ignite.cache;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import org.apache.ignite.IgniteCache;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
@@ -47,9 +47,17 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
.build();
/**
- * Property descriptors
+ * The Ignite cache key attribute
*/
- protected static List<PropertyDescriptor> descriptors;
+ public static final PropertyDescriptor IGNITE_CACHE_ENTRY_KEY = new PropertyDescriptor.Builder()
+ .displayName("Ignite Cache Entry Identifier")
+ .name("ignite-cache-entry-identifier")
+ .description("A FlowFile attribute, or attribute expression used " +
+ "for determining Ignite cache key for the Flow File content")
+ .required(true)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+ .expressionLanguageSupported(true)
+ .build();
/**
* Relations
@@ -57,16 +65,19 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
protected static Set<Relationship> relationships;
/**
- * Ignite cache instance
+ * Ignite cache name
*/
- private transient IgniteCache<String,byte[]> igniteCache;
+ private String cacheName;
/**
* Get ignite cache instance
* @return ignite cache instance
*/
protected IgniteCache<String, byte[]> getIgniteCache() {
- return igniteCache;
+ if ( getIgnite() == null )
+ return null;
+ else
+ return getIgnite().getOrCreateCache(cacheName);
}
static {
@@ -96,8 +107,7 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
super.initializeIgnite(context);
}
- String cacheName = context.getProperty(CACHE_NAME).getValue();
- igniteCache = getIgnite().getOrCreateCache(cacheName);
+ cacheName = context.getProperty(CACHE_NAME).getValue();
} catch (Exception e) {
getLogger().error("Failed to initialize ignite cache due to {}", new Object[] { e }, e);
@@ -108,12 +118,11 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
/**
* Close Ignite cache instance and calls base class closeIgnite
*/
- @OnStopped
+ @OnShutdown
public void closeIgniteCache() {
- if (igniteCache != null) {
+ if (getIgniteCache() != null) {
getLogger().info("Closing ignite cache");
- igniteCache.close();
- igniteCache = null;
+ getIgniteCache().close();
}
super.closeIgnite();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java
new file mode 100644
index 0000000..77fc055
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/GetIgniteCache.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.ignite.cache;
+
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Get cache processors which gets byte array for the key from Ignite cache and set the array
+ * as the FlowFile content.
+ */
+@EventDriven
+@SupportsBatching
+@Tags({ "Ignite", "get", "read", "cache", "key" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Get the byte array from Ignite Cache and adds it as the content of a FlowFile." +
+ "The processor uses the value of FlowFile attribute (Ignite cache entry key) as the cache key lookup. " +
+ "If the entry corresponding to the key is not found in the cache an error message is associated with the FlowFile " +
+ "Note - The Ignite Kernel periodically outputs node performance statistics to the logs. This message " +
+ " can be turned off by setting the log level for logger 'org.apache.ignite' to WARN in the logback.xml configuration file.")
+@WritesAttributes({
+ @WritesAttribute(attribute = GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, description = "The reason for getting entry from cache"),
+ })
+@SeeAlso({PutIgniteCache.class})
+public class GetIgniteCache extends AbstractIgniteCacheProcessor {
+
+ /** Flow file attribute keys and messages */
+ public static final String IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY = "ignite.cache.get.failed.reason";
+ public static final String IGNITE_GET_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing";
+ public static final String IGNITE_GET_FAILED_MISSING_ENTRY_MESSAGE = "The cache byte array entry was null or zero length";
+ public static final String IGNITE_GET_FAILED_MESSAGE_PREFIX = "The cache request failed because of ";
+
+ /**
+ * Property descriptors
+ */
+ protected static final List<PropertyDescriptor> descriptors =
+ Arrays.asList(IGNITE_CONFIGURATION_FILE, CACHE_NAME, IGNITE_CACHE_ENTRY_KEY);
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ @OnScheduled
+ public final void initialize(ProcessContext context) throws ProcessException {
+ super.initializeIgniteCache(context);
+ }
+
+ /**
+ * Handle flow file and gets the entry from the cache based on the key attribute
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+
+ if (flowFile == null) {
+ return;
+ }
+
+ String key = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue();
+ if ( StringUtils.isEmpty(key) ) {
+ flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+ session.transfer(flowFile, REL_FAILURE);
+ } else {
+ try {
+ byte [] value = getIgniteCache().get(key);
+ if ( value == null || value.length == 0 ) {
+ flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY,
+ IGNITE_GET_FAILED_MISSING_ENTRY_MESSAGE);
+ session.transfer(flowFile, REL_FAILURE);
+ } else {
+ ByteArrayInputStream bais = new ByteArrayInputStream(value);
+ flowFile = session.importFrom(bais, flowFile);
+ session.transfer(flowFile,REL_SUCCESS);
+ }
+ } catch(Exception exception) {
+ flowFile = session.putAttribute(flowFile, IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY,
+ IGNITE_GET_FAILED_MESSAGE_PREFIX + exception);
+ getLogger().error("Failed to get value for key {} from IgniteDB due to {}", new Object[] { key, exception }, exception);
+ session.transfer(flowFile, REL_FAILURE);
+ context.yield();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java
index 316ed8f..2d1471e 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,12 +35,13 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -71,6 +73,7 @@ import org.apache.nifi.stream.io.StreamUtils;
@WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, description = "The total number of failed FlowFiles in the batch"),
@WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, description = "The failed reason attribute key")
})
+@SeeAlso({GetIgniteCache.class})
public class PutIgniteCache extends AbstractIgniteCacheProcessor {
/**
@@ -138,16 +141,6 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor {
.sensitive(false)
.build();
- public static final PropertyDescriptor IGNITE_CACHE_ENTRY_KEY = new PropertyDescriptor.Builder()
- .displayName("Ignite Cache Entry Identifier")
- .name("ignite-cache-entry-identifier")
- .description("A FlowFile attribute, or attribute expression used " +
- "for determining Ignite cache key for the Flow File content")
- .required(true)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
- .expressionLanguageSupported(true)
- .build();
-
/** Flow file attribute keys and messages */
public static final String IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT = "ignite.cache.batch.flow.file.total.count";
public static final String IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER = "ignite.cache.batch.flow.file.item.number";
@@ -160,17 +153,15 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor {
public static final String IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing";
public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE = "The FlowFile size was zero";
- static {
- descriptors = new ArrayList<>();
- descriptors.add(IGNITE_CONFIGURATION_FILE);
- descriptors.add(CACHE_NAME);
- descriptors.add(BATCH_SIZE);
- descriptors.add(IGNITE_CACHE_ENTRY_KEY);
- descriptors.add(DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS);
- descriptors.add(DATA_STREAMER_PER_NODE_BUFFER_SIZE);
- descriptors.add(DATA_STREAMER_AUTO_FLUSH_FREQUENCY);
- descriptors.add(DATA_STREAMER_ALLOW_OVERRIDE);
- }
+ /**
+ * Property descriptors
+ */
+ protected static final List<PropertyDescriptor> descriptors =
+ Arrays.asList(IGNITE_CONFIGURATION_FILE,CACHE_NAME,BATCH_SIZE,
+ IGNITE_CACHE_ENTRY_KEY,
+ DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS,
+ DATA_STREAMER_PER_NODE_BUFFER_SIZE,
+ DATA_STREAMER_AUTO_FLUSH_FREQUENCY,DATA_STREAMER_ALLOW_OVERRIDE);
/**
* Data streamer instance
@@ -190,9 +181,13 @@ public class PutIgniteCache extends AbstractIgniteCacheProcessor {
if (igniteDataStreamer != null) {
getLogger().info("Closing ignite data streamer");
igniteDataStreamer.flush();
- igniteDataStreamer.close();
igniteDataStreamer = null;
}
+ }
+
+ @OnShutdown
+ public final void closeIgniteDataStreamerAndCache() {
+ closeIgniteDataStreamer();
super.closeIgniteCache();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d1da921..931f94f 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,3 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.ignite.cache.PutIgniteCache
+org.apache.nifi.processors.ignite.cache.GetIgniteCache
http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java
new file mode 100644
index 0000000..3adf407
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITGetIgniteCache.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.ignite.cache;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ITGetIgniteCache {
+
+ private static final String CACHE_NAME = "testCache";
+ private static TestRunner runner;
+ private static GetIgniteCache getIgniteCache;
+ private static Map<String,String> properties1;
+ private static HashMap<String, String> properties2;
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ getIgniteCache = new GetIgniteCache();
+ properties1 = new HashMap<String,String>();
+ properties2 = new HashMap<String,String>();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ runner = null;
+ IgniteCache<String, byte[]> cache = getIgniteCache.getIgniteCache();
+ if (cache != null )
+ cache.destroy();
+ getIgniteCache = null;
+ }
+
+ @Test
+ public void testgetIgniteCacheOnTriggerFileConfigurationOneFlowFile() throws IOException, InterruptedException {
+ runner = TestRunners.newTestRunner(getIgniteCache);
+ runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+ runner.setProperty(GetIgniteCache.IGNITE_CONFIGURATION_FILE,
+ "file:///" + new File(".").getAbsolutePath() + "/src/test/resources/test-ignite.xml");
+ runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+ runner.assertValid();
+ properties1.put("igniteKey", "key5");
+ runner.enqueue("test5".getBytes(),properties1);
+
+ getIgniteCache.initialize(runner.getProcessContext());
+
+ getIgniteCache.getIgniteCache().put("key5", "test5".getBytes());
+ runner.run(1, false, true);
+
+ runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1);
+ List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+ assertEquals(1, sucessfulFlowFiles.size());
+ List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+ assertEquals(0, failureFlowFiles.size());
+
+ final MockFlowFile out = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
+
+ out.assertContentEquals("test5".getBytes());
+ Assert.assertArrayEquals("test5".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key5"));
+ runner.shutdown();
+ }
+
+ @Test
+ public void testgetIgniteCacheOnTriggerNoConfigurationTwoFlowFile() throws IOException, InterruptedException {
+ runner = TestRunners.newTestRunner(getIgniteCache);
+ runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+ runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+ runner.assertValid();
+ properties1.put("igniteKey", "key51");
+ runner.enqueue("test1".getBytes(),properties1);
+ properties2.put("igniteKey", "key52");
+ runner.enqueue("test2".getBytes(),properties2);
+ getIgniteCache.initialize(runner.getProcessContext());
+
+ getIgniteCache.getIgniteCache().put("key51", "test51".getBytes());
+ getIgniteCache.getIgniteCache().put("key52", "test52".getBytes());
+ runner.run(2, false, true);
+
+ runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
+ List<MockFlowFile> sucessfulFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+ assertEquals(2, sucessfulFlowFiles.size());
+ List<MockFlowFile> failureFlowFiles = runner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+ assertEquals(0, failureFlowFiles.size());
+
+ final MockFlowFile out = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
+
+ out.assertContentEquals("test51".getBytes());
+ Assert.assertArrayEquals("test51".getBytes(),
+ (byte[])getIgniteCache.getIgniteCache().get("key51"));
+
+ final MockFlowFile out2 = runner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(1);
+
+ out2.assertContentEquals("test52".getBytes());
+ Assert.assertArrayEquals("test52".getBytes(),
+ (byte[])getIgniteCache.getIgniteCache().get("key52"));
+
+ runner.shutdown();
+ }
+
+ @Test
+ public void testgetIgniteCacheOnTriggerNoConfigurationTwoFlowFileStopStart2Times() throws IOException, InterruptedException {
+ runner = TestRunners.newTestRunner(getIgniteCache);
+ runner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+ runner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+ runner.assertValid();
+ properties1.put("igniteKey", "key51");
+ runner.enqueue("test1".getBytes(),properties1);
+ properties2.put("igniteKey", "key52");
+ runner.enqueue("test2".getBytes(),properties2);
+ getIgniteCache.initialize(runner.getProcessContext());
+
+ getIgniteCache.getIgniteCache().put("key51", "test51".getBytes());
+ getIgniteCache.getIgniteCache().put("key52", "test52".getBytes());
+ runner.run(2, false, true);
+
+ runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
+
+ getIgniteCache.closeIgniteCache();
+
+ runner.clearTransferState();
+
+ // reinit and check first time
+ runner.assertValid();
+ properties1.put("igniteKey", "key51");
+ runner.enqueue("test1".getBytes(),properties1);
+ properties2.put("igniteKey", "key52");
+ runner.enqueue("test2".getBytes(),properties2);
+ getIgniteCache.initialize(runner.getProcessContext());
+
+ runner.run(2, false, true);
+
+ runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
+
+ getIgniteCache.closeIgniteCache();
+
+ runner.clearTransferState();
+
+ // reinit and check second time
+ runner.assertValid();
+ properties1.put("igniteKey", "key51");
+ runner.enqueue("test1".getBytes(),properties1);
+ properties2.put("igniteKey", "key52");
+ runner.enqueue("test2".getBytes(),properties2);
+ getIgniteCache.initialize(runner.getProcessContext());
+
+ runner.run(2, false, true);
+
+ runner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
+
+ getIgniteCache.closeIgniteCache();
+
+ runner.clearTransferState();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java
index e8a469e..7c67536 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java
@@ -50,6 +50,7 @@ public class ITPutIgniteCache {
@AfterClass
public static void teardown() {
runner = null;
+ putIgniteCache.getIgniteCache().destroy();
putIgniteCache = null;
}
@@ -82,8 +83,9 @@ public class ITPutIgniteCache {
out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0");
out.assertContentEquals("test".getBytes());
+ System.out.println("Value was: " + new String((byte[])putIgniteCache.getIgniteCache().get("key5")));
Assert.assertArrayEquals("test".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key5"));
- runner.shutdown();
+ putIgniteCache.getIgniteCache().remove("key5");
}
@Test
@@ -115,6 +117,7 @@ public class ITPutIgniteCache {
out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0");
out.assertContentEquals("test1".getBytes());
+ System.out.println("value was " + new String(putIgniteCache.getIgniteCache().get("key51")));
Assert.assertArrayEquals("test1".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key51"));
final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(1);
@@ -126,7 +129,67 @@ public class ITPutIgniteCache {
out2.assertContentEquals("test2".getBytes());
Assert.assertArrayEquals("test2".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key52"));
+ putIgniteCache.getIgniteCache().remove("key52");
+ putIgniteCache.getIgniteCache().remove("key51");
+
+ }
+
+ @Test
+ public void testPutIgniteCacheOnTriggerNoConfigurationTwoFlowFileStopAndStart2Times() throws IOException, InterruptedException {
+ runner = TestRunners.newTestRunner(putIgniteCache);
+ runner.setProperty(PutIgniteCache.BATCH_SIZE, "5");
+ runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME);
+ runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1");
+ runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+ runner.assertValid();
+ properties1.put("igniteKey", "key51");
+ runner.enqueue("test1".getBytes(),properties1);
+ properties2.put("igniteKey", "key52");
+ runner.enqueue("test2".getBytes(),properties2);
+ runner.run(1, false, true);
+ putIgniteCache.getIgniteCache().remove("key51");
+ putIgniteCache.getIgniteCache().remove("key52");
+
+ runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
+ putIgniteCache.getIgniteCache().remove("key52");
+ putIgniteCache.getIgniteCache().remove("key52");
+
+ // Close and restart first time
+ putIgniteCache.closeIgniteDataStreamer();
+
+ runner.clearTransferState();
+
+ putIgniteCache.initilizeIgniteDataStreamer(runner.getProcessContext());
+
+ runner.assertValid();
+ properties1.put("igniteKey", "key51");
+ runner.enqueue("test1".getBytes(),properties1);
+ properties2.put("igniteKey", "key52");
+ runner.enqueue("test2".getBytes(),properties2);
+ runner.run(1, false, true);
+
+ runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
+ putIgniteCache.getIgniteCache().remove("key51");
+ putIgniteCache.getIgniteCache().remove("key52");
+
+ // Close and restart second time
+ putIgniteCache.closeIgniteDataStreamer();
+
+ runner.clearTransferState();
+
+ putIgniteCache.initilizeIgniteDataStreamer(runner.getProcessContext());
+
+ runner.assertValid();
+ properties1.put("igniteKey", "key51");
+ runner.enqueue("test1".getBytes(),properties1);
+ properties2.put("igniteKey", "key52");
+ runner.enqueue("test2".getBytes(),properties2);
+ runner.run(1, false, true);
+
+ runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2);
+ putIgniteCache.getIgniteCache().remove("key52");
+ putIgniteCache.getIgniteCache().remove("key51");
- runner.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java
new file mode 100644
index 0000000..8692383
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestGetIgniteCache.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.ignite.cache;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestGetIgniteCache {
+
+ private static final String CACHE_NAME = "testCache";
+ private TestRunner getRunner;
+ private GetIgniteCache getIgniteCache;
+ private Map<String,String> properties1;
+ private Map<String,String> properties2;
+ private static Ignite ignite;
+
+ @BeforeClass
+ public static void setUpClass() {
+ ignite = Ignition.start("test-ignite.xml");
+
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ ignite.close();
+ Ignition.stop(true);
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ getIgniteCache = new GetIgniteCache() {
+ @Override
+ protected Ignite getIgnite() {
+ return TestGetIgniteCache.ignite;
+ }
+
+ };
+
+ properties1 = new HashMap<String,String>();
+ properties1.put("igniteKey", "key1");
+ properties2 = new HashMap<String,String>();
+ properties2.put("igniteKey", "key2");
+
+ }
+
+ @After
+ public void teardown() {
+ getRunner = null;
+ ignite.destroyCache(CACHE_NAME);
+ }
+
+ @Test
+ public void testGetIgniteCacheDefaultConfOneFlowFileWithPlainKey() throws IOException, InterruptedException {
+
+ getRunner = TestRunners.newTestRunner(getIgniteCache);
+ getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "mykey");
+
+ getRunner.assertValid();
+ getRunner.enqueue(new byte[] {});
+
+ getIgniteCache.initialize(getRunner.getProcessContext());
+
+ getIgniteCache.getIgniteCache().put("mykey", "test".getBytes());
+
+ getRunner.run(1, false, true);
+
+ getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1);
+ List<MockFlowFile> getSucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+ assertEquals(1, getSucessfulFlowFiles.size());
+ List<MockFlowFile> getFailureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+ assertEquals(0, getFailureFlowFiles.size());
+
+ final MockFlowFile getOut = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
+ getOut.assertContentEquals("test".getBytes());
+
+ getRunner.shutdown();
+ }
+
+ @Test
+ public void testGetIgniteCacheNullGetCacheThrowsException() throws IOException, InterruptedException {
+
+ getIgniteCache = new GetIgniteCache() {
+ @Override
+ protected Ignite getIgnite() {
+ return TestGetIgniteCache.ignite;
+ }
+
+ @Override
+ protected IgniteCache<String, byte[]> getIgniteCache() {
+ return null;
+ }
+
+ };
+ getRunner = TestRunners.newTestRunner(getIgniteCache);
+ getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "mykey");
+
+ getRunner.assertValid();
+ getRunner.enqueue(new byte[] {});
+
+ getIgniteCache.initialize(getRunner.getProcessContext());
+
+ getRunner.run(1, false, true);
+
+ getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 1);
+ List<MockFlowFile> getSucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+ assertEquals(0, getSucessfulFlowFiles.size());
+ List<MockFlowFile> getFailureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+ assertEquals(1, getFailureFlowFiles.size());
+
+ final MockFlowFile getOut = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
+ getOut.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY,
+ GetIgniteCache.IGNITE_GET_FAILED_MESSAGE_PREFIX + "java.lang.NullPointerException");
+
+ getRunner.shutdown();
+ }
+
+ @Test
+ public void testGetIgniteCacheDefaultConfOneFlowFileWithKeyExpression() throws IOException, InterruptedException {
+
+ getRunner = TestRunners.newTestRunner(getIgniteCache);
+ getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+ getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+ getRunner.assertValid();
+ getRunner.enqueue("".getBytes(),properties1);
+
+ getIgniteCache.initialize(getRunner.getProcessContext());
+
+ getIgniteCache.getIgniteCache().put("key1", "test".getBytes());
+
+ getRunner.run(1, false, true);
+
+ getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1);
+ List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+ assertEquals(1, sucessfulFlowFiles.size());
+ List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+ assertEquals(0, failureFlowFiles.size());
+
+ final MockFlowFile out = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
+
+ out.assertContentEquals("test".getBytes());
+ getRunner.shutdown();
+ }
+
+ @Test
+ public void testGetIgniteCacheDefaultConfTwoFlowFilesWithExpressionKeys() throws IOException, InterruptedException {
+
+ getRunner = TestRunners.newTestRunner(getIgniteCache);
+ getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+ getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+ getRunner.assertValid();
+ getRunner.enqueue("".getBytes(),properties1);
+ getRunner.enqueue("".getBytes(),properties2);
+
+ getIgniteCache.initialize(getRunner.getProcessContext());
+
+ getIgniteCache.getIgniteCache().put("key1", "test1".getBytes());
+ getIgniteCache.getIgniteCache().put("key2", "test2".getBytes());
+
+ getRunner.run(2, false, true);
+
+ getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 2);
+
+ List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+ assertEquals(2, sucessfulFlowFiles.size());
+ List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+ assertEquals(0, failureFlowFiles.size());
+
+ final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
+
+ out1.assertContentEquals("test1".getBytes());
+ Assert.assertEquals("test1",new String(getIgniteCache.getIgniteCache().get("key1")));
+
+ final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(1);
+
+ out2.assertContentEquals("test2".getBytes());
+
+ Assert.assertArrayEquals("test2".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key2"));
+
+ getRunner.shutdown();
+ }
+
+ @Test
+ public void testGetIgniteCacheDefaultConfOneFlowFileNoKey() throws IOException, InterruptedException {
+
+ getRunner = TestRunners.newTestRunner(getIgniteCache);
+ getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+ getRunner.assertValid();
+ properties1.clear();
+ getRunner.enqueue("".getBytes(),properties1);
+ getIgniteCache.initialize(getRunner.getProcessContext());
+
+ getRunner.run(1, false, true);
+
+ getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 1);
+ List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+ assertEquals(0, sucessfulFlowFiles.size());
+ List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+ assertEquals(1, failureFlowFiles.size());
+
+ final MockFlowFile out = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
+
+ out.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+
+ getRunner.shutdown();
+ }
+
+
+
+ @Test
+ public void testGetIgniteCacheDefaultConfTwoFlowFilesNoKey() throws IOException, InterruptedException {
+
+ getRunner = TestRunners.newTestRunner(getIgniteCache);
+ getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+ getRunner.assertValid();
+
+ properties1.clear();
+ getRunner.enqueue("".getBytes(),properties1);
+ getRunner.enqueue("".getBytes(),properties1);
+
+ getIgniteCache.initialize(getRunner.getProcessContext());
+
+ getRunner.run(2, false, true);
+
+ getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_FAILURE, 2);
+ List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+ assertEquals(0, sucessfulFlowFiles.size());
+ List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+ assertEquals(2, failureFlowFiles.size());
+
+ final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
+ out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+ final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(1);
+ out2.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+
+ getRunner.shutdown();
+
+ }
+
+ @Test
+ public void testGetIgniteCacheDefaultConfTwoFlowFileFirstNoKey() throws IOException, InterruptedException {
+
+ getRunner = TestRunners.newTestRunner(getIgniteCache);
+ getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+ getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+ getRunner.assertValid();
+ getRunner.enqueue("".getBytes());
+ getRunner.enqueue("".getBytes(),properties2);
+ getIgniteCache.initialize(getRunner.getProcessContext());
+ getIgniteCache.getIgniteCache().put("key2", "test2".getBytes());
+
+ getRunner.run(2, false, true);
+
+ List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+ assertEquals(1, sucessfulFlowFiles.size());
+ List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+ assertEquals(1, failureFlowFiles.size());
+
+ final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
+
+ out1.assertContentEquals("".getBytes());
+ out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+
+ final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
+
+ out2.assertContentEquals("test2".getBytes());
+ Assert.assertArrayEquals("test2".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key2"));
+
+ getRunner.shutdown();
+ }
+
+ @Test
+ public void testGetIgniteCacheDefaultConfTwoFlowFileSecondNoKey() throws IOException, InterruptedException {
+
+ getRunner = TestRunners.newTestRunner(getIgniteCache);
+ getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+ getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+ getRunner.assertValid();
+ getRunner.enqueue("".getBytes(),properties1);
+ getRunner.enqueue("".getBytes());
+ getIgniteCache.initialize(getRunner.getProcessContext());
+
+ getIgniteCache.getIgniteCache().put("key1", "test1".getBytes());
+ getRunner.run(2, false, true);
+
+ List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+ assertEquals(1, sucessfulFlowFiles.size());
+ List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+ assertEquals(1, failureFlowFiles.size());
+
+ final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
+
+ out1.assertContentEquals("".getBytes());
+ out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+
+ final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
+
+ out2.assertContentEquals("test1".getBytes());
+ Assert.assertArrayEquals("test1".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key1"));
+
+ getRunner.shutdown();
+
+ }
+
+
+ @Test
+ public void testGetIgniteCacheDefaultConfThreeFlowFilesOneOkSecondOkThirdNoExpressionKey() throws IOException, InterruptedException {
+
+ getRunner = TestRunners.newTestRunner(getIgniteCache);
+ getRunner.setProperty(GetIgniteCache.CACHE_NAME, CACHE_NAME);
+ getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}");
+
+ getRunner.assertValid();
+ getRunner.enqueue("".getBytes(),properties1);
+ getRunner.enqueue("".getBytes(),properties2);
+ getRunner.enqueue("".getBytes());
+ getIgniteCache.initialize(getRunner.getProcessContext());
+
+ getIgniteCache.getIgniteCache().put("key1", "test1".getBytes());
+ getIgniteCache.getIgniteCache().put("key2", "test2".getBytes());
+ getRunner.run(3, false, true);
+
+ List<MockFlowFile> sucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
+ assertEquals(2, sucessfulFlowFiles.size());
+ List<MockFlowFile> failureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
+ assertEquals(1, failureFlowFiles.size());
+
+ final MockFlowFile out1 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE).get(0);
+
+ out1.assertContentEquals("".getBytes());
+ out1.assertAttributeEquals(GetIgniteCache.IGNITE_GET_FAILED_REASON_ATTRIBUTE_KEY, GetIgniteCache.IGNITE_GET_FAILED_MISSING_KEY_MESSAGE);
+
+ final MockFlowFile out2 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
+
+ out2.assertContentEquals("test1".getBytes());
+ Assert.assertArrayEquals("test1".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key1"));
+
+ final MockFlowFile out3 = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(1);
+
+ out3.assertContentEquals("test2".getBytes());
+ Assert.assertArrayEquals("test2".getBytes(),(byte[])getIgniteCache.getIgniteCache().get("key2"));
+
+ getRunner.shutdown();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/da33e285/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java
index 5639795..4d7e5c6 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java
@@ -47,13 +47,18 @@ public class TestPutIgniteCache {
@BeforeClass
public static void setUpClass() {
- ignite = Ignition.start("test-ignite.xml");
+ List<Ignite> grids = Ignition.allGrids();
+ if ( grids.size() == 1 )
+ ignite = grids.get(0);
+ else
+ ignite = Ignition.start("test-ignite.xml");
}
@AfterClass
public static void tearDownClass() {
- ignite.close();
+ if ( ignite != null )
+ ignite.close();
Ignition.stop(true);
}