You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/23 16:14:01 UTC
[5/5] nifi git commit: NIFI-1055: Fixed checkstyle violations
NIFI-1055: Fixed checkstyle violations
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0fc5d304
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0fc5d304
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0fc5d304
Branch: refs/heads/master
Commit: 0fc5d3046178836365f710d312cef6568126a99d
Parents: 5d90c9b
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 23 09:59:24 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 10:08:44 2015 -0400
----------------------------------------------------------------------
.../nifi/processors/avro/ConvertAvroToJSON.java | 42 ++++----
.../processors/kite/TestCSVToAvroProcessor.java | 1 -
.../nifi/processors/standard/ListenHTTP.java | 108 +++++++++----------
.../standard/PutDistributedMapCache.java | 96 +++++++++--------
.../standard/TestPutDistributedMapCache.java | 31 +++---
5 files changed, 140 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index 016750b..f0ba71a 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -50,7 +50,7 @@ import org.apache.nifi.processor.io.StreamCallback;
@SideEffectFree
@SupportsBatching
-@Tags({ "json", "avro", "binary" })
+@Tags({"json", "avro", "binary"})
@CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
+ "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
+ "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
@@ -60,41 +60,41 @@ public class ConvertAvroToJSON extends AbstractProcessor {
protected static final String CONTAINER_ARRAY = "array";
protected static final String CONTAINER_NONE = "none";
- static final PropertyDescriptor CONTAINER_OPTIONS
- = new PropertyDescriptor.Builder()
- .name("JSON container options")
- .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
- .allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
- .required(true)
- .defaultValue(CONTAINER_ARRAY)
- .build();
+ static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder()
+ .name("JSON container options")
+ .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE
+ + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
+ .allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
+ .required(true)
+ .defaultValue(CONTAINER_ARRAY)
+ .build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("A FlowFile is routed to this relationship after it has been converted to JSON")
- .build();
+ .name("success")
+ .description("A FlowFile is routed to this relationship after it has been converted to JSON")
+ .build();
static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
- .build();
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
+ .build();
-
private List<PropertyDescriptor> properties;
-
+
@Override
protected void init(ProcessorInitializationContext context) {
super.init(context);
-
+
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONTAINER_OPTIONS);
this.properties = Collections.unmodifiableList(properties);
-
}
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
+
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
@@ -118,8 +118,8 @@ public class ConvertAvroToJSON extends AbstractProcessor {
public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn);
- final OutputStream out = new BufferedOutputStream(rawOut);
- final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+ final OutputStream out = new BufferedOutputStream(rawOut);
+ final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
final GenericData genericData = GenericData.get();
GenericRecord record = reader.next();
http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
index 0cde23c..902ec79 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -58,7 +58,6 @@ public class TestCSVToAvroProcessor {
/**
* Basic test for tab separated files, similar to #test
- * @throws IOException
*/
@Test
public void testTabSeparatedConversion() throws IOException {
http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index a446eb6..9ad1703 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -31,10 +31,12 @@ import java.util.regex.Pattern;
import javax.servlet.Servlet;
import javax.ws.rs.Path;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
-import org.apache.nifi.stream.io.StreamThrottler;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@@ -42,15 +44,12 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.servlets.ContentAcknowledgmentServlet;
import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
import org.apache.nifi.ssl.SSLContextService;
-
+import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
+import org.apache.nifi.stream.io.StreamThrottler;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -70,56 +69,56 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
private List<PropertyDescriptor> properties;
public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Relationship for successfully received FlowFiles")
- .build();
+ .name("success")
+ .description("Relationship for successfully received FlowFiles")
+ .build();
public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder()
- .name("Base Path")
- .description("Base path for incoming connections")
- .required(true)
- .defaultValue("contentListener")
- .addValidator(StandardValidators.URI_VALIDATOR)
- .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
- .build();
+ .name("Base Path")
+ .description("Base path for incoming connections")
+ .required(true)
+ .defaultValue("contentListener")
+ .addValidator(StandardValidators.URI_VALIDATOR)
+ .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
+ .build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
- .name("Listening Port")
- .description("The Port to listen on for incoming connections")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .build();
+ .name("Listening Port")
+ .description("The Port to listen on for incoming connections")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
- .name("Authorized DN Pattern")
- .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
- .required(true)
- .defaultValue(".*")
- .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .build();
+ .name("Authorized DN Pattern")
+ .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
+ .required(true)
+ .defaultValue(".*")
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new PropertyDescriptor.Builder()
- .name("Max Unconfirmed Flowfile Time")
- .description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache")
- .required(true)
- .defaultValue("60 secs")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .build();
+ .name("Max Unconfirmed Flowfile Time")
+ .description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache")
+ .required(true)
+ .defaultValue("60 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
- .name("Max Data to Receive per Second")
- .description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled")
- .required(false)
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .build();
+ .name("Max Data to Receive per Second")
+ .description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled")
+ .required(false)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("SSL Context Service")
- .description("The Controller Service to use in order to obtain an SSL Context")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
+ .name("SSL Context Service")
+ .description("The Controller Service to use in order to obtain an SSL Context")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
- .name("HTTP Headers to receive as Attributes (Regex)")
- .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
- .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .required(false)
- .build();
+ .name("HTTP Headers to receive as Attributes (Regex)")
+ .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .required(false)
+ .build();
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
@@ -173,7 +172,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
toShutdown.stop();
toShutdown.destroy();
} catch (final Exception ex) {
- getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[]{ex});
+ getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[] {ex});
this.server = null;
}
}
@@ -235,18 +234,17 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
connector.setPort(port);
// add the connector to the server
- server.setConnectors(new Connector[]{connector});
+ server.setConnectors(new Connector[] {connector});
final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null));
for (final Class<? extends Servlet> cls : getServerClasses()) {
final Path path = cls.getAnnotation(Path.class);
// Note: servlets must have a path annotation - this will NPE otherwise
// also, servlets other than ListenHttpServlet must have a path starting with /
- if(basePath.isEmpty() && !path.value().isEmpty()){
+ if (basePath.isEmpty() && !path.value().isEmpty()) {
// Note: this is to handle the condition of an empty uri, otherwise pathSpec would start with //
contextHandler.addServlet(cls, path.value());
- }
- else{
+ } else {
contextHandler.addServlet(cls, "/" + basePath + path.value());
}
}
@@ -304,7 +302,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
for (final String id : findOldFlowFileIds(context)) {
final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id);
if (wrapper != null) {
- getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[]{id});
+ getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[] {id});
wrapper.session.rollback();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
index 8e50c9f..bc1fde5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
@@ -16,6 +16,16 @@
*/
package org.apache.nifi.processors.standard;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -33,24 +43,22 @@ import org.apache.nifi.distributed.cache.client.exception.SerializationException
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-
@EventDriven
@SupportsBatching
@Tags({"map", "cache", "put", "distributed"})
@CapabilityDescription("Gets the content of a FlowFile and puts it to a distributed map cache, using a cache key " +
- "computed from FlowFile attributes. If the cache already contains the entry and the cache update strategy is " +
- "'keep original' the entry is not replaced.'")
+ "computed from FlowFile attributes. If the cache already contains the entry and the cache update strategy is " +
+ "'keep original' the entry is not replaced.'")
@WritesAttribute(attribute = "cached", description = "All FlowFiles will have an attribute 'cached'. The value of this " +
- "attribute is true, is the FlowFile is cached, otherwise false.")
+ "attribute is true, is the FlowFile is cached, otherwise false.")
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"})
public class PutDistributedMapCache extends AbstractProcessor {
@@ -58,55 +66,55 @@ public class PutDistributedMapCache extends AbstractProcessor {
// Identifies the distributed map cache client
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
- .name("Distributed Cache Service")
- .description("The Controller Service that is used to cache flow files")
- .required(true)
- .identifiesControllerService(DistributedMapCacheClient.class)
- .build();
+ .name("Distributed Cache Service")
+ .description("The Controller Service that is used to cache flow files")
+ .required(true)
+ .identifiesControllerService(DistributedMapCacheClient.class)
+ .build();
// Selects the FlowFile attribute, whose value is used as cache key
public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
- .name("Cache Entry Identifier")
- .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will " +
- "be evaluated against a FlowFile in order to determine the cache key")
- .required(true)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
- .expressionLanguageSupported(true)
- .build();
+ .name("Cache Entry Identifier")
+ .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will " +
+ "be evaluated against a FlowFile in order to determine the cache key")
+ .required(true)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+ .expressionLanguageSupported(true)
+ .build();
public static final AllowableValue CACHE_UPDATE_REPLACE = new AllowableValue("replace", "Replace if present",
- "Adds the specified entry to the cache, replacing any value that is currently set.");
+ "Adds the specified entry to the cache, replacing any value that is currently set.");
public static final AllowableValue CACHE_UPDATE_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original",
- "Adds the specified entry to the cache, if the key does not exist.");
+ "Adds the specified entry to the cache, if the key does not exist.");
public static final PropertyDescriptor CACHE_UPDATE_STRATEGY = new PropertyDescriptor.Builder()
- .name("Cache update strategy")
- .description("Determines how the cache is updated if the cache already contains the entry")
- .required(true)
- .allowableValues(CACHE_UPDATE_REPLACE, CACHE_UPDATE_KEEP_ORIGINAL)
- .defaultValue(CACHE_UPDATE_REPLACE.getValue())
- .build();
+ .name("Cache update strategy")
+ .description("Determines how the cache is updated if the cache already contains the entry")
+ .required(true)
+ .allowableValues(CACHE_UPDATE_REPLACE, CACHE_UPDATE_KEEP_ORIGINAL)
+ .defaultValue(CACHE_UPDATE_REPLACE.getValue())
+ .build();
public static final PropertyDescriptor CACHE_ENTRY_MAX_BYTES = new PropertyDescriptor.Builder()
- .name("Max cache entry size")
- .description("The maximum amount of data to put into cache")
- .required(false)
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .defaultValue("1 MB")
- .expressionLanguageSupported(false)
- .build();
+ .name("Max cache entry size")
+ .description("The maximum amount of data to put into cache")
+ .required(false)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .expressionLanguageSupported(false)
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Any FlowFile that is successfully inserted into cache will be routed to this relationship")
- .build();
+ .name("success")
+ .description("Any FlowFile that is successfully inserted into cache will be routed to this relationship")
+ .build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("Any FlowFile that cannot be inserted into the cache will be routed to this relationship")
- .build();
+ .name("failure")
+ .description("Any FlowFile that cannot be inserted into the cache will be routed to this relationship")
+ .build();
private final Set<Relationship> relationships;
private final Serializer<String> keySerializer = new StringSerializer();
@@ -207,7 +215,7 @@ public class PutDistributedMapCache extends AbstractProcessor {
} catch (final IOException e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
- logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
+ logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
index 8347e7f..05d4293 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
@@ -16,6 +16,14 @@
*/
package org.apache.nifi.processors.standard;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@@ -26,22 +34,11 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static org.junit.Assert.assertEquals;
public class TestPutDistributedMapCache {
private TestRunner runner;
private MockCacheClient service;
- private PutDistributedMapCache processor;
@Before
public void setup() throws InitializationException {
@@ -57,7 +54,7 @@ public class TestPutDistributedMapCache {
public void testNoCacheKey() throws InitializationException {
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${caheKeyAttribute}");
- runner.enqueue(new byte[]{});
+ runner.enqueue(new byte[] {});
runner.run();
@@ -99,7 +96,7 @@ public class TestPutDistributedMapCache {
props.put("caheKeyAttribute", "2");
// flow file without content
- runner.enqueue(new byte[]{}, props);
+ runner.enqueue(new byte[] {}, props);
runner.run();
@@ -171,7 +168,7 @@ public class TestPutDistributedMapCache {
runner.clearTransferState();
- //we expect that the cache entry is replaced
+ // we expect that the cache entry is replaced
value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
assertEquals(replaced, new String(value, "UTF-8"));
}
@@ -215,7 +212,7 @@ public class TestPutDistributedMapCache {
runner.clearTransferState();
- //we expect that the cache entry is NOT replaced
+ // we expect that the cache entry is NOT replaced
value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
assertEquals(original, new String(value, "UTF-8"));
}
@@ -225,7 +222,7 @@ public class TestPutDistributedMapCache {
private boolean failOnCalls = false;
private void verifyNotFail() throws IOException {
- if ( failOnCalls ) {
+ if (failOnCalls) {
throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
}
}
@@ -240,7 +237,7 @@ public class TestPutDistributedMapCache {
@Override
@SuppressWarnings("unchecked")
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
- final Deserializer<V> valueDeserializer) throws IOException {
+ final Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail();
return (V) values.putIfAbsent(key, value);
}