Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/06 17:24:43 UTC

[2/2] nifi git commit: NIFI-2068: Add Elasticsearch HTTP processors

NIFI-2068: Add Elasticsearch HTTP processors

This closes #576

Signed-off-by: jpercivall <jo...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/181386b9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/181386b9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/181386b9

Branch: refs/heads/master
Commit: 181386b943ea693fd62ad2fbfff65277d22834b0
Parents: 9d273b1
Author: Matt Burgess <ma...@apache.org>
Authored: Wed Jul 6 13:07:16 2016 -0400
Committer: jpercivall <jo...@yahoo.com>
Committed: Wed Jul 6 13:12:29 2016 -0400

----------------------------------------------------------------------
 .../nifi-elasticsearch-processors/pom.xml       |  28 ++
 .../AbstractElasticsearchHttpProcessor.java     | 184 +++++++++
 .../AbstractElasticsearchProcessor.java         | 265 +------------
 ...ctElasticsearchTransportClientProcessor.java | 280 +++++++++++++
 .../elasticsearch/FetchElasticsearch.java       |   2 +-
 .../elasticsearch/FetchElasticsearchHttp.java   | 310 +++++++++++++++
 .../elasticsearch/PutElasticsearch.java         |   2 +-
 .../elasticsearch/PutElasticsearchHttp.java     | 367 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   2 +
 .../elasticsearch/TestFetchElasticsearch.java   |  56 +--
 .../TestFetchElasticsearchHttp.java             | 354 +++++++++++++++++
 .../elasticsearch/TestPutElasticsearch.java     |  64 +--
 .../elasticsearch/TestPutElasticsearchHttp.java | 388 +++++++++++++++++++
 .../src/test/resources/DocumentExample.json     |  16 -
 14 files changed, 1986 insertions(+), 332 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
index 4cf40fe..68f35b9 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
@@ -56,6 +56,11 @@ language governing permissions and limitations under the License. -->
             <version>${es.version}</version>
         </dependency>
         <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>3.3.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-ssl-context-service-api</artifactId>
         </dependency>
@@ -64,10 +69,33 @@ language governing permissions and limitations under the License. -->
             <artifactId>commons-io</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-ssl-context-service</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.5.4</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/DocumentExample.json</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
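
The new okhttp dependency above supplies the HTTP client used by the processors in this commit. For reference, a minimal OkHttp 3.x request looks roughly like the following (a hypothetical sketch, not part of the commit; the URL points at a local Elasticsearch instance and is illustrative only):

    import java.util.concurrent.TimeUnit;

    import okhttp3.OkHttpClient;
    import okhttp3.Request;
    import okhttp3.Response;

    public class OkHttpExample {
        public static void main(String[] args) throws Exception {
            // Build a client with an explicit connection timeout
            OkHttpClient client = new OkHttpClient.Builder()
                    .connectTimeout(5, TimeUnit.SECONDS)
                    .build();
            // Issue a GET against a (hypothetical) local Elasticsearch endpoint
            Request request = new Request.Builder()
                    .url("http://localhost:9200")
                    .get()
                    .build();
            try (Response response = client.newCall(request).execute()) {
                System.out.println(response.code() + " " + response.message());
            }
        }
    }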

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
new file mode 100644
index 0000000..d477d0e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.Credentials;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.StringUtils;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A base class for Elasticsearch processors that use the HTTP API
+ */
+public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {
+
+    public static final PropertyDescriptor ES_URL = new PropertyDescriptor.Builder()
+            .name("elasticsearch-http-url")
+            .displayName("Elasticsearch URL")
+            .description("The URL used to connect to Elasticsearch, including scheme, host, port, and any context path. The default port for the REST API is 9200.")
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
+            .name("elasticsearch-http-proxy-host")
+            .displayName("Proxy Host")
+            .description("The fully qualified hostname or IP address of the proxy server")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
+            .name("elasticsearch-http-proxy-port")
+            .displayName("Proxy Port")
+            .description("The port of the proxy server")
+            .required(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("elasticsearch-http-connect-timeout")
+            .displayName("Connection Timeout")
+            .description("Max wait time for the connection to the Elasticsearch REST API.")
+            .required(true)
+            .defaultValue("5 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RESPONSE_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("elasticsearch-http-response-timeout")
+            .displayName("Response Timeout")
+            .description("Max wait time for a response from the Elasticsearch REST API.")
+            .required(true)
+            .defaultValue("15 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
+
+    @Override
+    protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
+        okHttpClientAtomicReference.set(null);
+
+        OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
+
+        // Add a proxy if set
+        final String proxyHost = context.getProperty(PROXY_HOST).getValue();
+        final Integer proxyPort = context.getProperty(PROXY_PORT).asInteger();
+        if (proxyHost != null && proxyPort != null) {
+            final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
+            okHttpClient.proxy(proxy);
+        }
+
+        // Set timeouts
+        okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
+        okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
+
+        final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(SSLContextService.ClientAuth.NONE);
+
+        // check if the ssl context is set and add the factory if so
+        if (sslContext != null) {
+            okHttpClient.sslSocketFactory(sslContext.getSocketFactory());
+        }
+
+        okHttpClientAtomicReference.set(okHttpClient.build());
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+        if(validationContext.getProperty(PROXY_HOST).isSet() != validationContext.getProperty(PROXY_PORT).isSet()) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .explanation("Proxy Host and Proxy Port must both be set or both be empty")
+                    .build());
+        }
+        return results;
+    }
+
+    protected OkHttpClient getClient() {
+        return okHttpClientAtomicReference.get();
+    }
+
+    protected boolean isSuccess(int statusCode) {
+        return statusCode / 100 == 2;
+    }
+
+    protected Response sendRequestToElasticsearch(OkHttpClient client, URL url, String username, String password, String verb, RequestBody body) throws IOException {
+
+        final ComponentLog log = getLogger();
+        Request.Builder requestBuilder = new Request.Builder()
+                .url(url);
+        if ("get".equalsIgnoreCase(verb)) {
+            requestBuilder = requestBuilder.get();
+        } else if ("put".equalsIgnoreCase(verb)) {
+            requestBuilder = requestBuilder.put(body);
+        } else {
+            throw new IllegalArgumentException("Elasticsearch REST API verb not supported by this processor: " + verb);
+        }
+
+        if(!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
+            String credential = Credentials.basic(username, password);
+            requestBuilder = requestBuilder.header("Authorization", credential);
+        }
+        Request httpRequest = requestBuilder.build();
+        log.debug("Sending Elasticsearch request to {}", new Object[]{url});
+
+        Response responseHttp = client.newCall(httpRequest).execute();
+
+        // store the status code and message
+        int statusCode = responseHttp.code();
+
+        if (statusCode == 0) {
+            throw new IllegalStateException("Status code unknown, connection hasn't been attempted.");
+        }
+
+        log.debug("Received response from Elasticsearch with status code {}", new Object[]{statusCode});
+
+        return responseHttp;
+    }
+
+    protected JsonNode parseJsonResponse(InputStream in) throws IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        return mapper.readTree(in);
+    }
+}
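
For context, a subclass might exercise the helper methods defined above roughly as follows (a hypothetical sketch; the document coordinates in the URL and the literal credentials are illustrative assumptions):

    // Inside a subclass method; the URL, index/type/id and credentials below
    // are illustrative only.
    OkHttpClient client = getClient();
    URL url = new URL("http://localhost:9200/my-index/my-type/1");
    Response response = sendRequestToElasticsearch(client, url, "user", "pass", "GET", null);
    if (isSuccess(response.code())) {
        JsonNode json = parseJsonResponse(response.body().byteStream());
        // e.g. inspect json.get("found"), json.get("_source"), ...
    }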

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
index f810ec3..76c7224 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
@@ -19,92 +19,37 @@ package org.apache.nifi.processors.elasticsearch;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StringUtils;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
 
-import java.io.File;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
 
+/**
+ * A base class for all Elasticsearch processors
+ */
 public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
 
-    /**
-     * This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries
-     */
-    private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() {
-        @Override
-        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
-            final List<String> esList = Arrays.asList(input.split(","));
-            for (String hostnamePort : esList) {
-                String[] addresses = hostnamePort.split(":");
-                // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there)
-                if (addresses.length != 2) {
-                    return new ValidationResult.Builder().subject(subject).input(input).explanation(
-                            "Must be in hostname:port form (no scheme such as http://").valid(false).build();
-                }
-            }
-            return new ValidationResult.Builder().subject(subject).input(input).explanation(
-                    "Valid cluster definition").valid(true).build();
-        }
-    };
-
-    protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
-            .name("Cluster Name")
-            .description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue("elasticsearch")
-            .build();
-
-    protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
-            .name("ElasticSearch Hosts")
-            .description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port "
-                    + "host1:port,host2:port,....  For example testcluster:9300.")
-            .required(true)
-            .expressionLanguageSupported(false)
-            .addValidator(HOSTNAME_PORT_VALIDATOR)
-            .build();
-
     public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
             .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
-                    + "connections. This service only applies if the Shield plugin is available.")
+                    + "connections. This service only applies if the Elasticsearch endpoint(s) have been secured with TLS/SSL.")
             .required(false)
             .identifiesControllerService(SSLContextService.class)
             .build();
 
-    public static final PropertyDescriptor PROP_SHIELD_LOCATION = new PropertyDescriptor.Builder()
-            .name("Shield Plugin Filename")
-            .description("Specifies the path to the JAR for the Elasticsearch Shield plugin. "
-                    + "If the Elasticsearch cluster has been secured with the Shield plugin, then the Shield plugin "
-                    + "JAR must also be available to this processor. Note: Do NOT place the Shield JAR into NiFi's "
-                    + "lib/ directory, doing so will prevent the Shield plugin from being loaded.")
-            .required(false)
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+    protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the document data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
@@ -122,36 +67,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("ElasticSearch Ping Timeout")
-            .description("The ping timeout used to determine when a node is unreachable. " +
-                    "For example, 5s (5 seconds). If non-local recommended is 30s")
-            .required(true)
-            .defaultValue("5s")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
-            .name("Sampler Interval")
-            .description("How often to sample / ping the nodes listed and connected. For example, 5s (5 seconds). "
-                    + "If non-local recommended is 30s.")
-            .required(true)
-            .defaultValue("5s")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
-            .name("Character Set")
-            .description("Specifies the character set of the document data.")
-            .required(true)
-            .defaultValue("UTF-8")
-            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-            .build();
-
-    protected AtomicReference<Client> esClient = new AtomicReference<>();
-    protected List<InetSocketAddress> esHosts;
-    protected String authToken;
-
+    protected abstract void createElasticsearchClient(ProcessContext context) throws ProcessException;
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
@@ -172,163 +88,4 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
         createElasticsearchClient(context);
     }
 
-    /**
-     * Instantiate ElasticSearch Client. This chould be called by subclasses' @OnScheduled method to create a client
-     * if one does not yet exist. If called when scheduled, closeClient() should be called by the subclasses' @OnStopped
-     * method so the client will be destroyed when the processor is stopped.
-     *
-     * @param context The context for this processor
-     * @throws ProcessException if an error occurs while creating an Elasticsearch client
-     */
-    protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
-
-        ComponentLog log = getLogger();
-        if (esClient.get() != null) {
-            return;
-        }
-
-        log.debug("Creating ElasticSearch Client");
-        try {
-            final String clusterName = context.getProperty(CLUSTER_NAME).getValue();
-            final String pingTimeout = context.getProperty(PING_TIMEOUT).getValue();
-            final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).getValue();
-            final String username = context.getProperty(USERNAME).getValue();
-            final String password = context.getProperty(PASSWORD).getValue();
-
-            final SSLContextService sslService =
-                    context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-
-            Settings.Builder settingsBuilder = Settings.settingsBuilder()
-                    .put("cluster.name", clusterName)
-                    .put("client.transport.ping_timeout", pingTimeout)
-                    .put("client.transport.nodes_sampler_interval", samplerInterval);
-
-            String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).getValue();
-            if (sslService != null) {
-                settingsBuilder.put("shield.transport.ssl", "true")
-                        .put("shield.ssl.keystore.path", sslService.getKeyStoreFile())
-                        .put("shield.ssl.keystore.password", sslService.getKeyStorePassword())
-                        .put("shield.ssl.truststore.path", sslService.getTrustStoreFile())
-                        .put("shield.ssl.truststore.password", sslService.getTrustStorePassword());
-            }
-
-            // Set username and password for Shield
-            if (!StringUtils.isEmpty(username)) {
-                StringBuffer shieldUser = new StringBuffer(username);
-                if (!StringUtils.isEmpty(password)) {
-                    shieldUser.append(":");
-                    shieldUser.append(password);
-                }
-                settingsBuilder.put("shield.user", shieldUser);
-
-            }
-
-            TransportClient transportClient = getTransportClient(settingsBuilder, shieldUrl, username, password);
-
-            final String hosts = context.getProperty(HOSTS).getValue();
-            esHosts = getEsHosts(hosts);
-
-            if (esHosts != null) {
-                for (final InetSocketAddress host : esHosts) {
-                    try {
-                        transportClient.addTransportAddress(new InetSocketTransportAddress(host));
-                    } catch (IllegalArgumentException iae) {
-                        log.error("Could not add transport address {}", new Object[]{host});
-                    }
-                }
-            }
-            esClient.set(transportClient);
-
-        } catch (Exception e) {
-            log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e);
-            throw new ProcessException(e);
-        }
-    }
-
-    protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl,
-                                                 String username, String password)
-            throws MalformedURLException {
-
-        // Create new transport client using the Builder pattern
-        TransportClient.Builder builder = TransportClient.builder();
-
-        // See if the Elasticsearch Shield JAR location was specified, and add the plugin if so. Also create the
-        // authorization token if username and password are supplied.
-        final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
-        if (!StringUtils.isBlank(shieldUrl)) {
-            ClassLoader shieldClassLoader =
-                    new URLClassLoader(new URL[]{new File(shieldUrl).toURI().toURL()}, this.getClass().getClassLoader());
-            Thread.currentThread().setContextClassLoader(shieldClassLoader);
-
-            try {
-                Class shieldPluginClass = Class.forName("org.elasticsearch.shield.ShieldPlugin", true, shieldClassLoader);
-                builder = builder.addPlugin(shieldPluginClass);
-
-                if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
-
-                    // Need a couple of classes from the Shield plugin to build the token
-                    Class usernamePasswordTokenClass =
-                            Class.forName("org.elasticsearch.shield.authc.support.UsernamePasswordToken", true, shieldClassLoader);
-
-                    Class securedStringClass =
-                            Class.forName("org.elasticsearch.shield.authc.support.SecuredString", true, shieldClassLoader);
-
-                    Constructor<?> securedStringCtor = securedStringClass.getConstructor(char[].class);
-                    Object securePasswordString = securedStringCtor.newInstance(password.toCharArray());
-
-                    Method basicAuthHeaderValue = usernamePasswordTokenClass.getMethod("basicAuthHeaderValue", String.class, securedStringClass);
-                    authToken = (String) basicAuthHeaderValue.invoke(null, username, securePasswordString);
-                }
-            } catch (ClassNotFoundException
-                    | NoSuchMethodException
-                    | InstantiationException
-                    | IllegalAccessException
-                    | InvocationTargetException shieldLoadException) {
-                getLogger().debug("Did not detect Elasticsearch Shield plugin, secure connections and/or authorization will not be available");
-            }
-        } else {
-            getLogger().debug("No Shield plugin location specified, secure connections and/or authorization will not be available");
-        }
-        TransportClient transportClient = builder.settings(settingsBuilder.build()).build();
-        Thread.currentThread().setContextClassLoader(originalClassLoader);
-        return transportClient;
-    }
-
-    /**
-     * Dispose of ElasticSearch client
-     */
-    public void closeClient() {
-        if (esClient.get() != null) {
-            getLogger().info("Closing ElasticSearch Client");
-            esClient.get().close();
-            esClient.set(null);
-        }
-    }
-
-    /**
-     * Get the ElasticSearch hosts from a Nifi attribute, e.g.
-     *
-     * @param hosts A comma-separated list of ElasticSearch hosts (host:port,host2:port2, etc.)
-     * @return List of InetSocketAddresses for the ES hosts
-     */
-    private List<InetSocketAddress> getEsHosts(String hosts) {
-
-        if (hosts == null) {
-            return null;
-        }
-        final List<String> esList = Arrays.asList(hosts.split(","));
-        List<InetSocketAddress> esHosts = new ArrayList<>();
-
-        for (String item : esList) {
-
-            String[] addresses = item.split(":");
-            final String hostName = addresses[0].trim();
-            final int port = Integer.parseInt(addresses[1].trim());
-
-            esHosts.add(new InetSocketAddress(hostName, port));
-        }
-        return esHosts;
-    }
-
-
 }
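
After this refactoring, the base class keeps only the properties shared by both client styles and declares createElasticsearchClient() as the hook each subclass implements. A minimal concrete subclass (a hypothetical sketch) would look like:

    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    // Sketch of a concrete subclass; AbstractProcessor also requires onTrigger.
    public class MyElasticsearchProcessor extends AbstractElasticsearchProcessor {
        @Override
        protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
            // build and cache whichever client flavor this processor needs
        }

        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            // fetch or put documents using the cached client
        }
    }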

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java
new file mode 100644
index 0000000..d2989db
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.StringUtils;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+public abstract class AbstractElasticsearchTransportClientProcessor extends AbstractElasticsearchProcessor {
+
+    /**
+     * This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries
+     */
+    private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            final List<String> esList = Arrays.asList(input.split(","));
+            for (String hostnamePort : esList) {
+                String[] addresses = hostnamePort.split(":");
+                // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there)
+                if (addresses.length != 2) {
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation(
+                            "Must be in hostname:port form (no scheme such as http://)").valid(false).build();
+                }
+            }
+            return new ValidationResult.Builder().subject(subject).input(input).explanation(
+                    "Valid cluster definition").valid(true).build();
+        }
+    };
+
+    protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
+            .name("Cluster Name")
+            .description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("elasticsearch")
+            .build();
+
+    protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
+            .name("ElasticSearch Hosts")
+            .description("A comma-separated list of ElasticSearch hosts in hostname:port form, "
+                    + "e.g. host1:port,host2:port. For example, testcluster:9300. This processor uses the Transport Client to "
+                    + "connect to the hosts. The default transport client port is 9300.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(HOSTNAME_PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROP_SHIELD_LOCATION = new PropertyDescriptor.Builder()
+            .name("Shield Plugin Filename")
+            .description("Specifies the path to the JAR for the Elasticsearch Shield plugin. "
+                    + "If the Elasticsearch cluster has been secured with the Shield plugin, then the Shield plugin "
+                    + "JAR must also be available to this processor. Note: Do NOT place the Shield JAR into NiFi's "
+                    + "lib/ directory, doing so will prevent the Shield plugin from being loaded.")
+            .required(false)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("ElasticSearch Ping Timeout")
+            .description("The ping timeout used to determine when a node is unreachable. " +
+                    "For example, 5s (5 seconds). For a non-local cluster, 30s is recommended.")
+            .required(true)
+            .defaultValue("5s")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
+            .name("Sampler Interval")
+            .description("How often to sample / ping the nodes listed and connected. For example, 5s (5 seconds). "
+                    + "For a non-local cluster, 30s is recommended.")
+            .required(true)
+            .defaultValue("5s")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected AtomicReference<Client> esClient = new AtomicReference<>();
+    protected List<InetSocketAddress> esHosts;
+    protected String authToken;
+
+    /**
+     * Instantiate the ElasticSearch client. This should be called by subclasses' @OnScheduled method to create a client
+     * if one does not yet exist. If called when scheduled, closeClient() should be called by the subclasses' @OnStopped
+     * method so the client will be destroyed when the processor is stopped.
+     *
+     * @param context The context for this processor
+     * @throws ProcessException if an error occurs while creating an Elasticsearch client
+     */
+    @Override
+    protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
+
+        ComponentLog log = getLogger();
+        if (esClient.get() != null) {
+            return;
+        }
+
+        log.debug("Creating ElasticSearch Client");
+        try {
+            final String clusterName = context.getProperty(CLUSTER_NAME).getValue();
+            final String pingTimeout = context.getProperty(PING_TIMEOUT).getValue();
+            final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).getValue();
+            final String username = context.getProperty(USERNAME).getValue();
+            final String password = context.getProperty(PASSWORD).getValue();
+
+            final SSLContextService sslService =
+                    context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+            Settings.Builder settingsBuilder = Settings.settingsBuilder()
+                    .put("cluster.name", clusterName)
+                    .put("client.transport.ping_timeout", pingTimeout)
+                    .put("client.transport.nodes_sampler_interval", samplerInterval);
+
+            String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).getValue();
+            if (sslService != null) {
+                settingsBuilder.put("shield.transport.ssl", "true")
+                        .put("shield.ssl.keystore.path", sslService.getKeyStoreFile())
+                        .put("shield.ssl.keystore.password", sslService.getKeyStorePassword())
+                        .put("shield.ssl.truststore.path", sslService.getTrustStoreFile())
+                        .put("shield.ssl.truststore.password", sslService.getTrustStorePassword());
+            }
+
+            // Set username and password for Shield
+            if (!StringUtils.isEmpty(username)) {
+                StringBuffer shieldUser = new StringBuffer(username);
+                if (!StringUtils.isEmpty(password)) {
+                    shieldUser.append(":");
+                    shieldUser.append(password);
+                }
+                settingsBuilder.put("shield.user", shieldUser);
+
+            }
+
+            TransportClient transportClient = getTransportClient(settingsBuilder, shieldUrl, username, password);
+
+            final String hosts = context.getProperty(HOSTS).getValue();
+            esHosts = getEsHosts(hosts);
+
+            if (esHosts != null) {
+                for (final InetSocketAddress host : esHosts) {
+                    try {
+                        transportClient.addTransportAddress(new InetSocketTransportAddress(host));
+                    } catch (IllegalArgumentException iae) {
+                        log.error("Could not add transport address {}", new Object[]{host});
+                    }
+                }
+            }
+            esClient.set(transportClient);
+
+        } catch (Exception e) {
+            log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e);
+            throw new ProcessException(e);
+        }
+    }
+
+    protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl,
+                                                 String username, String password)
+            throws MalformedURLException {
+
+        // Create new transport client using the Builder pattern
+        TransportClient.Builder builder = TransportClient.builder();
+
+        // See if the Elasticsearch Shield JAR location was specified, and add the plugin if so. Also create the
+        // authorization token if username and password are supplied.
+        final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        if (!StringUtils.isBlank(shieldUrl)) {
+            ClassLoader shieldClassLoader =
+                    new URLClassLoader(new URL[]{new File(shieldUrl).toURI().toURL()}, this.getClass().getClassLoader());
+            Thread.currentThread().setContextClassLoader(shieldClassLoader);
+
+            try {
+                Class shieldPluginClass = Class.forName("org.elasticsearch.shield.ShieldPlugin", true, shieldClassLoader);
+                builder = builder.addPlugin(shieldPluginClass);
+
+                if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
+
+                    // Need a couple of classes from the Shield plugin to build the token
+                    Class usernamePasswordTokenClass =
+                            Class.forName("org.elasticsearch.shield.authc.support.UsernamePasswordToken", true, shieldClassLoader);
+
+                    Class securedStringClass =
+                            Class.forName("org.elasticsearch.shield.authc.support.SecuredString", true, shieldClassLoader);
+
+                    Constructor<?> securedStringCtor = securedStringClass.getConstructor(char[].class);
+                    Object securePasswordString = securedStringCtor.newInstance(password.toCharArray());
+
+                    Method basicAuthHeaderValue = usernamePasswordTokenClass.getMethod("basicAuthHeaderValue", String.class, securedStringClass);
+                    authToken = (String) basicAuthHeaderValue.invoke(null, username, securePasswordString);
+                }
+            } catch (ClassNotFoundException
+                    | NoSuchMethodException
+                    | InstantiationException
+                    | IllegalAccessException
+                    | InvocationTargetException shieldLoadException) {
+                getLogger().debug("Did not detect Elasticsearch Shield plugin, secure connections and/or authorization will not be available");
+            }
+        } else {
+            getLogger().debug("No Shield plugin location specified, secure connections and/or authorization will not be available");
+        }
+        TransportClient transportClient = builder.settings(settingsBuilder.build()).build();
+        Thread.currentThread().setContextClassLoader(originalClassLoader);
+        return transportClient;
+    }
+
+    /**
+     * Dispose of ElasticSearch client
+     */
+    public void closeClient() {
+        if (esClient.get() != null) {
+            getLogger().info("Closing ElasticSearch Client");
+            esClient.get().close();
+            esClient.set(null);
+        }
+    }
+
+    /**
+     * Get the ElasticSearch hosts from a NiFi property value.
+     *
+     * @param hosts A comma-separated list of ElasticSearch hosts (host:port,host2:port2, etc.)
+     * @return List of InetSocketAddresses for the ES hosts
+     */
+    private List<InetSocketAddress> getEsHosts(String hosts) {
+
+        if (hosts == null) {
+            return null;
+        }
+        final List<String> esList = Arrays.asList(hosts.split(","));
+        List<InetSocketAddress> esHosts = new ArrayList<>();
+
+        for (String item : esList) {
+
+            String[] addresses = item.split(":");
+            final String hostName = addresses[0].trim();
+            final int port = Integer.parseInt(addresses[1].trim());
+
+            esHosts.add(new InetSocketAddress(hostName, port));
+        }
+        return esHosts;
+    }
+
+
+}
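
Per the javadoc on createElasticsearchClient() above, a transport-client subclass pairs client creation with teardown. A hypothetical wiring inside such a subclass, assuming the standard NiFi lifecycle annotations (org.apache.nifi.annotation.lifecycle.OnScheduled/OnStopped):

    // Create the client when the processor is scheduled...
    @OnScheduled
    public void setup(ProcessContext context) {
        super.setup(context);  // invokes createElasticsearchClient(context)
    }

    // ...and dispose of it when the processor is stopped.
    @OnStopped
    public void stop() {
        closeClient();
    }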

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
index 8d2e378..67aaae7 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
@@ -64,7 +64,7 @@ import java.util.Set;
         @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
         @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
 })
-public class FetchElasticsearch extends AbstractElasticsearchProcessor {
+public class FetchElasticsearch extends AbstractElasticsearchTransportClientProcessor {
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
             .description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build();

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
new file mode 100644
index 0000000..9ce1510
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.codehaus.jackson.JsonNode;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@SupportsBatching
+@Tags({"elasticsearch", "fetch", "read", "get", "http"})
+@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the "
+        + "identifier of the document to retrieve. Note that the full body of the document will be read into memory before being "
+        + "written to a Flow File for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
+})
+public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
+
+    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles that are read from Elasticsearch are routed to this relationship.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
+                    + "flow files will be routed to failure.")
+            .build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
+            .description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may "
+                    + "succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship "
+                    + "based on the processor properties and the results of the fetch operation.")
+            .build();
+
+    public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
+            .description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster. "
+                    + "Note that if the processor has no incoming connections, flow files may still be sent to this relationship based "
+                    + "on the processor properties and the results of the fetch operation.")
+            .build();
+
+    public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder()
+            .name("fetch-es-doc-id")
+            .displayName("Document Identifier")
+            .description("The identifier of the document to be fetched")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("fetch-es-index")
+            .displayName("Index")
+            .description("The name of the index to read from")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+            .name("fetch-es-type")
+            .displayName("Type")
+            .description("The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
+                    + "to _all, the first document matching the identifier across all types will be retrieved.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fetch-es-fields")
+            .displayName("Fields")
+            .description("A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
+                    + "then the entire document's source will be retrieved.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_RETRY);
+        relationships.add(REL_NOT_FOUND);
+        return Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ES_URL);
+        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECT_TIMEOUT);
+        descriptors.add(RESPONSE_TIMEOUT);
+        descriptors.add(DOC_ID);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(FIELDS);
+
+        return Collections.unmodifiableList(descriptors);
+    }
+
+
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        super.setup(context);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+
+        FlowFile flowFile = null;
+        if (context.hasIncomingConnection()) {
+            flowFile = session.get();
+
+            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+            // However, if we have no FlowFile and we have connections coming from other Processors, then
+            // we know that we should run only if we have a FlowFile.
+            if (flowFile == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        OkHttpClient okHttpClient = getClient();
+
+        if (flowFile == null) {
+            flowFile = session.create();
+        }
+
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+        final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
+        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+        final String fields = context.getProperty(FIELDS).isSet()
+                ? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue()
+                : null;
+
+        // Authentication
+        final String username = context.getProperty(USERNAME).getValue();
+        final String password = context.getProperty(PASSWORD).getValue();
+
+        final ComponentLog logger = getLogger();
+
+
+        try {
+            logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
+
+            // read the url property from the context
+            final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
+            final URL url = buildRequestURL(urlstr, docId, index, docType, fields);
+            final long startNanos = System.nanoTime();
+
+            final Response getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
+            final int statusCode = getResponse.code();
+
+            if (isSuccess(statusCode)) {
+                ResponseBody body = getResponse.body();
+                final byte[] bodyBytes = body.bytes();
+                JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
+                boolean found = responseJson.get("found").asBoolean(false);
+                String retrievedIndex = responseJson.get("_index").asText();
+                String retrievedType = responseJson.get("_type").asText();
+                String retrievedId = responseJson.get("_id").asText();
+
+                if (found) {
+                    JsonNode source = responseJson.get("_source");
+                    flowFile = session.putAttribute(flowFile, "filename", retrievedId);
+                    flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
+                    flowFile = session.putAttribute(flowFile, "es.type", retrievedType);
+                    if (source != null) {
+                        flowFile = session.write(flowFile, out -> {
+                            out.write(source.toString().getBytes());
+                        });
+                    }
+                    logger.debug("Elasticsearch document {} fetched, routing to success", new Object[]{retrievedId});
+
+                    // emit provenance event
+                    final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                    if (context.hasNonLoopConnection()) {
+                        session.getProvenanceReporter().fetch(flowFile, url.toExternalForm(), millis);
+                    } else {
+                        session.getProvenanceReporter().receive(flowFile, url.toExternalForm(), millis);
+                    }
+                    session.transfer(flowFile, REL_SUCCESS);
+                } else {
+                    logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found",
+                            new Object[]{index, docType, docId});
+
+                    // We couldn't find the document, so send it to "not found"
+                    session.transfer(flowFile, REL_NOT_FOUND);
+                }
+            } else {
+                if (statusCode == 404) {
+                    logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found",
+                            new Object[]{index, docType, docId});
+
+                    // We couldn't find the document, so send it to "not found"
+                    session.transfer(flowFile, REL_NOT_FOUND);
+                } else {
+                    // 5xx -> RETRY, but a server error might last a while, so yield
+                    if (statusCode / 100 == 5) {
+                        logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...",
+                                new Object[]{statusCode, getResponse.message()});
+                        session.transfer(flowFile, REL_RETRY);
+                        context.yield();
+                    } else if (context.hasIncomingConnection()) {  // 1xx, 3xx, 4xx -> NO RETRY
+                        logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
+                        session.transfer(flowFile, REL_FAILURE);
+                    } else {
+                        logger.warn("Elasticsearch returned code {} with message {}", new Object[]{statusCode, getResponse.message()});
+                        session.remove(flowFile);
+                    }
+                }
+            }
+        } catch (IOException ioe) {
+            logger.error("Failed to read from Elasticsearch due to {}, this may indicate an error in configuration "
+                            + "(hosts, username/password, etc.). Routing to retry",
+                    new Object[]{ioe.getLocalizedMessage()}, ioe);
+            if (context.hasIncomingConnection()) {
+                session.transfer(flowFile, REL_RETRY);
+            } else {
+                session.remove(flowFile);
+            }
+            context.yield();
+
+        } catch (Exception e) {
+            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
+            if (context.hasIncomingConnection()) {
+                session.transfer(flowFile, REL_FAILURE);
+            } else {
+                session.remove(flowFile);
+            }
+            context.yield();
+        }
+    }
+
+    private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields) throws MalformedURLException {
+        if (StringUtils.isEmpty(baseUrl)) {
+            throw new MalformedURLException("Base URL cannot be empty");
+        }
+        final HttpUrl parsedUrl = HttpUrl.parse(baseUrl);
+        if (parsedUrl == null) {
+            throw new MalformedURLException("Invalid base URL: " + baseUrl);
+        }
+        HttpUrl.Builder builder = parsedUrl.newBuilder();
+        builder.addPathSegment(index);
+        builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
+        builder.addPathSegment(docId);
+        if (!StringUtils.isEmpty(fields)) {
+            String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
+            builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+        }
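+        // Yields e.g. http://localhost:9200/doc/status/28039652140?<include-param>=field1,field2
+        // (hypothetical values; the query parameter name comes from FIELD_INCLUDE_QUERY_PARAM),
+        // or http://localhost:9200/doc/_all/28039652140 when no type or fields are given.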
+
+        return builder.build().url();
+    }
+}
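
For reference, a minimal usage sketch for the new fetch processor, using the
nifi-mock TestRunner (hypothetical index/type/doc-id values; assumes an
Elasticsearch 2.x instance listening at http://localhost:9200 and that the
descriptors shown above are publicly accessible):

    import java.util.Collections;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    TestRunner runner = TestRunners.newTestRunner(FetchElasticsearchHttp.class);
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://localhost:9200");
    runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
    runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
    runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
    runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
    runner.run();
    runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);

Since the processor creates a FlowFile when it has no incoming connection, it
can also be scheduled as a standalone source.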

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index 5075586..f64180b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -62,7 +62,7 @@ import java.util.Set;
         + "the index to insert into and the type of the document. If the cluster has been configured for authorization "
         + "and/or secure transport (SSL/TLS) and the Shield plugin is available, secure connections can be made. This processor "
         + "supports Elasticsearch 2.x clusters.")
-public class PutElasticsearch extends AbstractElasticsearchProcessor {
+public class PutElasticsearch extends AbstractElasticsearchTransportClientProcessor {
 
     static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
             .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
new file mode 100644
index 0000000..92b1452
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.util.StringUtils;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.commons.lang3.StringUtils.trimToEmpty;
+
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"})
+@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as "
+        + "the index to insert into and the type of the document.")
+public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+            .build();
+
+    public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("put-es-id-attr")
+            .displayName("Identifier Attribute")
+            .description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", "
+                    + "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be "
+                    + "auto-generated by Elasticsearch. For all other Index Operations, the attribute must evaluate to a non-empty value.")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("put-es-index")
+            .displayName("Index")
+            .description("The name of the index to insert into")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
+                    AttributeExpression.ResultType.STRING, true))
+            .build();
+
+    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+            .name("put-es-type")
+            .displayName("Type")
+            .description("The type of this document (used by Elasticsearch for indexing and searching)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
+                    AttributeExpression.ResultType.STRING, true))
+            .build();
+
+    public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
+            .name("put-es-index-op")
+            .displayName("Index Operation")
+            .description("The type of the operation used to index (index, update, upsert, delete)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
+                    AttributeExpression.ResultType.STRING, true))
+            .defaultValue("index")
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("put-es-batch-size")
+            .displayName("Batch Size")
+            .description("The preferred number of FlowFiles to put to the database in a single transaction. Note that the contents of the "
+                    + "FlowFiles will be stored in memory until the bulk operation is performed, and the results will be returned in the "
+                    + "same order in which the FlowFiles were received.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_RETRY);
+        return Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ES_URL);
+        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECT_TIMEOUT);
+        descriptors.add(RESPONSE_TIMEOUT);
+        descriptors.add(ID_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CHARSET);
+        descriptors.add(BATCH_SIZE);
+        descriptors.add(INDEX_OP);
+        return Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+        // Since Expression Language is allowed for index operation, we can't guarantee that we can catch
+        // all invalid configurations, but we should catch them as soon as we can. For example, if the
+        // Identifier Attribute property is empty, the Index Operation must evaluate to "index".
+        String idAttribute = validationContext.getProperty(ID_ATTRIBUTE).getValue();
+        String indexOp = validationContext.getProperty(INDEX_OP).getValue();
+
+        if (StringUtils.isEmpty(idAttribute)) {
+            switch (indexOp.toLowerCase()) {
+                case "update":
+                case "upsert":
+                case "delete":
+                case "":
+                    problems.add(new ValidationResult.Builder()
+                            .valid(false)
+                            .subject(INDEX_OP.getDisplayName())
+                            .explanation("If Identifier Attribute is not set, Index Operation must evaluate to \"index\"")
+                            .build());
+                    break;
+                default:
+                    break;
+            }
+        }
+        return problems;
+    }
+
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        super.setup(context);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        final List<FlowFile> flowFiles = session.get(batchSize);
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final String idAttribute = context.getProperty(ID_ATTRIBUTE).getValue();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+        // Authentication
+        final String username = context.getProperty(USERNAME).getValue();
+        final String password = context.getProperty(PASSWORD).getValue();
+
+        OkHttpClient okHttpClient = getClient();
+        final ComponentLog logger = getLogger();
+
+        // Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list.
+        List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
+
+        final StringBuilder sb = new StringBuilder();
+        final String baseUrl = trimToEmpty(context.getProperty(ES_URL).getValue());
+        final URL url;
+        try {
+            url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk");
+        } catch (MalformedURLException mue) {
+            // Since we have a URL validator, something has gone very wrong, throw a ProcessException
+            context.yield();
+            throw new ProcessException(mue);
+        }
+
+        for (FlowFile file : flowFiles) {
+            final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
+            if (StringUtils.isEmpty(index)) {
+                logger.error("No value for index for {}, transferring to failure", new Object[]{file});
+                flowFilesToTransfer.remove(file);
+                session.transfer(file, REL_FAILURE);
+                continue;
+            }
+            final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
+            String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue();
+            if (StringUtils.isEmpty(indexOp)) {
+                logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{file});
+                flowFilesToTransfer.remove(file);
+                session.transfer(file, REL_FAILURE);
+                continue;
+            }
+
+            switch (indexOp.toLowerCase()) {
+                case "index":
+                case "update":
+                case "upsert":
+                case "delete":
+                    break;
+                default:
+                    logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, file});
+                    flowFilesToTransfer.remove(file);
+                    session.transfer(file, REL_FAILURE);
+                    continue;
+            }
+
+            final String id = (idAttribute != null) ? file.getAttribute(idAttribute) : null;
+
+            // The ID must be valid for all operations except "index". For that case,
+            // a missing ID indicates one is to be auto-generated by Elasticsearch
+            if (StringUtils.isEmpty(id) && !indexOp.equalsIgnoreCase("index")) {
+                logger.error("Index operation {} requires a valid identifier value from a flow file attribute, transferring to failure.",
+                        new Object[]{indexOp, file});
+                flowFilesToTransfer.remove(file);
+                session.transfer(file, REL_FAILURE);
+                continue;
+            }
+
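+            // Each FlowFile contributes one action line plus, except for "delete", one source line
+            // to the NDJSON bulk body, e.g. (hypothetical values):
+            //   {"index": { "_index": "doc", "_type": "status", "_id": "1" }}
+            //   {"field1": "value1"}
+            //   {"update": { "_index": "doc", "_type": "status", "_id": "2" } }
+            //   {"doc": {"field1": "value2"}, "doc_as_upsert": false }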
+            final StringBuilder json = new StringBuilder();
+            session.read(file, in -> {
+                json.append(IOUtils.toString(in, charset).replace("\r\n", " ").replace('\n', ' ').replace('\r', ' '));
+            });
+            if (indexOp.equalsIgnoreCase("index")) {
+                sb.append("{\"index\": { \"_index\": \"");
+                sb.append(index);
+                sb.append("\", \"_type\": \"");
+                sb.append(docType);
+                sb.append("\"");
+                if (!StringUtils.isEmpty(id)) {
+                    sb.append(", \"_id\": \"");
+                    sb.append(id);
+                    sb.append("\"");
+                }
+                sb.append("}}\n");
+                sb.append(json);
+                sb.append("\n");
+            } else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
+                sb.append("{\"update\": { \"_index\": \"");
+                sb.append(index);
+                sb.append("\", \"_type\": \"");
+                sb.append(docType);
+                sb.append("\", \"_id\": \"");
+                sb.append(id);
+                sb.append("\" } }\n");
+                sb.append("{\"doc\": ");
+                sb.append(json);
+                sb.append(", \"doc_as_upsert\": ");
+                sb.append(indexOp.equalsIgnoreCase("upsert"));
+                sb.append(" }\n");
+            } else if (indexOp.equalsIgnoreCase("delete")) {
+                sb.append("{\"delete\": { \"_index\": \"");
+                sb.append(index);
+                sb.append("\", \"_type\": \"");
+                sb.append(docType);
+                sb.append("\", \"_id\": \"");
+                sb.append(id);
+                sb.append("\" } }\n");
+            }
+        }
+        if (!flowFilesToTransfer.isEmpty()) {
+            RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString());
+            final Response getResponse;
+            try {
+                getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
+            } catch (IllegalStateException | IOException ioe) {
+                throw new ProcessException(ioe);
+            }
+            final int statusCode = getResponse.code();
+
+            if (isSuccess(statusCode)) {
+                ResponseBody responseBody = getResponse.body();
+                try {
+                    final byte[] bodyBytes = responseBody.bytes();
+
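+                    // The Bulk API response reports per-item status in request order, e.g. (abridged):
+                    //   { "took": 30, "errors": true, "items": [ { "index": { "_id": "1", "status": 201 } }, ... ] }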
+                    JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
+                    boolean errors = responseJson.get("errors").asBoolean(false);
+                    if (errors) {
+                        ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
+                        if (itemNodeArray.size() > 0) {
+                            // All items are returned whether they succeeded or failed, so iterate through the item array
+                            // at the same time as the flow file list, moving each to success or failure accordingly
+                            for (int i = 0; i < itemNodeArray.size(); i++) {
+                                JsonNode itemNode = itemNodeArray.get(i);
+                                // Always remove the head of the list so it stays aligned with the items array
+                                FlowFile flowFile = flowFilesToTransfer.remove(0);
+                                int status = itemNode.findPath("status").asInt();
+                                if (!isSuccess(status)) {
+                                    String reason = itemNode.findPath("reason").asText();
+                                    logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
+                                            new Object[]{flowFile, reason});
+                                    session.transfer(flowFile, REL_FAILURE);
+
+                                } else {
+                                    session.transfer(flowFile, REL_SUCCESS);
+                                    // Record provenance event
+                                    session.getProvenanceReporter().send(flowFile, url.toString());
+                                }
+                            }
+                        }
+                    }
+                    // Transfer any remaining flowfiles to success
+                    flowFilesToTransfer.forEach(file -> {
+                        session.transfer(file, REL_SUCCESS);
+                        // Record provenance event
+                        session.getProvenanceReporter().send(file, url.toString());
+                    });
+                } catch (IOException ioe) {
+                    // Something went wrong when parsing the response, log the error and route to failure
+                    logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
+                    session.transfer(flowFilesToTransfer, REL_FAILURE);
+                    context.yield();
+                }
+            } else {
+                // Something went wrong during the bulk update, throw a ProcessException to indicate rollback
+                throw new ProcessException("Received error code " + statusCode + " from Elasticsearch API");
+            }
+        }
+    }
+}
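
A matching sketch for the put processor (again hypothetical values; note that
content is collapsed to a single line before being appended to the bulk body,
so multi-line JSON documents are accepted as well):

    import java.util.Collections;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    TestRunner runner = TestRunners.newTestRunner(PutElasticsearchHttp.class);
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://localhost:9200");
    runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
    runner.setProperty(PutElasticsearchHttp.TYPE, "status");
    runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
    runner.setProperty(PutElasticsearchHttp.INDEX_OP, "index");
    runner.enqueue("{\"field1\":\"value1\"}".getBytes(),
            Collections.singletonMap("doc_id", "28039652140"));
    runner.run();
    runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1);

With Index Operation set to "delete", the FlowFile content is ignored and only
the action line is emitted into the bulk body.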

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index e5046fc..782f87e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,3 +14,5 @@
 # limitations under the License.
 org.apache.nifi.processors.elasticsearch.FetchElasticsearch
 org.apache.nifi.processors.elasticsearch.PutElasticsearch
+org.apache.nifi.processors.elasticsearch.FetchElasticsearchHttp
+org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp