You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/12/02 16:27:11 UTC

nifi git commit: NIFI-3095: Add EL support to Elasticsearch 2.x (and HTTP) processors

Repository: nifi
Updated Branches:
  refs/heads/master 8da38acf3 -> 69b23adf1


NIFI-3095: Add EL support to Elasticsearch 2.x (and HTTP) processors

This closes #1276

Signed-off-by: jpercivall <JP...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/69b23adf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/69b23adf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/69b23adf

Branch: refs/heads/master
Commit: 69b23adf1b0fdfb7e1e344c833adfbeb1f4980e5
Parents: 8da38ac
Author: Matt Burgess <ma...@apache.org>
Authored: Tue Nov 29 11:29:47 2016 -0500
Committer: jpercivall <JP...@apache.org>
Committed: Fri Dec 2 11:16:23 2016 -0500

----------------------------------------------------------------------
 .../AbstractElasticsearchHttpProcessor.java     |  7 ++-
 .../AbstractElasticsearchProcessor.java         | 17 +++++--
 ...ctElasticsearchTransportClientProcessor.java | 52 +++++++++++---------
 .../elasticsearch/FetchElasticsearch.java       | 34 ++++++++-----
 .../elasticsearch/FetchElasticsearchHttp.java   | 44 +++++++++++------
 .../elasticsearch/PutElasticsearch.java         | 44 ++++++++++-------
 .../elasticsearch/PutElasticsearchHttp.java     | 51 +++++++++++--------
 .../elasticsearch/QueryElasticsearchHttp.java   | 38 ++++++++------
 .../elasticsearch/ScrollElasticsearchHttp.java  | 43 +++++++++-------
 .../elasticsearch/TestFetchElasticsearch.java   | 31 ++++++++++++
 .../TestFetchElasticsearchHttp.java             | 29 +++++++++++
 .../elasticsearch/TestPutElasticsearch.java     | 33 +++++++++++++
 .../elasticsearch/TestPutElasticsearchHttp.java | 27 ++++++++++
 .../TestQueryElasticsearchHttp.java             | 23 +++++++++
 .../TestScrollElasticsearchHttp.java            | 24 +++++++++
 15 files changed, 371 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index c4121b5..d67ce6c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -56,6 +56,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             .description("Elasticsearch URL which will be connected to, including scheme (http, e.g.), host, and port. The default port for the REST API is 9200.")
             .required(true)
             .addValidator(StandardValidators.URL_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
@@ -81,6 +82,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             .required(true)
             .defaultValue("5 secs")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor RESPONSE_TIMEOUT = new PropertyDescriptor.Builder()
@@ -90,6 +92,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
             .required(true)
             .defaultValue("15 secs")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
@@ -109,8 +112,8 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
         }
 
         // Set timeouts
-        okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
-        okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
+        okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
+        okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
 
         final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(SSLContextService.ClientAuth.NONE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
index 76c7224..c5e4cc3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.elasticsearch;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -28,7 +29,6 @@ import org.apache.nifi.util.StringUtils;
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -36,6 +36,13 @@ import java.util.Set;
  */
 public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
 
+    static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> {
+        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+        }
+        return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, value, context);
+    };
+
     public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
             .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
@@ -50,6 +57,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
             .required(true)
             .defaultValue("UTF-8")
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
@@ -57,6 +65,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
             .description("Username to access the Elasticsearch cluster")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
@@ -65,6 +74,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
             .required(false)
             .sensitive(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     protected abstract void createElasticsearchClient(ProcessContext context) throws ProcessException;
@@ -74,8 +84,9 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
         Set<ValidationResult> results = new HashSet<>();
 
         // Ensure that if username or password is set, then the other is too
-        Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties();
-        if (StringUtils.isEmpty(propertyMap.get(USERNAME)) != StringUtils.isEmpty(propertyMap.get(PASSWORD))) {
+        String userName = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        if (StringUtils.isEmpty(userName) != StringUtils.isEmpty(password)) {
             results.add(new ValidationResult.Builder().valid(false).explanation(
                     "If username or password is specified, then the other must be specified as well").build());
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java
index e1c9d4d..a16a0dd 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.processors.elasticsearch;
 
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.logging.ComponentLog;
@@ -50,21 +49,20 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
     /**
      * This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries
      */
-    private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() {
-        @Override
-        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
-            final List<String> esList = Arrays.asList(input.split(","));
-            for (String hostnamePort : esList) {
-                String[] addresses = hostnamePort.split(":");
-                // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there)
-                if (addresses.length != 2) {
-                    return new ValidationResult.Builder().subject(subject).input(input).explanation(
-                            "Must be in hostname:port form (no scheme such as http://").valid(false).build();
-                }
+    private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, input, context) -> {
+        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+            return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+        }
+        final List<String> esList = Arrays.asList(input.split(","));
+        for (String hostnamePort : esList) {
+            String[] addresses = hostnamePort.split(":");
+            // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there)
+            if (addresses.length != 2) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation(
+                        "Must be in hostname:port form (no scheme such as http://").valid(false).build();
             }
-            return new ValidationResult.Builder().subject(subject).input(input).explanation(
-                    "Valid cluster definition").valid(true).build();
         }
+        return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid cluster definition").valid(true).build();
     };
 
     protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
@@ -73,6 +71,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .defaultValue("elasticsearch")
+            .expressionLanguageSupported(true)
             .build();
 
     protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
@@ -83,6 +82,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
             .required(true)
             .expressionLanguageSupported(false)
             .addValidator(HOSTNAME_PORT_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor PROP_SHIELD_LOCATION = new PropertyDescriptor.Builder()
@@ -93,6 +93,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
                     + "lib/ directory, doing so will prevent the Shield plugin from being loaded.")
             .required(false)
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
@@ -101,7 +102,8 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
                     "For example, 5s (5 seconds). If non-local recommended is 30s")
             .required(true)
             .defaultValue("5s")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
@@ -110,7 +112,8 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
                     + "If non-local recommended is 30s.")
             .required(true)
             .defaultValue("5s")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     protected AtomicReference<Client> esClient = new AtomicReference<>();
@@ -135,11 +138,11 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
 
         log.debug("Creating ElasticSearch Client");
         try {
-            final String clusterName = context.getProperty(CLUSTER_NAME).getValue();
-            final String pingTimeout = context.getProperty(PING_TIMEOUT).getValue();
-            final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).getValue();
-            final String username = context.getProperty(USERNAME).getValue();
-            final String password = context.getProperty(PASSWORD).getValue();
+            final String clusterName = context.getProperty(CLUSTER_NAME).evaluateAttributeExpressions().getValue();
+            final String pingTimeout = context.getProperty(PING_TIMEOUT).evaluateAttributeExpressions().getValue();
+            final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).evaluateAttributeExpressions().getValue();
+            final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+            final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
 
             final SSLContextService sslService =
                     context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
@@ -149,7 +152,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
                     .put("client.transport.ping_timeout", pingTimeout)
                     .put("client.transport.nodes_sampler_interval", samplerInterval);
 
-            String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).getValue();
+            String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).evaluateAttributeExpressions().getValue();
             if (sslService != null) {
                 settingsBuilder.put("shield.transport.ssl", "true")
                         .put("shield.ssl.keystore.path", sslService.getKeyStoreFile())
@@ -171,7 +174,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
 
             TransportClient transportClient = getTransportClient(settingsBuilder, shieldUrl, username, password);
 
-            final String hosts = context.getProperty(HOSTS).getValue();
+            final String hosts = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue();
             esHosts = getEsHosts(hosts);
 
             if (esHosts != null) {
@@ -268,6 +271,9 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
         for (String item : esList) {
 
             String[] addresses = item.split(":");
+            if (addresses.length != 2) {
+                throw new ArrayIndexOutOfBoundsException("Not in host:port format");
+            }
             final String hostName = addresses[0].trim();
             final int port = Integer.parseInt(addresses[1].trim());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
index 67aaae7..643edbb 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
@@ -105,18 +105,17 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
             .build();
 
 
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        relationships.add(REL_RETRY);
-        relationships.add(REL_NOT_FOUND);
-        return Collections.unmodifiableSet(relationships);
-    }
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        final Set<Relationship> _rels = new HashSet<>();
+        _rels.add(REL_SUCCESS);
+        _rels.add(REL_FAILURE);
+        _rels.add(REL_RETRY);
+        _rels.add(REL_NOT_FOUND);
+        relationships = Collections.unmodifiableSet(_rels);
 
-    @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(CLUSTER_NAME);
         descriptors.add(HOSTS);
@@ -131,9 +130,18 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
         descriptors.add(TYPE);
         descriptors.add(CHARSET);
 
-        return Collections.unmodifiableList(descriptors);
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
     }
 
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
 
     @OnScheduled
     public void setup(ProcessContext context) {
@@ -151,7 +159,7 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
         final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
         final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
         final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
-        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
 
         final ComponentLog logger = getLogger();
         try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
index ac598bf..8fd30dc 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -131,19 +131,17 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
 
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        relationships.add(REL_RETRY);
-        relationships.add(REL_NOT_FOUND);
-        return Collections.unmodifiableSet(relationships);
-    }
+    static {
+        final Set<Relationship> _rels = new HashSet<>();
+        _rels.add(REL_SUCCESS);
+        _rels.add(REL_FAILURE);
+        _rels.add(REL_RETRY);
+        _rels.add(REL_NOT_FOUND);
+        relationships = Collections.unmodifiableSet(_rels);
 
-    @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(ES_URL);
         descriptors.add(PROP_SSL_CONTEXT_SERVICE);
@@ -156,9 +154,18 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         descriptors.add(TYPE);
         descriptors.add(FIELDS);
 
-        return Collections.unmodifiableList(descriptors);
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
     }
 
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
 
     @OnScheduled
     public void setup(ProcessContext context) {
@@ -194,21 +201,22 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 : null;
 
         // Authentication
-        final String username = context.getProperty(USERNAME).getValue();
-        final String password = context.getProperty(PASSWORD).getValue();
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
 
         final ComponentLog logger = getLogger();
 
+        Response getResponse = null;
 
         try {
             logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
 
             // read the url property from the context
-            final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
+            final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
             final URL url = buildRequestURL(urlstr, docId, index, docType, fields);
             final long startNanos = System.nanoTime();
 
-            final Response getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
+            getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
             final int statusCode = getResponse.code();
 
             if (isSuccess(statusCode)) {
@@ -290,6 +298,10 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 session.remove(flowFile);
             }
             context.yield();
+        } finally {
+            if (getResponse != null) {
+                getResponse.close();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index 216efd4..d208d40 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -96,8 +96,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
             .description("The type of this document (used by Elasticsearch for indexing and searching)")
             .required(true)
             .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
-                    AttributeExpression.ResultType.STRING, true))
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -105,8 +104,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
             .description("The type of the operation used to index (index, update, upsert)")
             .required(true)
             .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
-                    AttributeExpression.ResultType.STRING, true))
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
             .defaultValue("index")
             .build();
 
@@ -116,20 +114,19 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
             .required(true)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .defaultValue("100")
+            .expressionLanguageSupported(true)
             .build();
 
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
 
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        relationships.add(REL_RETRY);
-        return Collections.unmodifiableSet(relationships);
-    }
+    static {
+        final Set<Relationship> _rels = new HashSet<>();
+        _rels.add(REL_SUCCESS);
+        _rels.add(REL_FAILURE);
+        _rels.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(_rels);
 
-    @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(CLUSTER_NAME);
         descriptors.add(HOSTS);
@@ -146,7 +143,17 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
         descriptors.add(BATCH_SIZE);
         descriptors.add(INDEX_OP);
 
-        return Collections.unmodifiableList(descriptors);
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
     }
 
     @OnScheduled
@@ -156,16 +163,16 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        final ComponentLog logger = getLogger();
         final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
-        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+        final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
 
         final List<FlowFile> flowFiles = session.get(batchSize);
         if (flowFiles.isEmpty()) {
             return;
         }
 
-        final ComponentLog logger = getLogger();
         // Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list.
         List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
         try {
@@ -178,6 +185,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
                 final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
                 final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
                 final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue();
+                final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue());
 
                 final String id = file.getAttribute(id_attribute);
                 if (id == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 3ba46bb..2b39a86 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -104,8 +104,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
             .description("The type of this document (used by Elasticsearch for indexing and searching)")
             .required(true)
             .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
-                    AttributeExpression.ResultType.STRING, true))
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -114,8 +113,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
             .description("The type of the operation used to index (index, update, upsert, delete)")
             .required(true)
             .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
-                    AttributeExpression.ResultType.STRING, true))
+            .addValidator(NON_EMPTY_EL_VALIDATOR)
             .defaultValue("index")
             .build();
 
@@ -128,19 +126,19 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
             .required(true)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .defaultValue("100")
+            .expressionLanguageSupported(true)
             .build();
 
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        relationships.add(REL_RETRY);
-        return Collections.unmodifiableSet(relationships);
-    }
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        final Set<Relationship> _rels = new HashSet<>();
+        _rels.add(REL_SUCCESS);
+        _rels.add(REL_FAILURE);
+        _rels.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(_rels);
 
-    @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(ES_URL);
         descriptors.add(PROP_SSL_CONTEXT_SERVICE);
@@ -154,7 +152,18 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         descriptors.add(CHARSET);
         descriptors.add(BATCH_SIZE);
         descriptors.add(INDEX_OP);
-        return Collections.unmodifiableList(descriptors);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
     }
 
     @Override
@@ -192,7 +201,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
 
         final List<FlowFile> flowFiles = session.get(batchSize);
         if (flowFiles.isEmpty()) {
@@ -200,10 +209,10 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         }
 
         final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
-        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+
         // Authentication
-        final String username = context.getProperty(USERNAME).getValue();
-        final String password = context.getProperty(PASSWORD).getValue();
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
 
 
         OkHttpClient okHttpClient = getClient();
@@ -213,7 +222,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
 
         final StringBuilder sb = new StringBuilder();
-        final String baseUrl = trimToEmpty(context.getProperty(ES_URL).getValue());
+        final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
         final URL url;
         try {
             url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk");
@@ -225,6 +234,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
 
         for (FlowFile file : flowFiles) {
             final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
+            final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue());
             if (StringUtils.isEmpty(index)) {
                 logger.error("No value for index in for {}, transferring to failure", new Object[]{id_attribute, file});
                 flowFilesToTransfer.remove(file);
@@ -368,6 +378,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
                 session.transfer(flowFilesToTransfer, REL_FAILURE);
             }
+            getResponse.close();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index f921323..f65816e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -174,17 +174,16 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
             .allowableValues(TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
 
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        relationships.add(REL_RETRY);
-        return Collections.unmodifiableSet(relationships);
-    }
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        final Set<Relationship> _rels = new HashSet<>();
+        _rels.add(REL_SUCCESS);
+        _rels.add(REL_FAILURE);
+        _rels.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(_rels);
 
-    @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(ES_URL);
         descriptors.add(PROP_SSL_CONTEXT_SERVICE);
@@ -201,7 +200,17 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
         descriptors.add(LIMIT);
         descriptors.add(TARGET);
 
-        return Collections.unmodifiableList(descriptors);
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
     }
 
     @OnScheduled
@@ -247,8 +256,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 .equals(TARGET_FLOW_FILE_CONTENT);
 
         // Authentication
-        final String username = context.getProperty(USERNAME).getValue();
-        final String password = context.getProperty(PASSWORD).getValue();
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
 
         final ComponentLog logger = getLogger();
 
@@ -261,7 +270,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
 
             final long startNanos = System.nanoTime();
             // read the url property from the context
-            final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
+            final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
 
             boolean hitLimit = false;
             do {
@@ -279,6 +288,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                 numResults = this.getPage(getResponse, queryUrl, context, session, flowFile,
                         logger, startNanos, targetIsContent);
                 fromIndex += pageSize;
+                getResponse.close();
             } while (numResults > 0 && !hitLimit);
 
             if (flowFile != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index 3d897cf..0442bf7 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -159,16 +159,15 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
             .required(true).expressionLanguageSupported(true)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
 
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        return Collections.unmodifiableSet(relationships);
-    }
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        final Set<Relationship> _rels = new HashSet<>();
+        _rels.add(REL_SUCCESS);
+        _rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_rels);
 
-    @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(ES_URL);
         descriptors.add(PROP_SSL_CONTEXT_SERVICE);
@@ -184,7 +183,17 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
         descriptors.add(FIELDS);
         descriptors.add(SORT);
 
-        return Collections.unmodifiableList(descriptors);
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
     }
 
     @OnScheduled
@@ -227,18 +236,18 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
                 .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
 
         // Authentication
-        final String username = context.getProperty(USERNAME).getValue();
-        final String password = context.getProperty(PASSWORD).getValue();
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
 
         final ComponentLog logger = getLogger();
 
         try {
             String scrollId = loadScrollId(context.getStateManager());
 
+            // read the url property from the context
+            final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions()
+                    .getValue());
             if (scrollId != null) {
-                // read the url property from the context
-                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
-                        .getValue());
                 final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
                         scrollId, pageSize, scroll);
                 final long startNanos = System.nanoTime();
@@ -246,13 +255,12 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
                 final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl,
                         username, password, "GET", null);
                 this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos);
+                getResponse.close();
             } else {
                 logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index,
                         docType, query });
 
                 // read the url property from the context
-                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
-                        .getValue());
                 final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
                         scrollId, pageSize, scroll);
                 final long startNanos = System.nanoTime();
@@ -260,6 +268,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
                 final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
                         username, password, "GET", null);
                 this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos);
+                getResponse.close();
             }
 
         } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
index 9b68f2e..ba22b65 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
@@ -103,6 +103,37 @@ public class TestFetchElasticsearch {
     }
 
     @Test
+    public void testFetchElasticsearchOnTriggerEL() throws IOException {
+        runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(true)); // all docs are found
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "${cluster.name}");
+        runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "${hosts}");
+        runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "${ping.timeout}");
+        runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}");
+
+        runner.setProperty(FetchElasticsearch.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(FetchElasticsearch.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
+        runner.assertValid();
+        runner.setVariable("cluster.name", "elasticsearch");
+        runner.setVariable("hosts", "127.0.0.1:9300");
+        runner.setVariable("ping.timeout", "5s");
+        runner.setVariable("sampler.interval", "5s");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
     public void testFetchElasticsearchOnTriggerWithFailures() throws IOException {
         runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found
         runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
index 82fa3da..28bc060 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
@@ -62,6 +62,35 @@ public class TestFetchElasticsearchHttp {
     }
 
     @Test
+    public void testFetchElasticsearchOnTriggerEL() throws IOException {
+        runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
+
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        runner.assertValid();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
+        runner.assertValid();
+
+        runner.setVariable("es.url", "http://127.0.0.1:9200");
+        runner.setVariable("connect.timeout", "5s");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
     public void testFetchElasticsearchOnTrigger() throws IOException {
         runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found
         runner.setValidateExpressionUsage(true);

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
index 4e6a820..6d6da5a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
@@ -103,6 +103,39 @@ public class TestPutElasticsearch {
     }
 
     @Test
+    public void testPutElasticSearchOnTriggerEL() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "${cluster.name}");
+        runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "${hosts}");
+        runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "${ping.timeout}");
+        runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}");
+
+        runner.setProperty(PutElasticsearch.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearch.TYPE, "status");
+        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
+        runner.assertValid();
+        runner.setVariable("cluster.name", "elasticsearch");
+        runner.setVariable("hosts", "127.0.0.1:9300");
+        runner.setVariable("ping.timeout", "5s");
+        runner.setVariable("sampler.interval", "5s");
+
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
     public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
         runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures
         runner.setValidateExpressionUsage(false);

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index 9ce578f..fae63ee 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -127,6 +127,33 @@ public class TestPutElasticsearchHttp {
     }
 
     @Test
+    public void testPutElasticSearchOnTriggerEL() throws IOException {
+        runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
+
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+        runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
+        runner.assertValid();
+
+        runner.setVariable("es.url", "http://127.0.0.1:9200");
+        runner.setVariable("connect.timeout", "5s");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
     public void testPutElasticSearchOnTriggerBadIndexOp() throws IOException {
         runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
         runner.setValidateExpressionUsage(true);

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
index b9ec1f9..ccd74fa 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
@@ -77,6 +77,29 @@ public class TestQueryElasticsearchHttp {
     }
 
     @Test
+    public void testQueryElasticsearchOnTrigger_withInput_EL() throws IOException {
+        runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.QUERY,
+                "source:Twitter AND identifier:\"${identifier}\"");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+        runner.assertValid();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
+        runner.assertValid();
+
+        runner.setVariable("es.url", "http://127.0.0.1:9200");
+
+        runAndVerifySuccess(true);
+    }
+
+    @Test
     public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException {
         runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
         runner.setValidateExpressionUsage(true);

http://git-wip-us.apache.org/repos/asf/nifi/blob/69b23adf/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
index 2616269..a1a4e8d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
@@ -78,6 +78,30 @@ public class TestScrollElasticsearchHttp {
         runAndVerifySuccess();
     }
 
+    @Test
+    public void testScrollElasticsearchOnTrigger_withNoInput_EL() throws IOException {
+        runner = TestRunners.newTestRunner(new ScrollElasticsearchHttpTestProcessor());
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
+
+        runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.QUERY,
+                "source:WZ AND identifier:\"${identifier}\"");
+        runner.assertValid();
+        runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "2");
+        runner.assertValid();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
+        runner.assertValid();
+
+        runner.setVariable("es.url", "http://127.0.0.1:9200");
+
+        runner.setIncomingConnection(false);
+        runAndVerifySuccess();
+    }
+
     private void runAndVerifySuccess() {
         runner.enqueue("".getBytes(), new HashMap<String, String>() {
             {