Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 21:32:56 UTC

[1/5] nutch git commit: NUTCH-2267 - Solr and Hadoop JAR mismatch [Forced Update!]

Repository: nutch
Updated Branches:
  refs/heads/NUTCH-2292 82b03bc15 -> 2175c767c (forced update)


NUTCH-2267 - Solr and Hadoop JAR mismatch

Explicitly pass an instance of SystemDefaultHttpClient to CloudSolrClient; otherwise SolrJ will use a default implementation of CloseableHttpClient, which is not present in the HttpClient and HttpCore JARs shipped with Hadoop < 2.8 (see https://issues.apache.org/jira/browse/SOLR-7948 and https://issues.apache.org/jira/browse/HADOOP-12767).
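
For reference, a minimal sketch of the workaround in isolation (the class name and zkHosts parameter are illustrative; the constructor taking an HttpClient is the SolrJ 5.x API used in the diff below):

    import org.apache.http.client.HttpClient;
    import org.apache.http.impl.client.SystemDefaultHttpClient;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;

    public class CloudClientSketch {
      public static CloudSolrClient build(String zkHosts) {
        // Supply the HttpClient ourselves so SolrJ does not construct a
        // CloseableHttpClient, which Hadoop < 2.8 does not ship.
        HttpClient httpClient = new SystemDefaultHttpClient();
        return new CloudSolrClient(zkHosts, httpClient);
      }
    }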

Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/f64686bb
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/f64686bb
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/f64686bb

Branch: refs/heads/NUTCH-2292
Commit: f64686bb06cec2e31c9560d7e7e7f050311d62f1
Parents: d29be63
Author: Steven <sj...@gmail.com>
Authored: Mon Jun 27 09:30:52 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Mon Jun 27 09:30:52 2016 -0400

----------------------------------------------------------------------
 .../src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java    | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nutch/blob/f64686bb/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
index eec0080..85a9c4c 100644
--- a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
+++ b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.http.impl.client.SystemDefaultHttpClient;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -60,7 +61,8 @@ public class SolrUtils {
   }
 
   public static CloudSolrClient getCloudSolrClient(String url) throws MalformedURLException {
-    CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','));
+    SystemDefaultHttpClient httpClient = new SystemDefaultHttpClient();
+    CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','), httpClient);
     sc.setParallelUpdates(true);
     sc.connect();
     return sc;


[4/5] nutch git commit: Merge branch 'master' into NUTCH-2293

Posted by th...@apache.org.
Merge branch 'master' into NUTCH-2293


Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/2175c767
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/2175c767
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/2175c767

Branch: refs/heads/NUTCH-2292
Commit: 2175c767c64f92b77ebe497750241e4740acea9b
Parents: 0bf453e 80afa31
Author: Thamme Gowda <th...@apache.org>
Authored: Sat Jul 16 14:32:19 2016 -0700
Committer: Thamme Gowda <th...@apache.org>
Committed: Sat Jul 16 14:32:19 2016 -0700

----------------------------------------------------------------------
 build.xml                                       |   1 +
 conf/nutch-default.xml                          |  25 +-
 nutch-plugins/build.xml                         |   1 +
 nutch-plugins/indexer-elastic/build.xml         |  13 +
 nutch-plugins/indexer-elastic/plugin.xml        |   5 +-
 .../indexwriter/elastic/ElasticConstants.java   |   5 +-
 .../indexwriter/elastic/ElasticIndexWriter.java | 236 +++++++++----------
 .../nutch/indexwriter/solr/SolrUtils.java       |   8 +-
 .../src/test/conf/nutch-site-test.xml           |  57 +++++
 .../elastic/TestElasticIndexWriter.java         | 221 +++++++++++++++++
 10 files changed, 436 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/build.xml
----------------------------------------------------------------------
diff --cc nutch-plugins/build.xml
index 75ae2e7,0000000..20ef870
mode 100755,000000..100755
--- a/nutch-plugins/build.xml
+++ b/nutch-plugins/build.xml
@@@ -1,213 -1,0 +1,214 @@@
 +<?xml version="1.0"?>
 +<!--
 + Licensed to the Apache Software Foundation (ASF) under one or more
 + contributor license agreements.  See the NOTICE file distributed with
 + this work for additional information regarding copyright ownership.
 + The ASF licenses this file to You under the Apache License, Version 2.0
 + (the "License"); you may not use this file except in compliance with
 + the License.  You may obtain a copy of the License at
 +
 +     http://www.apache.org/licenses/LICENSE-2.0
 +
 + Unless required by applicable law or agreed to in writing, software
 + distributed under the License is distributed on an "AS IS" BASIS,
 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + See the License for the specific language governing permissions and
 + limitations under the License.
 +-->
 +<project name="Nutch" default="deploy-core" basedir=".">
 +
 +  <target name="deploy-core">
 +    <ant target="compile-core" inheritall="false" dir="../.."/>
 +    <ant target="deploy"/>
 +  </target>
 +
 +  <!-- ====================================================== -->
 +  <!-- Build & deploy all the plugin jars.                    -->
 +  <!-- ====================================================== -->
 +  <target name="deploy">
 +     <ant dir="creativecommons" target="deploy"/>
 +     <ant dir="feed" target="deploy"/>
 +     <ant dir="headings" target="deploy"/>
 +     <ant dir="index-basic" target="deploy"/>
 +     <ant dir="index-anchor" target="deploy"/>
 +     <ant dir="index-geoip" target="deploy"/>
 +     <ant dir="index-more" target="deploy"/>
 +     <ant dir="index-replace" target="deploy"/>
 +     <ant dir="index-static" target="deploy"/>
 +     <ant dir="index-metadata" target="deploy"/>
 +     <ant dir="index-links" target="deploy"/>
 +     <ant dir="mimetype-filter" target="deploy"/>
 +     <ant dir="indexer-cloudsearch" target="deploy"/>
 +     <ant dir="indexer-dummy" target="deploy"/>
 +     <ant dir="indexer-elastic" target="deploy"/>
 +     <ant dir="indexer-solr" target="deploy"/>
 +     <ant dir="language-identifier" target="deploy"/>
 +     <ant dir="lib-http" target="deploy"/>
 +     <ant dir="lib-nekohtml" target="deploy"/>
 +     <ant dir="lib-regex-filter" target="deploy"/>
 +     <ant dir="lib-xml" target="deploy"/>
 +     <ant dir="microformats-reltag" target="deploy"/>
 +     <ant dir="nutch-extensionpoints" target="deploy"/>
 +     <ant dir="protocol-file" target="deploy"/>
 +     <ant dir="protocol-ftp" target="deploy"/>
 +     <ant dir="protocol-http" target="deploy"/>
 +     <ant dir="protocol-httpclient" target="deploy"/>
 +     <ant dir="lib-htmlunit" target="deploy"/>
 +     <ant dir="protocol-htmlunit" target="deploy" />
 +     <ant dir="lib-selenium" target="deploy"/>
 +     <ant dir="protocol-selenium" target="deploy" />
 +     <ant dir="protocol-interactiveselenium" target="deploy" />
 +     <ant dir="parse-ext" target="deploy"/>
 +     <ant dir="parse-js" target="deploy"/>
 +     <ant dir="parse-html" target="deploy"/>
 +     <ant dir="parse-metatags" target="deploy"/>
 +     <ant dir="parse-swf" target="deploy"/>
 +     <ant dir="parse-tika" target="deploy"/>
 +     <ant dir="parse-zip" target="deploy"/>
 +     <ant dir="scoring-depth" target="deploy"/>
 +     <ant dir="scoring-opic" target="deploy"/>
 +     <ant dir="scoring-link" target="deploy"/>
 +     <ant dir="scoring-similarity" target="deploy"/>
 +     <ant dir="subcollection" target="deploy"/>
 +     <ant dir="tld" target="deploy"/>
 +     <ant dir="urlfilter-automaton" target="deploy"/>
 +     <ant dir="urlfilter-domain" target="deploy" />
 +     <ant dir="urlfilter-domainblacklist" target="deploy" />
 +     <ant dir="urlfilter-prefix" target="deploy"/>
 +     <ant dir="urlfilter-regex" target="deploy"/>
 +     <ant dir="urlfilter-suffix" target="deploy"/>
 +     <ant dir="urlfilter-validator" target="deploy"/>
 +     <ant dir="urlfilter-ignoreexempt" target="deploy"/>
 +     <ant dir="parsefilter-naivebayes" target="deploy"/>
 +     <ant dir="parsefilter-regex" target="deploy"/>
 +     <ant dir="urlmeta" target="deploy"/>
 +     <ant dir="urlnormalizer-ajax" target="deploy"/>
 +     <ant dir="urlnormalizer-basic" target="deploy"/>
 +     <ant dir="urlnormalizer-host" target="deploy"/>
 +     <ant dir="urlnormalizer-pass" target="deploy"/>
 +     <ant dir="urlnormalizer-protocol" target="deploy"/>
 +     <ant dir="urlnormalizer-querystring" target="deploy"/>
 +     <ant dir="urlnormalizer-regex" target="deploy"/>
 +     <ant dir="urlnormalizer-slash" target="deploy"/>
 +  </target>
 +
 +  <!-- ====================================================== -->
 +  <!-- Test all of the plugins.                               -->
 +  <!-- ====================================================== -->
 +  <target name="test">
 +    <parallel threadCount="2">
 +     <ant dir="creativecommons" target="test"/>
 +     <ant dir="index-basic" target="test"/>
 +     <ant dir="index-anchor" target="test"/>
 +     <ant dir="index-geoip" target="test"/>
 +     <ant dir="index-more" target="test"/>
 +     <ant dir="index-static" target="test"/>
 +     <ant dir="index-replace" target="test"/>
 +     <ant dir="index-links" target="test"/>
 +     <ant dir="mimetype-filter" target="test"/>
++     <ant dir="indexer-elastic" target="test"/>
 +     <ant dir="language-identifier" target="test"/>
 +     <ant dir="lib-http" target="test"/>
 +     <ant dir="protocol-file" target="test"/>
 +     <ant dir="protocol-http" target="test"/>
 +     <ant dir="protocol-httpclient" target="test"/>
 +     <!--ant dir="parse-ext" target="test"/-->
 +     <ant dir="feed" target="test"/>
 +     <ant dir="parse-html" target="test"/>
 +     <ant dir="parse-metatags" target="test"/>
 +     <ant dir="parse-swf" target="test"/>
 +     <ant dir="parse-tika" target="test"/>
 +     <ant dir="parse-zip" target="test"/>
 +     <ant dir="parsefilter-regex" target="test"/>
 +     <ant dir="subcollection" target="test"/>
 +     <ant dir="urlfilter-automaton" target="test"/>
 +     <ant dir="urlfilter-domain" target="test"/>
 +     <ant dir="urlfilter-domainblacklist" target="test"/>
 +     <ant dir="urlfilter-prefix" target="test"/>
 +     <ant dir="urlfilter-regex" target="test"/>
 +     <ant dir="urlfilter-suffix" target="test"/>
 +     <ant dir="urlfilter-validator" target="test"/>
 +     <ant dir="urlfilter-ignoreexempt" target="test"/>
 +     <ant dir="urlnormalizer-ajax" target="test"/>
 +     <ant dir="urlnormalizer-basic" target="test"/>
 +     <ant dir="urlnormalizer-host" target="test"/>
 +     <ant dir="urlnormalizer-pass" target="test"/>
 +     <ant dir="urlnormalizer-protocol" target="test"/>
 +     <ant dir="urlnormalizer-querystring" target="test"/>
 +     <ant dir="urlnormalizer-regex" target="test"/>
 +     <ant dir="urlnormalizer-slash" target="test"/>
 +    </parallel>
 +  </target>
 +
 +  <!-- ====================================================== -->
 +  <!-- Clean all of the plugins.                              -->
 +  <!-- ====================================================== -->
 +  <target name="clean">
 +    <ant dir="creativecommons" target="clean"/>
 +    <ant dir="feed" target="clean"/>
 +    <ant dir="headings" target="clean"/>
 +    <ant dir="index-basic" target="clean"/>
 +    <ant dir="index-anchor" target="clean"/>
 +    <ant dir="index-geoip" target="clean"/>
 +    <ant dir="index-more" target="clean"/>
 +    <ant dir="index-static" target="clean"/>
 +    <ant dir="index-replace" target="clean"/>
 +    <ant dir="index-metadata" target="clean"/>
 +    <ant dir="index-links" target="clean"/>
 +    <ant dir="mimetype-filter" target="clean"/>
 +    <ant dir="indexer-cloudsearch" target="clean"/>
 +    <ant dir="indexer-dummy" target="clean"/>
 +    <ant dir="indexer-elastic" target="clean"/>
 +    <ant dir="indexer-solr" target="clean"/>
 +    <ant dir="language-identifier" target="clean"/>
 +    <!-- <ant dir="lib-commons-httpclient" target="clean"/> -->
 +    <ant dir="lib-http" target="clean"/>
 +    <!-- <ant dir="lib-lucene-analyzers" target="clean"/>-->
 +    <ant dir="lib-nekohtml" target="clean"/>
 +    <ant dir="lib-regex-filter" target="clean"/>
 +    <ant dir="lib-xml" target="clean"/>
 +    <ant dir="microformats-reltag" target="clean"/>
 +    <ant dir="nutch-extensionpoints" target="clean"/>
 +    <ant dir="protocol-file" target="clean"/>
 +    <ant dir="protocol-ftp" target="clean"/>
 +    <ant dir="protocol-http" target="clean"/>
 +    <ant dir="protocol-httpclient" target="clean"/>
 +    <ant dir="lib-htmlunit" target="clean"/>
 +    <ant dir="protocol-htmlunit" target="clean" />
 +    <ant dir="lib-selenium" target="clean"/>
 +    <ant dir="protocol-selenium" target="clean" />
 +    <ant dir="protocol-interactiveselenium" target="clean" />
 +    <ant dir="parse-ext" target="clean"/>
 +    <ant dir="parse-js" target="clean"/>
 +    <ant dir="parse-html" target="clean"/>
 +    <ant dir="parse-metatags" target="clean"/>
 +    <ant dir="parse-swf" target="clean"/>
 +    <ant dir="parse-tika" target="clean"/>
 +    <ant dir="parse-zip" target="clean"/>
 +    <ant dir="parsefilter-regex" target="clean"/>
 +    <ant dir="scoring-depth" target="clean"/>
 +    <ant dir="scoring-opic" target="clean"/>
 +    <ant dir="scoring-link" target="clean"/>
 +    <ant dir="scoring-similarity" target="clean"/>
 +    <ant dir="subcollection" target="clean"/>
 +    <ant dir="tld" target="clean"/>
 +    <ant dir="urlfilter-automaton" target="clean"/>
 +    <ant dir="urlfilter-domain" target="clean" />
 +    <ant dir="urlfilter-domainblacklist" target="clean" />
 +    <ant dir="urlfilter-prefix" target="clean"/>
 +    <ant dir="urlfilter-regex" target="clean"/>
 +    <ant dir="urlfilter-suffix" target="clean"/>
 +    <ant dir="urlfilter-validator" target="clean"/>
 +    <ant dir="urlfilter-ignoreexempt" target="clean"/>
 +    <ant dir="parsefilter-naivebayes" target="clean" />
 +    <ant dir="urlmeta" target="clean"/>
 +    <ant dir="urlnormalizer-ajax" target="clean"/>
 +    <ant dir="urlnormalizer-basic" target="clean"/>
 +    <ant dir="urlnormalizer-host" target="clean"/>
 +    <ant dir="urlnormalizer-pass" target="clean"/>
 +    <ant dir="urlnormalizer-protocol" target="clean"/>
 +    <ant dir="urlnormalizer-querystring" target="clean"/>
 +    <ant dir="urlnormalizer-regex" target="clean"/>
 +    <ant dir="urlnormalizer-slash" target="clean"/>
 +  </target>
 +</project>

http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/build.xml
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/build.xml
index 38955ff,0000000..6955f61
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/build.xml
+++ b/nutch-plugins/indexer-elastic/build.xml
@@@ -1,22 -1,0 +1,35 @@@
 +<?xml version="1.0"?>
 +<!--
 + Licensed to the Apache Software Foundation (ASF) under one or more
 + contributor license agreements.  See the NOTICE file distributed with
 + this work for additional information regarding copyright ownership.
 + The ASF licenses this file to You under the Apache License, Version 2.0
 + (the "License"); you may not use this file except in compliance with
 + the License.  You may obtain a copy of the License at
 +
 +     http://www.apache.org/licenses/LICENSE-2.0
 +
 + Unless required by applicable law or agreed to in writing, software
 + distributed under the License is distributed on an "AS IS" BASIS,
 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + See the License for the specific language governing permissions and
 + limitations under the License.
 +-->
 +<project name="indexer-elastic" default="jar-core">
 +
 +  <import file="../build-plugin.xml" />
 +
++  <!-- Add compilation dependencies to classpath -->
++  <path id="plugin.deps">
++    <pathelement location="${build.dir}/test/conf"/>
++  </path>
++
++  <!-- Deploy Unit test dependencies -->
++  <target name="deps-test">
++    <copy toDir="${build.test}">
++      <fileset dir="${src.test}" excludes="**/*.java"/>
++    </copy>
++  </target>
++
++
 +</project>

http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/plugin.xml
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/plugin.xml
index d99a665,0000000..401e342
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/plugin.xml
+++ b/nutch-plugins/indexer-elastic/plugin.xml
@@@ -1,71 -1,0 +1,70 @@@
 +<?xml version="1.0" encoding="UTF-8"?>
 +<!--
 +  Licensed to the Apache Software Foundation (ASF) under one or more
 +  contributor license agreements.  See the NOTICE file distributed with
 +  this work for additional information regarding copyright ownership.
 +  The ASF licenses this file to You under the Apache License, Version 2.0
 +  (the "License"); you may not use this file except in compliance with
 +  the License.  You may obtain a copy of the License at
-   
++
 +  http://www.apache.org/licenses/LICENSE-2.0
-   
++
 +  Unless required by applicable law or agreed to in writing, software
 +  distributed under the License is distributed on an "AS IS" BASIS,
 +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +  See the License for the specific language governing permissions and
 +  limitations under the License.
 +-->
 +<plugin id="indexer-elastic" name="ElasticIndexWriter" version="1.0.0"
 +  provider-name="nutch.apache.org">
 +
 +  <runtime>
 +    <library name="indexer-elastic.jar">
 +      <export name="*" />
 +    </library>
 +    <library name="elasticsearch-2.3.3.jar"/>
 +    <library name="commons-cli-1.3.1.jar"/>
 +    <library name="compress-lzf-1.0.2.jar"/>
 +    <library name="guava-18.0.jar"/>
 +    <library name="HdrHistogram-2.1.6.jar"/>
 +    <library name="hppc-0.7.1.jar"/>
-     <library name="indexer-elastic.jar"/>
 +    <library name="jackson-core-2.6.6.jar"/>
 +    <library name="jackson-dataformat-cbor-2.6.6.jar"/>
 +    <library name="jackson-dataformat-smile-2.6.6.jar"/>
 +    <library name="jackson-dataformat-yaml-2.6.6.jar"/>
 +    <library name="joda-convert-1.2.jar"/>
 +    <library name="joda-time-2.8.2.jar"/>
 +    <library name="jsr166e-1.1.0.jar"/>
 +    <library name="lucene-analyzers-common-5.5.0.jar"/>
 +    <library name="lucene-backward-codecs-5.5.0.jar"/>
 +    <library name="lucene-core-5.5.0.jar"/>
 +    <library name="lucene-grouping-5.5.0.jar"/>
 +    <library name="lucene-highlighter-5.5.0.jar"/>
 +    <library name="lucene-join-5.5.0.jar"/>
 +    <library name="lucene-memory-5.5.0.jar"/>
 +    <library name="lucene-misc-5.5.0.jar"/>
 +    <library name="lucene-queries-5.5.0.jar"/>
 +    <library name="lucene-queryparser-5.5.0.jar"/>
 +    <library name="lucene-sandbox-5.5.0.jar"/>
 +    <library name="lucene-spatial-5.5.0.jar"/>
 +    <library name="lucene-spatial3d-5.5.0.jar"/>
 +    <library name="lucene-suggest-5.5.0.jar"/>
 +    <library name="netty-3.10.5.Final.jar"/>
 +    <library name="securesm-1.0.jar"/>
 +    <library name="snakeyaml-1.15.jar"/>
 +    <library name="spatial4j-0.5.jar"/>
 +    <library name="t-digest-3.0.jar"/>
 +  </runtime>
 +
 +  <requires>
 +    <import plugin="nutch-extensionpoints" />
 +  </requires>
 +
 +  <extension id="org.apache.nutch.indexer.elastic"
 +    name="Elasticsearch Index Writer"
 +    point="org.apache.nutch.indexer.IndexWriter">
 +    <implementation id="ElasticIndexWriter"
 +      class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter" />
 +  </extension>
 +
 +</plugin>

http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
index b0e70c8,0000000..29f36c7
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
+++ b/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
@@@ -1,28 -1,0 +1,31 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nutch.indexwriter.elastic;
 +
 +public interface ElasticConstants {
 +  public static final String ELASTIC_PREFIX = "elastic.";
 +
-   public static final String HOST = ELASTIC_PREFIX + "host";
++  public static final String HOSTS = ELASTIC_PREFIX + "host";
 +  public static final String PORT = ELASTIC_PREFIX + "port";
 +  public static final String CLUSTER = ELASTIC_PREFIX + "cluster";
 +  public static final String INDEX = ELASTIC_PREFIX + "index";
 +  public static final String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
 +  public static final String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
++  public static final String EXPONENTIAL_BACKOFF_MILLIS = ELASTIC_PREFIX + "exponential.backoff.millis";
++  public static final String EXPONENTIAL_BACKOFF_RETRIES = ELASTIC_PREFIX + "exponential.backoff.retries";
++  public static final String BULK_CLOSE_TIMEOUT = ELASTIC_PREFIX + "bulk.close.timeout";
 +}

http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
index 9367e41,0000000..00b96f1
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
+++ b/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
@@@ -1,279 -1,0 +1,261 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.nutch.indexwriter.elastic;
 +
 +import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
 +
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.util.HashMap;
 +import java.util.Map;
++import java.util.concurrent.TimeUnit;
 +
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.mapred.JobConf;
 +import org.apache.nutch.indexer.IndexWriter;
 +import org.apache.nutch.indexer.NutchDocument;
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.ListenableActionFuture;
- import org.elasticsearch.action.bulk.BulkItemResponse;
- import org.elasticsearch.action.bulk.BulkRequestBuilder;
 +import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.delete.DeleteRequestBuilder;
- import org.elasticsearch.action.index.IndexRequestBuilder;
++import org.elasticsearch.action.bulk.BulkRequest;
++import org.elasticsearch.action.bulk.BackoffPolicy;
++import org.elasticsearch.action.bulk.BulkProcessor;
++import org.elasticsearch.action.delete.DeleteRequest;
++import org.elasticsearch.action.index.IndexRequest;
 +import org.elasticsearch.client.Client;
 +import org.elasticsearch.client.transport.TransportClient;
++import org.elasticsearch.common.unit.ByteSizeUnit;
++import org.elasticsearch.common.unit.ByteSizeValue;
++import org.elasticsearch.common.unit.TimeValue;
 +import org.elasticsearch.common.settings.Settings;
- import org.elasticsearch.common.settings.Settings.Builder;
 +import org.elasticsearch.common.transport.InetSocketTransportAddress;
 +import org.elasticsearch.node.Node;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
++ * Sends NutchDocuments to a configured Elasticsearch index.
 + */
 +public class ElasticIndexWriter implements IndexWriter {
 +  public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
 +
++  private static final int DEFAULT_PORT = 9300;
 +  private static final int DEFAULT_MAX_BULK_DOCS = 250;
 +  private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
++  private static final int DEFAULT_EXP_BACKOFF_MILLIS = 100;
++  private static final int DEFAULT_EXP_BACKOFF_RETRIES = 10;
++  private static final int DEFAULT_BULK_CLOSE_TIMEOUT = 600;
++  private static final String DEFAULT_INDEX = "nutch";
 +
++  private String defaultIndex;
 +  private Client client;
 +  private Node node;
-   private String defaultIndex;
++  private BulkProcessor bulkProcessor;
 +
-   private Configuration config;
++  private long bulkCloseTimeout;
 +
-   private BulkRequestBuilder bulk;
-   private ListenableActionFuture<BulkResponse> execute;
-   private int port = -1;
-   private String host = null;
-   private String clusterName = null;
-   private int maxBulkDocs;
-   private int maxBulkLength;
-   private long indexedDocs = 0;
-   private int bulkDocs = 0;
-   private int bulkLength = 0;
-   private boolean createNewBulk = false;
++  private Configuration config;
 +
 +  @Override
 +  public void open(JobConf job, String name) throws IOException {
-     clusterName = job.get(ElasticConstants.CLUSTER);
++    bulkCloseTimeout = job.getLong(ElasticConstants.BULK_CLOSE_TIMEOUT,
++        DEFAULT_BULK_CLOSE_TIMEOUT);
++    defaultIndex = job.get(ElasticConstants.INDEX, DEFAULT_INDEX);
++
++    int maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
++        DEFAULT_MAX_BULK_DOCS);
++    int maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
++        DEFAULT_MAX_BULK_LENGTH);
++    int expBackoffMillis = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
++        DEFAULT_EXP_BACKOFF_MILLIS);
++    int expBackoffRetries = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
++        DEFAULT_EXP_BACKOFF_RETRIES);
++
++    client = makeClient(job);
++
++    LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", maxBulkDocs, maxBulkLength);
++    bulkProcessor = BulkProcessor.builder(client, bulkProcessorListener())
++      .setBulkActions(maxBulkDocs)
++      .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
++      .setConcurrentRequests(1)
++      .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
++          TimeValue.timeValueMillis(expBackoffMillis), expBackoffRetries))
++      .build();
++  }
 +
-     host = job.get(ElasticConstants.HOST);
-     port = job.getInt(ElasticConstants.PORT, 9300);
++  /** Generates a TransportClient or NodeClient */
++  protected Client makeClient(Configuration conf) throws IOException {
++    String clusterName = conf.get(ElasticConstants.CLUSTER);
++    String[] hosts = conf.getStrings(ElasticConstants.HOSTS);
++    int port = conf.getInt(ElasticConstants.PORT, DEFAULT_PORT);
 +
-     Builder settingsBuilder = Settings.builder();
++    Settings.Builder settingsBuilder = Settings.settingsBuilder();
 +
 +    BufferedReader reader = new BufferedReader(
-         job.getConfResourceAsReader("elasticsearch.conf"));
++        conf.getConfResourceAsReader("elasticsearch.conf"));
 +    String line;
 +    String parts[];
- 
 +    while ((line = reader.readLine()) != null) {
 +      if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
 +        line.trim();
 +        parts = line.split("=");
 +
 +        if (parts.length == 2) {
 +          settingsBuilder.put(parts[0].trim(), parts[1].trim());
 +        }
 +      }
 +    }
 +
++    // Set the cluster name and build the settings
 +    if (StringUtils.isNotBlank(clusterName))
 +      settingsBuilder.put("cluster.name", clusterName);
 +
-     // Set the cluster name and build the settings
 +    Settings settings = settingsBuilder.build();
 +
++    Client client = null;
++
 +    // Prefer TransportClient
-     if (host != null && port > 1) {
-       TransportClient transportClient = TransportClient.builder()
-           .settings(settings).build()
-           .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
++    if (hosts != null && port > 1) {
++      TransportClient transportClient = TransportClient.builder().settings(settings).build();
++      for (String host: hosts)
++        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
 +      client = transportClient;
 +    } else if (clusterName != null) {
 +      node = nodeBuilder().settings(settings).client(true).node();
 +      client = node.client();
 +    }
 +
-     bulk = client.prepareBulk();
-     defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
-     maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
-         DEFAULT_MAX_BULK_DOCS);
-     maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
-         DEFAULT_MAX_BULK_LENGTH);
++    return client;
++  }
++
++  /** Generates a default BulkProcessor.Listener */
++  protected BulkProcessor.Listener bulkProcessorListener() {
++    return new BulkProcessor.Listener() {
++      @Override
++      public void beforeBulk(long executionId, BulkRequest request) { }
++
++      @Override
++      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
++        throw new RuntimeException(failure);
++      }
++
++      @Override
++      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
++        if (response.hasFailures()) {
++          LOG.warn("Failures occurred during bulk request");
++        }
++      }
++    };
 +  }
 +
 +  @Override
 +  public void write(NutchDocument doc) throws IOException {
 +    String id = (String) doc.getFieldValue("id");
 +    String type = doc.getDocumentMeta().get("type");
 +    if (type == null)
 +      type = "doc";
-     IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
 +
++    // Add each field of this doc to the index source
 +    Map<String, Object> source = new HashMap<String, Object>();
- 
-     // Loop through all fields of this doc
 +    for (String fieldName : doc.getFieldNames()) {
-       if (doc.getField(fieldName).getValues().size() > 1) {
++      if (doc.getFieldValue(fieldName) != null) {
 +        source.put(fieldName, doc.getFieldValue(fieldName));
-         // Loop through the values to keep track of the size of this
-         // document
-         for (Object value : doc.getField(fieldName).getValues()) {
-           bulkLength += value.toString().length();
-         }
-       } else {
-         if (doc.getFieldValue(fieldName) != null) {
-           source.put(fieldName, doc.getFieldValue(fieldName));
-           bulkLength += doc.getFieldValue(fieldName).toString().length();
-         }
 +      }
 +    }
-     request.setSource(source);
- 
-     // Add this indexing request to a bulk request
-     bulk.add(request);
-     indexedDocs++;
-     bulkDocs++;
- 
-     if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
-       LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
-           + bulkLength + ", total docs = " + indexedDocs
-           + ", last doc in bulk = '" + id + "']");
-       // Flush the bulk of indexing requests
-       createNewBulk = true;
-       commit();
-     }
++
++    IndexRequest request = new IndexRequest(defaultIndex, type, id).source(source);
++    bulkProcessor.add(request);
 +  }
 +
 +  @Override
 +  public void delete(String key) throws IOException {
-     try {
-       DeleteRequestBuilder builder = client.prepareDelete();
-       builder.setIndex(defaultIndex);
-       builder.setType("doc");
-       builder.setId(key);
-       builder.execute().actionGet();
-     } catch (ElasticsearchException e) {
-       throw makeIOException(e);
-     }
-   }
- 
-   public static IOException makeIOException(ElasticsearchException e) {
-     final IOException ioe = new IOException();
-     ioe.initCause(e);
-     return ioe;
++    DeleteRequest request = new DeleteRequest(defaultIndex, "doc", key);
++    bulkProcessor.add(request);
 +  }
 +
 +  @Override
 +  public void update(NutchDocument doc) throws IOException {
 +    write(doc);
 +  }
 +
 +  @Override
 +  public void commit() throws IOException {
-     if (execute != null) {
-       // wait for previous to finish
-       long beforeWait = System.currentTimeMillis();
-       BulkResponse actionGet = execute.actionGet();
-       if (actionGet.hasFailures()) {
-         for (BulkItemResponse item : actionGet) {
-           if (item.isFailed()) {
-             throw new RuntimeException("First failure in bulk: "
-                 + item.getFailureMessage());
-           }
-         }
-       }
-       long msWaited = System.currentTimeMillis() - beforeWait;
-       LOG.info("Previous took in ms " + actionGet.getTookInMillis()
-           + ", including wait " + msWaited);
-       execute = null;
-     }
-     if (bulk != null) {
-       if (bulkDocs > 0) {
-         // start a flush, note that this is an asynchronous call
-         execute = bulk.execute();
-       }
-       bulk = null;
-     }
-     if (createNewBulk) {
-       // Prepare a new bulk request
-       bulk = client.prepareBulk();
-       bulkDocs = 0;
-       bulkLength = 0;
-     }
++    bulkProcessor.flush();
 +  }
 +
 +  @Override
 +  public void close() throws IOException {
-     // Flush pending requests
-     LOG.info("Processing remaining requests [docs = " + bulkDocs
-         + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
-     createNewBulk = false;
-     commit();
-     // flush one more time to finalize the last bulk
-     LOG.info("Processing to finalize last execute");
-     createNewBulk = false;
-     commit();
- 
-     // Close
++    // Close BulkProcessor (automatically flushes)
++    try {
++      bulkProcessor.awaitClose(bulkCloseTimeout, TimeUnit.SECONDS);
++    } catch (InterruptedException e) {
++      LOG.warn("interrupted while waiting for BulkProcessor to complete ({})", e.getMessage());
++    }
++
 +    client.close();
 +    if (node != null) {
 +      node.close();
 +    }
 +  }
 +
 +  @Override
 +  public String describe() {
 +    StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
 +    sb.append("\t").append(ElasticConstants.CLUSTER)
 +        .append(" : elastic prefix cluster\n");
-     sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
++    sb.append("\t").append(ElasticConstants.HOSTS).append(" : hostname\n");
 +    sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
 +    sb.append("\t").append(ElasticConstants.INDEX)
 +        .append(" : elastic index command \n");
 +    sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS)
-         .append(" : elastic bulk index doc counts. (default 250) \n");
++        .append(" : elastic bulk index doc counts. (default ")
++        .append(DEFAULT_MAX_BULK_DOCS).append(")\n");
 +    sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH)
-         .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
++        .append(" : elastic bulk index length in bytes. (default ")
++        .append(DEFAULT_MAX_BULK_LENGTH).append(")\n");
++    sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS)
++        .append(" : elastic bulk exponential backoff initial delay in milliseconds. (default ")
++        .append(DEFAULT_EXP_BACKOFF_MILLIS).append(")\n");
++    sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES)
++        .append(" : elastic bulk exponential backoff max retries. (default ")
++        .append(DEFAULT_EXP_BACKOFF_RETRIES).append(")\n");
++    sb.append("\t").append(ElasticConstants.BULK_CLOSE_TIMEOUT)
++        .append(" : elastic timeout for the last bulk in seconds. (default ")
++        .append(DEFAULT_BULK_CLOSE_TIMEOUT).append(")\n");
 +    return sb.toString();
 +  }
 +
 +  @Override
 +  public void setConf(Configuration conf) {
 +    config = conf;
 +    String cluster = conf.get(ElasticConstants.CLUSTER);
-     String host = conf.get(ElasticConstants.HOST);
++    String hosts = conf.get(ElasticConstants.HOSTS);
 +
-     if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) {
++    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(hosts)) {
 +      String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml ";
 +      message += "\n" + describe();
 +      LOG.error(message);
 +      throw new RuntimeException(message);
 +    }
 +  }
 +
 +  @Override
 +  public Configuration getConf() {
 +    return config;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
index eec0080,0000000..d70bc62
mode 100644,000000..100644
--- a/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
+++ b/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
@@@ -1,97 -1,0 +1,99 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nutch.indexwriter.solr;
 +
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.apache.hadoop.mapred.JobConf;
++import org.apache.http.impl.client.SystemDefaultHttpClient;
 +import org.apache.solr.client.solrj.SolrClient;
 +import org.apache.solr.client.solrj.impl.HttpSolrClient;
 +import org.apache.solr.client.solrj.impl.CloudSolrClient;
 +
 +import java.net.MalformedURLException;
 +
 +public class SolrUtils {
 +
 +  public static Logger LOG = LoggerFactory.getLogger(SolrUtils.class);
++  private static HttpClient HTTP_CLIENT = new SystemDefaultHttpClient();
 +
 +  /**
 +   *
 +   *
 +   * @param JobConf
 +   * @return SolrClient
 +   */
 +  public static ArrayList<SolrClient> getSolrClients(JobConf job) throws MalformedURLException {
 +    String[] urls = job.getStrings(SolrConstants.SERVER_URL);
 +    String[] zkHostString = job.getStrings(SolrConstants.ZOOKEEPER_HOSTS);
 +    ArrayList<SolrClient> solrClients = new ArrayList<SolrClient>();
 +    
 +    if (zkHostString != null && zkHostString.length > 0) {
 +      for (int i = 0; i < zkHostString.length; i++) {
 +        CloudSolrClient sc = getCloudSolrClient(zkHostString[i]);
 +        sc.setDefaultCollection(job.get(SolrConstants.COLLECTION));
 +        solrClients.add(sc);
 +      }
 +    } else {
 +      for (int i = 0; i < urls.length; i++) {
-         SolrClient sc = new HttpSolrClient(urls[i]);
++        SolrClient sc = new HttpSolrClient(urls[i], HTTP_CLIENT);
 +        solrClients.add(sc);
 +      }
 +    }
 +
 +    return solrClients;
 +  }
 +
 +  public static CloudSolrClient getCloudSolrClient(String url) throws MalformedURLException {
-     CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','));
++    CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','), HTTP_CLIENT);
 +    sc.setParallelUpdates(true);
 +    sc.connect();
 +    return sc;
 +  }
 +
 +  public static SolrClient getHttpSolrClient(String url) throws MalformedURLException {
-     SolrClient sc =new HttpSolrClient(url);
++    SolrClient sc =new HttpSolrClient(url, HTTP_CLIENT);
 +    return sc;
 +  }
 +  
 +  public static String stripNonCharCodepoints(String input) {
 +    StringBuilder retval = new StringBuilder();
 +    char ch;
 +
 +    for (int i = 0; i < input.length(); i++) {
 +      ch = input.charAt(i);
 +
 +      // Strip all non-characters
 +      // http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]
 +      // and non-printable control characters except tabulator, new line and
 +      // carriage return
 +      if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step 0x10000
 +          ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range
 +          (ch <= 0xfdd0 || ch >= 0xfdef) && // 0xfdd0 - 0xfdef
 +          (ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) {
 +
 +        retval.append(ch);
 +      }
 +    }
 +
 +    return retval.toString();
 +  }
 +
 +}


[3/5] nutch git commit: Merge branch 'master' of https://github.com/sjwoodard/nutch; this closes #129

Posted by th...@apache.org.
Merge branch 'master' of https://github.com/sjwoodard/nutch; this closes #129


Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/9dd251d0
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/9dd251d0
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/9dd251d0

Branch: refs/heads/NUTCH-2292
Commit: 9dd251d030c5a336901906bfb1fa26830c49b6fe
Parents: 5943d11 6c1537b
Author: Lewis John McGibbney <le...@gmail.com>
Authored: Sat Jul 16 14:23:42 2016 -0700
Committer: Lewis John McGibbney <le...@gmail.com>
Committed: Sat Jul 16 14:23:42 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/nutch/indexwriter/solr/SolrUtils.java    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[2/5] nutch git commit: Use static HttpClient for all SOLR connections

Posted by th...@apache.org.
Use static HttpClient for all SOLR connections

Changed HttpClient to a static instance, based on http://hc.apache.org/httpclient-3.x/performance.html, and used that shared connection for all SolrJ connections.
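
In sketch form (the holder class name is illustrative; the constructors are the SolrJ 5.x ones used in the diff below), the change amounts to constructing one SystemDefaultHttpClient per JVM and handing it to every SolrJ client, since an HttpClient instance and its connection manager are designed to be shared:

    import org.apache.http.client.HttpClient;
    import org.apache.http.impl.client.SystemDefaultHttpClient;
    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.impl.HttpSolrClient;

    public class SharedHttpClientSketch {
      // One shared HttpClient for all Solr connections, per the
      // HttpClient performance guide linked above.
      private static final HttpClient HTTP_CLIENT = new SystemDefaultHttpClient();

      static SolrClient http(String url) {
        return new HttpSolrClient(url, HTTP_CLIENT);
      }

      static CloudSolrClient cloud(String zkHosts) {
        return new CloudSolrClient(zkHosts.replace('|', ','), HTTP_CLIENT);
      }
    }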

Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/6c1537b1
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/6c1537b1
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/6c1537b1

Branch: refs/heads/NUTCH-2292
Commit: 6c1537b16ba5f3e0fc28fff5a60595e01437900d
Parents: f64686b
Author: Steven <sj...@gmail.com>
Authored: Thu Jun 30 07:48:31 2016 -0400
Committer: GitHub <no...@github.com>
Committed: Thu Jun 30 07:48:31 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/nutch/indexwriter/solr/SolrUtils.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nutch/blob/6c1537b1/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
index 85a9c4c..d70bc62 100644
--- a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
+++ b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
@@ -32,6 +32,7 @@ import java.net.MalformedURLException;
 public class SolrUtils {
 
   public static Logger LOG = LoggerFactory.getLogger(SolrUtils.class);
+  private static HttpClient HTTP_CLIENT = new SystemDefaultHttpClient();
 
   /**
    *
@@ -52,7 +53,7 @@ public class SolrUtils {
       }
     } else {
       for (int i = 0; i < urls.length; i++) {
-        SolrClient sc = new HttpSolrClient(urls[i]);
+        SolrClient sc = new HttpSolrClient(urls[i], HTTP_CLIENT);
         solrClients.add(sc);
       }
     }
@@ -61,15 +62,14 @@ public class SolrUtils {
   }
 
   public static CloudSolrClient getCloudSolrClient(String url) throws MalformedURLException {
-    SystemDefaultHttpClient httpClient = new SystemDefaultHttpClient();
-    CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','), httpClient);
+    CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','), HTTP_CLIENT);
     sc.setParallelUpdates(true);
     sc.connect();
     return sc;
   }
 
   public static SolrClient getHttpSolrClient(String url) throws MalformedURLException {
-    SolrClient sc =new HttpSolrClient(url);
+    SolrClient sc =new HttpSolrClient(url, HTTP_CLIENT);
     return sc;
   }
   


[5/5] nutch git commit: NUTCH-2287 Indexer-elastic plugin should use Elasticsearch BulkProcessor and BackoffPolicy

Posted by th...@apache.org.
NUTCH-2287 Indexer-elastic plugin should use Elasticsearch BulkProcessor and BackoffPolicy
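
As a minimal sketch of the wiring this commit introduces (Elasticsearch 2.x API; the listener is a no-op stub here and the numeric values are the plugin's defaults from the diff below):

    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;

    public class BulkProcessorSketch {
      static BulkProcessor build(Client client) {
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override public void beforeBulk(long id, BulkRequest req) { }
            @Override public void afterBulk(long id, BulkRequest req, BulkResponse resp) { }
            @Override public void afterBulk(long id, BulkRequest req, Throwable failure) { }
          })
          .setBulkActions(250)                  // flush every 250 index/delete actions
          .setBulkSize(new ByteSizeValue(2500500, ByteSizeUnit.BYTES)) // or at ~2.5 MB
          .setConcurrentRequests(1)
          .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
              TimeValue.timeValueMillis(100), 10)) // 100 ms initial delay, max 10 retries
          .build();
      }
    }

Requests are then added with bulkProcessor.add(...), and the processor flushes and retries rejected bulks on its own, which is what lets the plugin drop its hand-rolled bulk bookkeeping (see the ElasticIndexWriter diff below).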


Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/80afa313
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/80afa313
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/80afa313

Branch: refs/heads/NUTCH-2292
Commit: 80afa3134d7e8de07fcfbb03dba1c51fb7c7dce2
Parents: 9dd251d
Author: Joseph Naegele <jn...@grierforensics.com>
Authored: Thu Jun 30 13:42:34 2016 -0400
Committer: Lewis John McGibbney <le...@gmail.com>
Committed: Sat Jul 16 14:36:35 2016 -0700

----------------------------------------------------------------------
 build.xml                                       |   1 +
 conf/nutch-default.xml                          |  25 +-
 src/plugin/build.xml                            |   1 +
 src/plugin/indexer-elastic/build.xml            |  13 +
 src/plugin/indexer-elastic/plugin.xml           |   5 +-
 .../indexwriter/elastic/ElasticConstants.java   |   5 +-
 .../indexwriter/elastic/ElasticIndexWriter.java | 236 +++++++++----------
 .../src/test/conf/nutch-site-test.xml           |  57 +++++
 .../elastic/TestElasticIndexWriter.java         | 221 +++++++++++++++++
 9 files changed, 431 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index a1c41ed..0ee60a1 100644
--- a/build.xml
+++ b/build.xml
@@ -1032,6 +1032,7 @@
         <source path="${plugins.dir}/indexer-dummy/src/java/" />
         <source path="${plugins.dir}/indexer-solr/src/java/" />
         <source path="${plugins.dir}/indexer-elastic/src/java/" />
+        <source path="${plugins.dir}/indexer-elastic/src/test/" />
         <source path="${plugins.dir}/index-metadata/src/java/" />
         <source path="${plugins.dir}/index-more/src/java/" />
         <source path="${plugins.dir}/index-more/src/test/" />

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/conf/nutch-default.xml
----------------------------------------------------------------------
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index 324de5c..67326ee 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -1812,8 +1812,8 @@ visit https://wiki.apache.org/nutch/SimilarityScoringFilter-->
 <property>
   <name>elastic.host</name>
   <value></value>
-  <description>The hostname to send documents to using TransportClient. Either host
-  and port must be defined or cluster.</description>
+  <description>Comma-separated list of hostnames to send documents to using
+  TransportClient. Either host and port must be defined or cluster.</description>
 </property>
 
 <property> 
@@ -1847,6 +1847,27 @@ visit https://wiki.apache.org/nutch/SimilarityScoringFilter-->
   <description>Maximum size of the bulk in bytes.</description>
 </property>
 
+<property>
+  <name>elastic.exponential.backoff.millis</name>
+  <value>100</value>
+  <description>Initial delay for the BulkProcessor's exponential backoff policy.
+  </description>
+</property>
+
+<property>
+  <name>elastic.exponential.backoff.retries</name>
+  <value>10</value>
+  <description>Number of times the BulkProcessor's exponential backoff policy
+  should retry bulk operations.</description>
+</property>
+
+<property>
+  <name>elastic.bulk.close.timeout</name>
+  <value>600</value>
+  <description>Number of seconds allowed for the BulkProcessor to complete its
+  last operation.</description>
+</property>
+
 <!-- subcollection properties -->
 
 <property>

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/build.xml
----------------------------------------------------------------------
diff --git a/src/plugin/build.xml b/src/plugin/build.xml
index 75ae2e7..20ef870 100755
--- a/src/plugin/build.xml
+++ b/src/plugin/build.xml
@@ -106,6 +106,7 @@
      <ant dir="index-replace" target="test"/>
      <ant dir="index-links" target="test"/>
      <ant dir="mimetype-filter" target="test"/>
+     <ant dir="indexer-elastic" target="test"/>
      <ant dir="language-identifier" target="test"/>
      <ant dir="lib-http" target="test"/>
      <ant dir="protocol-file" target="test"/>

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/build.xml
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/build.xml b/src/plugin/indexer-elastic/build.xml
index 38955ff..6955f61 100644
--- a/src/plugin/indexer-elastic/build.xml
+++ b/src/plugin/indexer-elastic/build.xml
@@ -19,4 +19,17 @@
 
   <import file="../build-plugin.xml" />
 
+  <!-- Add compilation dependencies to classpath -->
+  <path id="plugin.deps">
+    <pathelement location="${build.dir}/test/conf"/>
+  </path>
+
+  <!-- Deploy Unit test dependencies -->
+  <target name="deps-test">
+    <copy toDir="${build.test}">
+      <fileset dir="${src.test}" excludes="**/*.java"/>
+    </copy>
+  </target>
+
+
 </project>

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/plugin.xml
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/plugin.xml b/src/plugin/indexer-elastic/plugin.xml
index d99a665..401e342 100644
--- a/src/plugin/indexer-elastic/plugin.xml
+++ b/src/plugin/indexer-elastic/plugin.xml
@@ -6,9 +6,9 @@
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
-  
+
   http://www.apache.org/licenses/LICENSE-2.0
-  
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,7 +28,6 @@
     <library name="guava-18.0.jar"/>
     <library name="HdrHistogram-2.1.6.jar"/>
     <library name="hppc-0.7.1.jar"/>
-    <library name="indexer-elastic.jar"/>
     <library name="jackson-core-2.6.6.jar"/>
     <library name="jackson-dataformat-cbor-2.6.6.jar"/>
     <library name="jackson-dataformat-smile-2.6.6.jar"/>

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
index b0e70c8..29f36c7 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
@@ -19,10 +19,13 @@ package org.apache.nutch.indexwriter.elastic;
 public interface ElasticConstants {
   public static final String ELASTIC_PREFIX = "elastic.";
 
-  public static final String HOST = ELASTIC_PREFIX + "host";
+  public static final String HOSTS = ELASTIC_PREFIX + "host";
   public static final String PORT = ELASTIC_PREFIX + "port";
   public static final String CLUSTER = ELASTIC_PREFIX + "cluster";
   public static final String INDEX = ELASTIC_PREFIX + "index";
   public static final String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
   public static final String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
+  public static final String EXPONENTIAL_BACKOFF_MILLIS = ELASTIC_PREFIX + "exponential.backoff.millis";
+  public static final String EXPONENTIAL_BACKOFF_RETRIES = ELASTIC_PREFIX + "exponential.backoff.retries";
+  public static final String BULK_CLOSE_TIMEOUT = ELASTIC_PREFIX + "bulk.close.timeout";
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
index 9367e41..00b96f1 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
@@ -24,68 +24,92 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.nutch.indexer.IndexWriter;
 import org.apache.nutch.indexer.NutchDocument;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.settings.Settings.Builder;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.node.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ * Sends NutchDocuments to a configured Elasticsearch index.
  */
 public class ElasticIndexWriter implements IndexWriter {
   public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
 
+  private static final int DEFAULT_PORT = 9300;
   private static final int DEFAULT_MAX_BULK_DOCS = 250;
   private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
+  private static final int DEFAULT_EXP_BACKOFF_MILLIS = 100;
+  private static final int DEFAULT_EXP_BACKOFF_RETRIES = 10;
+  private static final int DEFAULT_BULK_CLOSE_TIMEOUT = 600;
+  private static final String DEFAULT_INDEX = "nutch";
 
+  private String defaultIndex;
   private Client client;
   private Node node;
-  private String defaultIndex;
+  private BulkProcessor bulkProcessor;
 
-  private Configuration config;
+  private long bulkCloseTimeout;
 
-  private BulkRequestBuilder bulk;
-  private ListenableActionFuture<BulkResponse> execute;
-  private int port = -1;
-  private String host = null;
-  private String clusterName = null;
-  private int maxBulkDocs;
-  private int maxBulkLength;
-  private long indexedDocs = 0;
-  private int bulkDocs = 0;
-  private int bulkLength = 0;
-  private boolean createNewBulk = false;
+  private Configuration config;
 
   @Override
   public void open(JobConf job, String name) throws IOException {
-    clusterName = job.get(ElasticConstants.CLUSTER);
+    bulkCloseTimeout = job.getLong(ElasticConstants.BULK_CLOSE_TIMEOUT,
+        DEFAULT_BULK_CLOSE_TIMEOUT);
+    defaultIndex = job.get(ElasticConstants.INDEX, DEFAULT_INDEX);
+
+    int maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
+        DEFAULT_MAX_BULK_DOCS);
+    int maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
+        DEFAULT_MAX_BULK_LENGTH);
+    int expBackoffMillis = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
+        DEFAULT_EXP_BACKOFF_MILLIS);
+    int expBackoffRetries = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
+        DEFAULT_EXP_BACKOFF_RETRIES);
+
+    client = makeClient(job);
+
+    LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", maxBulkDocs, maxBulkLength);
+    bulkProcessor = BulkProcessor.builder(client, bulkProcessorListener())
+      .setBulkActions(maxBulkDocs)
+      .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
+      .setConcurrentRequests(1)
+      .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
+          TimeValue.timeValueMillis(expBackoffMillis), expBackoffRetries))
+      .build();
+  }
 
-    host = job.get(ElasticConstants.HOST);
-    port = job.getInt(ElasticConstants.PORT, 9300);
+  /** Generates a TransportClient or NodeClient */
+  protected Client makeClient(Configuration conf) throws IOException {
+    String clusterName = conf.get(ElasticConstants.CLUSTER);
+    String[] hosts = conf.getStrings(ElasticConstants.HOSTS);
+    int port = conf.getInt(ElasticConstants.PORT, DEFAULT_PORT);
 
-    Builder settingsBuilder = Settings.builder();
+    Settings.Builder settingsBuilder = Settings.settingsBuilder();
 
     BufferedReader reader = new BufferedReader(
-        job.getConfResourceAsReader("elasticsearch.conf"));
+        conf.getConfResourceAsReader("elasticsearch.conf"));
     String line;
     String[] parts;
-
     while ((line = reader.readLine()) != null) {
       if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
         line = line.trim();
@@ -97,29 +121,46 @@ public class ElasticIndexWriter implements IndexWriter {
       }
     }
 
+    // Set the cluster name and build the settings
     if (StringUtils.isNotBlank(clusterName))
       settingsBuilder.put("cluster.name", clusterName);
 
-    // Set the cluster name and build the settings
     Settings settings = settingsBuilder.build();
 
+    Client client = null;
+
     // Prefer TransportClient
-    if (host != null && port > 1) {
-      TransportClient transportClient = TransportClient.builder()
-          .settings(settings).build()
-          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
+    if (hosts != null && port > 1) {
+      TransportClient transportClient = TransportClient.builder().settings(settings).build();
+      for (String host: hosts)
+        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
       client = transportClient;
     } else if (clusterName != null) {
       node = nodeBuilder().settings(settings).client(true).node();
       client = node.client();
     }
 
-    bulk = client.prepareBulk();
-    defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
-    maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
-        DEFAULT_MAX_BULK_DOCS);
-    maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
-        DEFAULT_MAX_BULK_LENGTH);
+    return client;
+  }
+
+  /** Generates a default BulkProcessor.Listener */
+  protected BulkProcessor.Listener bulkProcessorListener() {
+    return new BulkProcessor.Listener() {
+      @Override
+      public void beforeBulk(long executionId, BulkRequest request) { }
+
+      @Override
+      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+        throw new RuntimeException(failure);
+      }
+
+      @Override
+      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        if (response.hasFailures()) {
+          LOG.warn("Failures occurred during bulk request");
+        }
+      }
+    };
   }
 
   @Override
@@ -128,60 +169,23 @@ public class ElasticIndexWriter implements IndexWriter {
     String type = doc.getDocumentMeta().get("type");
     if (type == null)
       type = "doc";
-    IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
 
+    // Add each field of this doc to the index source
     Map<String, Object> source = new HashMap<String, Object>();
-
-    // Loop through all fields of this doc
     for (String fieldName : doc.getFieldNames()) {
-      if (doc.getField(fieldName).getValues().size() > 1) {
+      if (doc.getFieldValue(fieldName) != null) {
         source.put(fieldName, doc.getFieldValue(fieldName));
-        // Loop through the values to keep track of the size of this
-        // document
-        for (Object value : doc.getField(fieldName).getValues()) {
-          bulkLength += value.toString().length();
-        }
-      } else {
-        if (doc.getFieldValue(fieldName) != null) {
-          source.put(fieldName, doc.getFieldValue(fieldName));
-          bulkLength += doc.getFieldValue(fieldName).toString().length();
-        }
       }
     }
-    request.setSource(source);
-
-    // Add this indexing request to a bulk request
-    bulk.add(request);
-    indexedDocs++;
-    bulkDocs++;
-
-    if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
-      LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
-          + bulkLength + ", total docs = " + indexedDocs
-          + ", last doc in bulk = '" + id + "']");
-      // Flush the bulk of indexing requests
-      createNewBulk = true;
-      commit();
-    }
+
+    IndexRequest request = new IndexRequest(defaultIndex, type, id).source(source);
+    bulkProcessor.add(request);
   }
 
   @Override
   public void delete(String key) throws IOException {
-    try {
-      DeleteRequestBuilder builder = client.prepareDelete();
-      builder.setIndex(defaultIndex);
-      builder.setType("doc");
-      builder.setId(key);
-      builder.execute().actionGet();
-    } catch (ElasticsearchException e) {
-      throw makeIOException(e);
-    }
-  }
-
-  public static IOException makeIOException(ElasticsearchException e) {
-    final IOException ioe = new IOException();
-    ioe.initCause(e);
-    return ioe;
+    DeleteRequest request = new DeleteRequest(defaultIndex, "doc", key);
+    bulkProcessor.add(request);
   }
 
   @Override
@@ -191,51 +195,18 @@ public class ElasticIndexWriter implements IndexWriter {
 
   @Override
   public void commit() throws IOException {
-    if (execute != null) {
-      // wait for previous to finish
-      long beforeWait = System.currentTimeMillis();
-      BulkResponse actionGet = execute.actionGet();
-      if (actionGet.hasFailures()) {
-        for (BulkItemResponse item : actionGet) {
-          if (item.isFailed()) {
-            throw new RuntimeException("First failure in bulk: "
-                + item.getFailureMessage());
-          }
-        }
-      }
-      long msWaited = System.currentTimeMillis() - beforeWait;
-      LOG.info("Previous took in ms " + actionGet.getTookInMillis()
-          + ", including wait " + msWaited);
-      execute = null;
-    }
-    if (bulk != null) {
-      if (bulkDocs > 0) {
-        // start a flush, note that this is an asynchronous call
-        execute = bulk.execute();
-      }
-      bulk = null;
-    }
-    if (createNewBulk) {
-      // Prepare a new bulk request
-      bulk = client.prepareBulk();
-      bulkDocs = 0;
-      bulkLength = 0;
-    }
+    bulkProcessor.flush();
   }
 
   @Override
   public void close() throws IOException {
-    // Flush pending requests
-    LOG.info("Processing remaining requests [docs = " + bulkDocs
-        + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
-    createNewBulk = false;
-    commit();
-    // flush one more time to finalize the last bulk
-    LOG.info("Processing to finalize last execute");
-    createNewBulk = false;
-    commit();
-
-    // Close
+    // Close BulkProcessor (automatically flushes)
+    try {
+      bulkProcessor.awaitClose(bulkCloseTimeout, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("interrupted while waiting for BulkProcessor to complete ({})", e.getMessage());
+    }
+
     client.close();
     if (node != null) {
       node.close();
@@ -247,14 +218,25 @@ public class ElasticIndexWriter implements IndexWriter {
     StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
     sb.append("\t").append(ElasticConstants.CLUSTER)
         .append(" : elastic prefix cluster\n");
-    sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
+    sb.append("\t").append(ElasticConstants.HOSTS).append(" : hostname\n");
     sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
     sb.append("\t").append(ElasticConstants.INDEX)
         .append(" : elastic index command \n");
     sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS)
-        .append(" : elastic bulk index doc counts. (default 250) \n");
+        .append(" : elastic bulk index doc counts. (default ")
+        .append(DEFAULT_MAX_BULK_DOCS).append(")\n");
     sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH)
-        .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
+        .append(" : elastic bulk index length in bytes. (default ")
+        .append(DEFAULT_MAX_BULK_LENGTH).append(")\n");
+    sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS)
+        .append(" : elastic bulk exponential backoff initial delay in milliseconds. (default ")
+        .append(DEFAULT_EXP_BACKOFF_MILLIS).append(")\n");
+    sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES)
+        .append(" : elastic bulk exponential backoff max retries. (default ")
+        .append(DEFAULT_EXP_BACKOFF_RETRIES).append(")\n");
+    sb.append("\t").append(ElasticConstants.BULK_CLOSE_TIMEOUT)
+        .append(" : elastic timeout for the last bulk in seconds. (default ")
+        .append(DEFAULT_BULK_CLOSE_TIMEOUT).append(")\n");
     return sb.toString();
   }
 
@@ -262,9 +244,9 @@ public class ElasticIndexWriter implements IndexWriter {
   public void setConf(Configuration conf) {
     config = conf;
     String cluster = conf.get(ElasticConstants.CLUSTER);
-    String host = conf.get(ElasticConstants.HOST);
+    String hosts = conf.get(ElasticConstants.HOSTS);
 
-    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) {
+    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(hosts)) {
       String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml ";
       message += "\n" + describe();
       LOG.error(message);

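Taken together, the rewrite delegates batching to Elasticsearch's
BulkProcessor: a bulk is flushed once elastic.max.bulk.docs actions or
elastic.max.bulk.size bytes accumulate, rejected bulks are retried with
exponential backoff (initial delay elastic.exponential.backoff.millis, at
most elastic.exponential.backoff.retries retries), and close() waits up to
elastic.bulk.close.timeout seconds for outstanding bulks to drain. Because
the hosts are now read with Configuration.getStrings(), elastic.host accepts
a comma-separated list; a sketch with hypothetical hostnames:

  <property>
    <name>elastic.host</name>
    <value>es-node1,es-node2,es-node3</value>
    <description>Comma-separated list of hostnames to send documents to
    using TransportClient.</description>
  </property>
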
http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml b/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml
new file mode 100644
index 0000000..0a37225
--- /dev/null
+++ b/src/plugin/indexer-elastic/src/test/conf/nutch-site-test.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+
+<property>
+  <name>http.agent.name</name>
+  <value>Nutch-Test</value>
+  <description></description>
+</property>
+
+
+<!-- Elasticsearch properties -->
+
+<property>
+  <name>elastic.host</name>
+  <value>localhost</value>
+  <description>Comma-separated list of hostnames to send documents to using
+  TransportClient. Either host and port, or cluster, must be defined.</description>
+</property>
+
+<property>
+  <name>elastic.port</name>
+  <value>9300</value>
+  <description>The port to connect to using TransportClient.</description>
+</property>
+
+<property>
+  <name>elastic.cluster</name>
+  <value>nutch</value>
+  <description>The cluster name to discover. Either host and port, or
+  cluster, must be defined.</description>
+</property>
+
+<property>
+  <name>elastic.index</name>
+  <value>nutch</value>
+  <description>Default index to send documents to.</description>
+</property>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/nutch/blob/80afa313/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java b/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java
new file mode 100644
index 0000000..bae9737
--- /dev/null
+++ b/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexwriter.elastic;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.util.NutchConfiguration;
+import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestBuilder;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.support.AbstractClient;
+import org.elasticsearch.client.support.Headers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestElasticIndexWriter {
+
+  boolean bulkRequestSuccessful, clusterSaturated;
+  int curNumFailures, maxNumFailures;
+  Configuration conf;
+  Client client;
+  ElasticIndexWriter testIndexWriter;
+
+  @Before
+  public void setup() {
+    conf = NutchConfiguration.create();
+    conf.addResource("nutch-site-test.xml");
+
+    bulkRequestSuccessful = false;
+    clusterSaturated = false;
+    curNumFailures = 0;
+    maxNumFailures = 0;
+
+    Settings settings = Settings.builder().build();
+    ThreadPool threadPool = new ThreadPool(settings);
+    Headers headers = new Headers(settings);
+
+    // customize the ES client to simulate responses from an ES cluster
+    client = new AbstractClient(settings, threadPool, headers) {
+      @Override
+      public void close() { }
+
+      @Override
+      protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
+          Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
+
+        BulkResponse response = null;
+        if (clusterSaturated) {
+          // pretend the cluster is saturated
+          curNumFailures++;
+          if (curNumFailures >= maxNumFailures) {
+            // pretend the cluster is suddenly no longer saturated
+            clusterSaturated = false;
+          }
+
+          // respond with a failure
+          BulkItemResponse failed = new BulkItemResponse(0, "index",
+              new BulkItemResponse.Failure("nutch", "index", "failure0",
+                  new EsRejectedExecutionException("saturated")));
+          response = new BulkResponse(new BulkItemResponse[]{failed}, 0);
+        } else {
+          // respond successfully
+          BulkItemResponse success = new BulkItemResponse(0, "index",
+              new IndexResponse("nutch", "index", "index0", 0, true));
+          response = new BulkResponse(new BulkItemResponse[]{success}, 0);
+        }
+
+        listener.onResponse((Response)response);
+      }
+    };
+
+    // customize the plugin to signal successful bulk operations
+    testIndexWriter = new ElasticIndexWriter() {
+      @Override
+      protected Client makeClient(Configuration conf) {
+        return client;
+      }
+
+      @Override
+      protected BulkProcessor.Listener bulkProcessorListener() {
+        return new BulkProcessor.Listener() {
+
+          @Override
+          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+            if (!response.hasFailures()) {
+              bulkRequestSuccessful = true;
+            }
+          }
+
+          @Override
+          public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
+
+          @Override
+          public void beforeBulk(long executionId, BulkRequest request) { }
+        };
+      }
+    };
+  }
+
+  @Test
+  public void testBulkMaxDocs() throws IOException {
+    int numDocs = 10;
+    conf.setInt(ElasticConstants.MAX_BULK_DOCS, numDocs);
+    JobConf job = new JobConf(conf);
+
+    testIndexWriter.setConf(conf);
+    testIndexWriter.open(job, "name");
+
+    NutchDocument doc = new NutchDocument();
+    doc.add("id", "http://www.example.com");
+
+    Assert.assertFalse(bulkRequestSuccessful);
+
+    for (int i = 0; i < numDocs; i++) {
+      testIndexWriter.write(doc);
+    }
+
+    testIndexWriter.close();
+
+    Assert.assertTrue(bulkRequestSuccessful);
+  }
+
+  @Test
+  public void testBulkMaxLength() throws IOException {
+    String key = "id";
+    String value = "http://www.example.com";
+
+    int defaultMaxBulkLength = conf.getInt(ElasticConstants.MAX_BULK_LENGTH, 2500500);
+
+    // Test that MAX_BULK_LENGTH is respected by lowering it 10x
+    int testMaxBulkLength = defaultMaxBulkLength / 10;
+
+    // This number is somewhat arbitrary, but must be a function of:
+    // - testMaxBulkLength
+    // - approximate size of each doc
+    int numDocs = testMaxBulkLength / (key.length() + value.length());
+
+    conf.setInt(ElasticConstants.MAX_BULK_LENGTH, testMaxBulkLength);
+    JobConf job = new JobConf(conf);
+
+    testIndexWriter.setConf(conf);
+    testIndexWriter.open(job, "name");
+
+    NutchDocument doc = new NutchDocument();
+    doc.add(key, value);
+
+    Assert.assertFalse(bulkRequestSuccessful);
+
+    for (int i = 0; i < numDocs; i++) {
+      testIndexWriter.write(doc);
+    }
+
+    testIndexWriter.close();
+
+    Assert.assertTrue(bulkRequestSuccessful);
+  }
+
+  @Test
+  public void testBackoffPolicy() throws IOException {
+    // set a non-zero "max-retry" value, **implying the cluster is saturated**
+    maxNumFailures = 5;
+    conf.setInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES, maxNumFailures);
+
+    int numDocs = 10;
+    conf.setInt(ElasticConstants.MAX_BULK_DOCS, numDocs);
+
+    JobConf job = new JobConf(conf);
+
+    testIndexWriter.setConf(conf);
+    testIndexWriter.open(job, "name");
+
+    NutchDocument doc = new NutchDocument();
+    doc.add("id", "http://www.example.com");
+
+    // pretend the remote cluster is "saturated"
+    clusterSaturated = true;
+
+    Assert.assertFalse(bulkRequestSuccessful);
+
+    // write enough docs to initiate one bulk request
+    for (int i = 0; i < numDocs; i++) {
+      testIndexWriter.write(doc);
+    }
+
+    testIndexWriter.close();
+
+    // after maxNumFailures rejected attempts, the BulkProcessor's backoff retries should succeed
+    Assert.assertTrue(bulkRequestSuccessful);
+  }
+
+}
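
For reference, the write path exercised by these tests follows the standard
IndexWriter lifecycle. A minimal, self-contained sketch (the host, port,
index and document values are illustrative; "id" is the field the tests use
as the document key):

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.nutch.indexer.NutchDocument;
  import org.apache.nutch.indexwriter.elastic.ElasticIndexWriter;
  import org.apache.nutch.util.NutchConfiguration;

  public class ElasticIndexWriterExample {
    public static void main(String[] args) throws Exception {
      JobConf job = new JobConf(NutchConfiguration.create());
      job.set("elastic.host", "localhost"); // hypothetical single-node cluster
      job.setInt("elastic.port", 9300);
      job.set("elastic.index", "nutch");

      ElasticIndexWriter writer = new ElasticIndexWriter();
      writer.setConf(job); // logs an error if neither elastic.host nor elastic.cluster is set
      writer.open(job, "elastic-example");

      NutchDocument doc = new NutchDocument();
      doc.add("id", "http://www.example.com/");
      writer.write(doc); // queued on the BulkProcessor

      writer.commit(); // flushes the current bulk
      writer.close();  // waits up to elastic.bulk.close.timeout seconds
    }
  }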