You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by le...@apache.org on 2016/07/16 21:31:06 UTC

nutch git commit: NUTCH-2287 Indexer-elastic plugin should use Elasticsearch BulkProcessor and BackoffPolicy

Repository: nutch
Updated Branches:
  refs/heads/master 9dd251d03 -> 80afa3134


NUTCH-2287 Indexer-elastic plugin should use Elasticsearch BulkProcessor and BackoffPolicy


Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/80afa313
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/80afa313
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/80afa313

Branch: refs/heads/master
Commit: 80afa3134d7e8de07fcfbb03dba1c51fb7c7dce2
Parents: 9dd251d
Author: Joseph Naegele <jn...@grierforensics.com>
Authored: Thu Jun 30 13:42:34 2016 -0400
Committer: Lewis John McGibbney <le...@gmail.com>
Committed: Sat Jul 16 14:36:35 2016 -0700

----------------------------------------------------------------------
 build.xml                                       |   1 +
 conf/nutch-default.xml                          |  25 +-
 src/plugin/build.xml                            |   1 +
 src/plugin/indexer-elastic/build.xml            |  13 +
 src/plugin/indexer-elastic/plugin.xml           |   5 +-
 .../indexwriter/elastic/ElasticConstants.java   |   5 +-
 .../indexwriter/elastic/ElasticIndexWriter.java | 236 +++++++++----------
 .../src/test/conf/nutch-site-test.xml           |  57 +++++
 .../elastic/TestElasticIndexWriter.java         | 221 +++++++++++++++++
 9 files changed, 431 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index a1c41ed..0ee60a1 100644
--- a/build.xml
+++ b/build.xml
@@ -1032,6 +1032,7 @@
         <source path="${plugins.dir}/indexer-dummy/src/java/" />
         <source path="${plugins.dir}/indexer-solr/src/java/" />
         <source path="${plugins.dir}/indexer-elastic/src/java/" />
+        <source path="${plugins.dir}/indexer-elastic/src/test/" />
         <source path="${plugins.dir}/index-metadata/src/java/" />
         <source path="${plugins.dir}/index-more/src/java/" />
         <source path="${plugins.dir}/index-more/src/test/" />

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/conf/nutch-default.xml
----------------------------------------------------------------------
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index 324de5c..67326ee 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -1812,8 +1812,8 @@ visit https://wiki.apache.org/nutch/SimilarityScoringFilter-->
 <property>
   <name>elastic.host</name>
   <value></value>
-  <description>The hostname to send documents to using TransportClient. Either host
-  and port must be defined or cluster.</description>
+  <description>Comma-separated list of hostnames to send documents to using
+  TransportClient. Either host and port, or cluster, must be defined.</description>
 </property>
 
 <property> 
@@ -1847,6 +1847,27 @@ visit https://wiki.apache.org/nutch/SimilarityScoringFilter-->
   <description>Maximum size of the bulk in bytes.</description>
 </property>
 
+<property>
+  <name>elastic.exponential.backoff.millis</name>
+  <value>100</value>
+  <description>Initial delay, in milliseconds, for the BulkProcessor's
+  exponential backoff policy.</description>
+</property>
+
+<property>
+  <name>elastic.exponential.backoff.retries</name>
+  <value>10</value>
+  <description>Number of times the BulkProcessor's exponential backoff policy
+  should retry bulk operations.</description>
+</property>
+
+<property>
+  <name>elastic.bulk.close.timeout</name>
+  <value>600</value>
+  <description>Maximum number of seconds to wait on close for the BulkProcessor
+  to flush and complete any outstanding bulk requests.</description>
+</property>
+
 <!-- subcollection properties -->
 
 <property>

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/build.xml
----------------------------------------------------------------------
diff --git a/src/plugin/build.xml b/src/plugin/build.xml
index 75ae2e7..20ef870 100755
--- a/src/plugin/build.xml
+++ b/src/plugin/build.xml
@@ -106,6 +106,7 @@
      <ant dir="index-replace" target="test"/>
      <ant dir="index-links" target="test"/>
      <ant dir="mimetype-filter" target="test"/>
+     <ant dir="indexer-elastic" target="test"/>
      <ant dir="language-identifier" target="test"/>
      <ant dir="lib-http" target="test"/>
      <ant dir="protocol-file" target="test"/>

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/build.xml
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/build.xml b/src/plugin/indexer-elastic/build.xml
index 38955ff..6955f61 100644
--- a/src/plugin/indexer-elastic/build.xml
+++ b/src/plugin/indexer-elastic/build.xml
@@ -19,4 +19,17 @@
 
   <import file="../build-plugin.xml" />
 
+  <!-- Add compilation dependencies to classpath -->
+  <path id="plugin.deps">
+    <pathelement location="${build.dir}/test/conf"/>
+  </path>
+
+  <!-- Deploy Unit test dependencies -->
+  <target name="deps-test">
+    <copy toDir="${build.test}">
+      <fileset dir="${src.test}" excludes="**/*.java"/>
+    </copy>
+  </target>
+
+
 </project>

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/plugin.xml
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/plugin.xml b/src/plugin/indexer-elastic/plugin.xml
index d99a665..401e342 100644
--- a/src/plugin/indexer-elastic/plugin.xml
+++ b/src/plugin/indexer-elastic/plugin.xml
@@ -6,9 +6,9 @@
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
-  
+
   http://www.apache.org/licenses/LICENSE-2.0
-  
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,7 +28,6 @@
     <library name="guava-18.0.jar"/>
     <library name="HdrHistogram-2.1.6.jar"/>
     <library name="hppc-0.7.1.jar"/>
-    <library name="indexer-elastic.jar"/>
     <library name="jackson-core-2.6.6.jar"/>
     <library name="jackson-dataformat-cbor-2.6.6.jar"/>
     <library name="jackson-dataformat-smile-2.6.6.jar"/>

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
index b0e70c8..29f36c7 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
@@ -19,10 +19,13 @@ package org.apache.nutch.indexwriter.elastic;
 public interface ElasticConstants {
   public static final String ELASTIC_PREFIX = "elastic.";
 
-  public static final String HOST = ELASTIC_PREFIX + "host";
+  public static final String HOSTS = ELASTIC_PREFIX + "host";
   public static final String PORT = ELASTIC_PREFIX + "port";
   public static final String CLUSTER = ELASTIC_PREFIX + "cluster";
   public static final String INDEX = ELASTIC_PREFIX + "index";
   public static final String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
   public static final String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
+  public static final String EXPONENTIAL_BACKOFF_MILLIS = ELASTIC_PREFIX + "exponential.backoff.millis";
+  public static final String EXPONENTIAL_BACKOFF_RETRIES = ELASTIC_PREFIX + "exponential.backoff.retries";
+  public static final String BULK_CLOSE_TIMEOUT = ELASTIC_PREFIX + "bulk.close.timeout";
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
index 9367e41..00b96f1 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
@@ -24,68 +24,92 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.nutch.indexer.IndexWriter;
 import org.apache.nutch.indexer.NutchDocument;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.settings.Settings.Builder;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.node.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ * Sends NutchDocuments to a configured Elasticsearch index.
  */
 public class ElasticIndexWriter implements IndexWriter {
   public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
 
+  private static final int DEFAULT_PORT = 9300;
   private static final int DEFAULT_MAX_BULK_DOCS = 250;
   private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
+  private static final int DEFAULT_EXP_BACKOFF_MILLIS = 100;
+  private static final int DEFAULT_EXP_BACKOFF_RETRIES = 10;
+  private static final int DEFAULT_BULK_CLOSE_TIMEOUT = 600;
+  private static final String DEFAULT_INDEX = "nutch";
 
+  private String defaultIndex;
   private Client client;
   private Node node;
-  private String defaultIndex;
+  private BulkProcessor bulkProcessor;
 
-  private Configuration config;
+  private long bulkCloseTimeout;
 
-  private BulkRequestBuilder bulk;
-  private ListenableActionFuture<BulkResponse> execute;
-  private int port = -1;
-  private String host = null;
-  private String clusterName = null;
-  private int maxBulkDocs;
-  private int maxBulkLength;
-  private long indexedDocs = 0;
-  private int bulkDocs = 0;
-  private int bulkLength = 0;
-  private boolean createNewBulk = false;
+  private Configuration config;
 
   @Override
   public void open(JobConf job, String name) throws IOException {
-    clusterName = job.get(ElasticConstants.CLUSTER);
+    bulkCloseTimeout = job.getLong(ElasticConstants.BULK_CLOSE_TIMEOUT,
+        DEFAULT_BULK_CLOSE_TIMEOUT);
+    defaultIndex = job.get(ElasticConstants.INDEX, DEFAULT_INDEX);
+
+    int maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
+        DEFAULT_MAX_BULK_DOCS);
+    int maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
+        DEFAULT_MAX_BULK_LENGTH);
+    int expBackoffMillis = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
+        DEFAULT_EXP_BACKOFF_MILLIS);
+    int expBackoffRetries = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
+        DEFAULT_EXP_BACKOFF_RETRIES);
+
+    client = makeClient(job);
+
+    LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", maxBulkDocs, maxBulkLength);
+    bulkProcessor = BulkProcessor.builder(client, bulkProcessorListener())
+      .setBulkActions(maxBulkDocs)
+      .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
+      .setConcurrentRequests(1)
+      .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
+          TimeValue.timeValueMillis(expBackoffMillis), expBackoffRetries))
+      .build();
+  }
 
-    host = job.get(ElasticConstants.HOST);
-    port = job.getInt(ElasticConstants.PORT, 9300);
+  /** Generates a TransportClient or NodeClient */
+  protected Client makeClient(Configuration conf) throws IOException {
+    String clusterName = conf.get(ElasticConstants.CLUSTER);
+    String[] hosts = conf.getStrings(ElasticConstants.HOSTS);
+    int port = conf.getInt(ElasticConstants.PORT, DEFAULT_PORT);
 
-    Builder settingsBuilder = Settings.builder();
+    Settings.Builder settingsBuilder = Settings.settingsBuilder();
 
     BufferedReader reader = new BufferedReader(
-        job.getConfResourceAsReader("elasticsearch.conf"));
+        conf.getConfResourceAsReader("elasticsearch.conf"));
     String line;
     String parts[];
-
     while ((line = reader.readLine()) != null) {
       if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
         line.trim();
@@ -97,29 +121,46 @@ public class ElasticIndexWriter implements IndexWriter {
       }
     }
 
+    // Set the cluster name and build the settings
     if (StringUtils.isNotBlank(clusterName))
       settingsBuilder.put("cluster.name", clusterName);
 
-    // Set the cluster name and build the settings
     Settings settings = settingsBuilder.build();
 
+    Client client = null;
+
     // Prefer TransportClient
-    if (host != null && port > 1) {
-      TransportClient transportClient = TransportClient.builder()
-          .settings(settings).build()
-          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
+    if (hosts != null && port > 1) {
+      TransportClient transportClient = TransportClient.builder().settings(settings).build();
+      for (String host: hosts)
+        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
       client = transportClient;
     } else if (clusterName != null) {
       node = nodeBuilder().settings(settings).client(true).node();
       client = node.client();
     }
 
-    bulk = client.prepareBulk();
-    defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
-    maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
-        DEFAULT_MAX_BULK_DOCS);
-    maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
-        DEFAULT_MAX_BULK_LENGTH);
+    return client;
+  }
+
+  /** Generates a default BulkProcessor.Listener */
+  protected BulkProcessor.Listener bulkProcessorListener() {
+    return new BulkProcessor.Listener() {
+      @Override
+      public void beforeBulk(long executionId, BulkRequest request) { }
+
+      @Override
+      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+        throw new RuntimeException(failure);
+      }
+
+      @Override
+      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        if (response.hasFailures()) {
+          LOG.warn("Failures occurred during bulk request");
+        }
+      }
+    };
   }
 
   @Override
@@ -128,60 +169,23 @@ public class ElasticIndexWriter implements IndexWriter {
     String type = doc.getDocumentMeta().get("type");
     if (type == null)
       type = "doc";
-    IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
 
+    // Add each field of this doc to the index source
     Map<String, Object> source = new HashMap<String, Object>();
-
-    // Loop through all fields of this doc
     for (String fieldName : doc.getFieldNames()) {
-      if (doc.getField(fieldName).getValues().size() > 1) {
+      if (doc.getFieldValue(fieldName) != null) {
         source.put(fieldName, doc.getFieldValue(fieldName));
-        // Loop through the values to keep track of the size of this
-        // document
-        for (Object value : doc.getField(fieldName).getValues()) {
-          bulkLength += value.toString().length();
-        }
-      } else {
-        if (doc.getFieldValue(fieldName) != null) {
-          source.put(fieldName, doc.getFieldValue(fieldName));
-          bulkLength += doc.getFieldValue(fieldName).toString().length();
-        }
       }
     }
-    request.setSource(source);
-
-    // Add this indexing request to a bulk request
-    bulk.add(request);
-    indexedDocs++;
-    bulkDocs++;
-
-    if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
-      LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
-          + bulkLength + ", total docs = " + indexedDocs
-          + ", last doc in bulk = '" + id + "']");
-      // Flush the bulk of indexing requests
-      createNewBulk = true;
-      commit();
-    }
+
+    IndexRequest request = new IndexRequest(defaultIndex, type, id).source(source);
+    bulkProcessor.add(request);
   }
 
   @Override
   public void delete(String key) throws IOException {
-    try {
-      DeleteRequestBuilder builder = client.prepareDelete();
-      builder.setIndex(defaultIndex);
-      builder.setType("doc");
-      builder.setId(key);
-      builder.execute().actionGet();
-    } catch (ElasticsearchException e) {
-      throw makeIOException(e);
-    }
-  }
-
-  public static IOException makeIOException(ElasticsearchException e) {
-    final IOException ioe = new IOException();
-    ioe.initCause(e);
-    return ioe;
+    DeleteRequest request = new DeleteRequest(defaultIndex, "doc", key);
+    bulkProcessor.add(request);
   }
 
   @Override
@@ -191,51 +195,18 @@ public class ElasticIndexWriter implements IndexWriter {
 
   @Override
   public void commit() throws IOException {
-    if (execute != null) {
-      // wait for previous to finish
-      long beforeWait = System.currentTimeMillis();
-      BulkResponse actionGet = execute.actionGet();
-      if (actionGet.hasFailures()) {
-        for (BulkItemResponse item : actionGet) {
-          if (item.isFailed()) {
-            throw new RuntimeException("First failure in bulk: "
-                + item.getFailureMessage());
-          }
-        }
-      }
-      long msWaited = System.currentTimeMillis() - beforeWait;
-      LOG.info("Previous took in ms " + actionGet.getTookInMillis()
-          + ", including wait " + msWaited);
-      execute = null;
-    }
-    if (bulk != null) {
-      if (bulkDocs > 0) {
-        // start a flush, note that this is an asynchronous call
-        execute = bulk.execute();
-      }
-      bulk = null;
-    }
-    if (createNewBulk) {
-      // Prepare a new bulk request
-      bulk = client.prepareBulk();
-      bulkDocs = 0;
-      bulkLength = 0;
-    }
+    bulkProcessor.flush();
   }
 
   @Override
   public void close() throws IOException {
-    // Flush pending requests
-    LOG.info("Processing remaining requests [docs = " + bulkDocs
-        + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
-    createNewBulk = false;
-    commit();
-    // flush one more time to finalize the last bulk
-    LOG.info("Processing to finalize last execute");
-    createNewBulk = false;
-    commit();
-
-    // Close
+    // Close BulkProcessor (automatically flushes)
+    try {
+      bulkProcessor.awaitClose(bulkCloseTimeout, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("interrupted while waiting for BulkProcessor to complete ({})", e.getMessage());
+    }
+
     client.close();
     if (node != null) {
       node.close();
@@ -247,14 +218,25 @@ public class ElasticIndexWriter implements IndexWriter {
     StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
     sb.append("\t").append(ElasticConstants.CLUSTER)
         .append(" : elastic prefix cluster\n");
-    sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
+    sb.append("\t").append(ElasticConstants.HOSTS).append(" : hostname\n");
     sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
     sb.append("\t").append(ElasticConstants.INDEX)
         .append(" : elastic index command \n");
     sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS)
-        .append(" : elastic bulk index doc counts. (default 250) \n");
+        .append(" : elastic bulk index doc counts. (default ")
+        .append(DEFAULT_MAX_BULK_DOCS).append(")\n");
     sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH)
-        .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
+        .append(" : elastic bulk index length in bytes. (default ")
+        .append(DEFAULT_MAX_BULK_LENGTH).append(")\n");
+    sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS)
+        .append(" : elastic bulk exponential backoff initial delay in milliseconds. (default ")
+        .append(DEFAULT_EXP_BACKOFF_MILLIS).append(")\n");
+    sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES)
+        .append(" : elastic bulk exponential backoff max retries. (default ")
+        .append(DEFAULT_EXP_BACKOFF_RETRIES).append(")\n");
+    sb.append("\t").append(ElasticConstants.BULK_CLOSE_TIMEOUT)
+        .append(" : elastic timeout for the last bulk in seconds. (default ")
+        .append(DEFAULT_BULK_CLOSE_TIMEOUT).append(")\n");
     return sb.toString();
   }
 
@@ -262,9 +244,9 @@ public class ElasticIndexWriter implements IndexWriter {
   public void setConf(Configuration conf) {
     config = conf;
     String cluster = conf.get(ElasticConstants.CLUSTER);
-    String host = conf.get(ElasticConstants.HOST);
+    String hosts = conf.get(ElasticConstants.HOSTS);
 
-    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) {
+    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(hosts)) {
       String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml ";
       message += "\n" + describe();
       LOG.error(message);

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml b/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml
new file mode 100644
index 0000000..0a37225
--- /dev/null
+++ b/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+
+<property>
+  <name>http.agent.name</name>
+  <value>Nutch-Test</value>
+  <description></description>
+</property>
+
+
+<!-- Elasticsearch properties -->
+
+<property>
+  <name>elastic.host</name>
+  <value>localhost</value>
+  <description>Comma-separated list of hostnames to send documents to using
+  TransportClient. Either host and port, or cluster, must be defined.</description>
+</property>
+
+<property>
+  <name>elastic.port</name>
+  <value>9300</value>
+  <description>The port to connect to using TransportClient.</description>
+</property>
+
+<property>
+  <name>elastic.cluster</name>
+  <value>nutch</value>
+  <description>The cluster name to discover. Either host and port, or cluster,
+  must be defined.</description>
+</property>
+
+<property>
+  <name>elastic.index</name>
+  <value>nutch</value>
+  <description>Default index to send documents to.</description>
+</property>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java b/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java
new file mode 100644
index 0000000..bae9737
--- /dev/null
+++ b/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexwriter.elastic;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.util.NutchConfiguration;
+import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestBuilder;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.support.AbstractClient;
+import org.elasticsearch.client.support.Headers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestElasticIndexWriter {
+
+  boolean bulkRequestSuccessful, clusterSaturated;
+  int curNumFailures, maxNumFailures;
+  Configuration conf;
+  Client client;
+  ElasticIndexWriter testIndexWriter;
+
+  @Before
+  public void setup() {
+    conf = NutchConfiguration.create();
+    conf.addResource("nutch-site-test.xml");
+
+    bulkRequestSuccessful = false;
+    clusterSaturated = false;
+    curNumFailures = 0;
+    maxNumFailures = 0;
+
+    Settings settings = Settings.builder().build();
+    ThreadPool threadPool = new ThreadPool(settings);
+    Headers headers = new Headers(settings);
+
+    // customize the ES client to simulate responses from an ES cluster
+    client = new AbstractClient(settings, threadPool, headers) {
+      @Override
+      public void close() { }
+
+      @Override
+      protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
+          Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
+
+        BulkResponse response = null;
+        if (clusterSaturated) {
+          // pretend the cluster is saturated
+          curNumFailures++;
+          if (curNumFailures >= maxNumFailures) {
+            // pretend the cluster is suddenly no longer saturated
+            clusterSaturated = false;
+          }
+
+          // respond with a failure
+          BulkItemResponse failed = new BulkItemResponse(0, "index",
+              new BulkItemResponse.Failure("nutch", "index", "failure0",
+                  new EsRejectedExecutionException("saturated")));
+          response = new BulkResponse(new BulkItemResponse[]{failed}, 0);
+        } else {
+          // respond successfully
+          BulkItemResponse success = new BulkItemResponse(0, "index",
+              new IndexResponse("nutch", "index", "index0", 0, true));
+          response = new BulkResponse(new BulkItemResponse[]{success}, 0);
+        }
+
+        listener.onResponse((Response)response);
+      }
+    };
+
+    // customize the plugin to signal successful bulk operations
+    testIndexWriter = new ElasticIndexWriter() {
+      @Override
+      protected Client makeClient(Configuration conf) {
+        return client;
+      }
+
+      @Override
+      protected BulkProcessor.Listener bulkProcessorListener() {
+        return new BulkProcessor.Listener() {
+
+          @Override
+          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+            if (!response.hasFailures()) {
+              bulkRequestSuccessful = true;
+            }
+          }
+
+          @Override
+          public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
+
+          @Override
+          public void beforeBulk(long executionId, BulkRequest request) { }
+        };
+      }
+    };
+  }
+
+  @Test
+  public void testBulkMaxDocs() throws IOException {
+    int numDocs = 10;
+    conf.setInt(ElasticConstants.MAX_BULK_DOCS, numDocs);
+    JobConf job = new JobConf(conf);
+
+    testIndexWriter.setConf(conf);
+    testIndexWriter.open(job, "name");
+
+    NutchDocument doc = new NutchDocument();
+    doc.add("id", "http://www.example.com");
+
+    Assert.assertFalse(bulkRequestSuccessful);
+
+    for (int i = 0; i < numDocs; i++) {
+      testIndexWriter.write(doc);
+    }
+
+    testIndexWriter.close();
+
+    Assert.assertTrue(bulkRequestSuccessful);
+  }
+
+  @Test
+  public void testBulkMaxLength() throws IOException {
+    String key = "id";
+    String value = "http://www.example.com";
+
+    int defaultMaxBulkLength = conf.getInt(ElasticConstants.MAX_BULK_LENGTH, 2500500);
+
+    // Test that MAX_BULK_LENGTH is respected by lowering it 10x
+    int testMaxBulkLength = defaultMaxBulkLength / 10;
+
+    // This number is somewhat arbitrary, but must be a function of:
+    // - testMaxBulkLength
+    // - approximate size of each doc
+    int numDocs = testMaxBulkLength / (key.length() + value.length());
+
+    conf.setInt(ElasticConstants.MAX_BULK_LENGTH, testMaxBulkLength);
+    JobConf job = new JobConf(conf);
+
+    testIndexWriter.setConf(conf);
+    testIndexWriter.open(job, "name");
+
+    NutchDocument doc = new NutchDocument();
+    doc.add(key, value);
+
+    Assert.assertFalse(bulkRequestSuccessful);
+
+    for (int i = 0; i < numDocs; i++) {
+      testIndexWriter.write(doc);
+    }
+
+    testIndexWriter.close();
+
+    Assert.assertTrue(bulkRequestSuccessful);
+  }
+
+  @Test
+  public void testBackoffPolicy() throws IOException {
+    // set a non-zero "max-retry" value, **implying the cluster is saturated**
+    maxNumFailures = 5;
+    conf.setInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES, maxNumFailures);
+
+    int numDocs = 10;
+    conf.setInt(ElasticConstants.MAX_BULK_DOCS, numDocs);
+
+    JobConf job = new JobConf(conf);
+
+    testIndexWriter.setConf(conf);
+    testIndexWriter.open(job, "name");
+
+    NutchDocument doc = new NutchDocument();
+    doc.add("id", "http://www.example.com");
+
+    // pretend the remote cluster is "saturated"
+    clusterSaturated = true;
+
+    Assert.assertFalse(bulkRequestSuccessful);
+
+    // write enough docs to initiate one bulk request
+    for (int i = 0; i < numDocs; i++) {
+      testIndexWriter.write(doc);
+    }
+
+    testIndexWriter.close();
+
+    // the BulkProcessor should have retried `maxNumFailures + 1` times, then succeeded
+    Assert.assertTrue(bulkRequestSuccessful);
+  }
+
+}