Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 21:32:59 UTC

[4/5] nutch git commit: Merge branch 'master' into NUTCH-2293

Merge branch 'master' into NUTCH-2293


Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/2175c767
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/2175c767
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/2175c767

Branch: refs/heads/NUTCH-2292
Commit: 2175c767c64f92b77ebe497750241e4740acea9b
Parents: 0bf453e 80afa31
Author: Thamme Gowda <th...@apache.org>
Authored: Sat Jul 16 14:32:19 2016 -0700
Committer: Thamme Gowda <th...@apache.org>
Committed: Sat Jul 16 14:32:19 2016 -0700

----------------------------------------------------------------------
 build.xml                                       |   1 +
 conf/nutch-default.xml                          |  25 +-
 nutch-plugins/build.xml                         |   1 +
 nutch-plugins/indexer-elastic/build.xml         |  13 +
 nutch-plugins/indexer-elastic/plugin.xml        |   5 +-
 .../indexwriter/elastic/ElasticConstants.java   |   5 +-
 .../indexwriter/elastic/ElasticIndexWriter.java | 236 +++++++++----------
 .../nutch/indexwriter/solr/SolrUtils.java       |   8 +-
 .../src/test/conf/nutch-site-test.xml           |  57 +++++
 .../elastic/TestElasticIndexWriter.java         | 221 +++++++++++++++++
 10 files changed, 436 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/build.xml
----------------------------------------------------------------------
diff --cc nutch-plugins/build.xml
index 75ae2e7,0000000..20ef870
mode 100755,000000..100755
--- a/nutch-plugins/build.xml
+++ b/nutch-plugins/build.xml
@@@ -1,213 -1,0 +1,214 @@@
 +<?xml version="1.0"?>
 +<!--
 + Licensed to the Apache Software Foundation (ASF) under one or more
 + contributor license agreements.  See the NOTICE file distributed with
 + this work for additional information regarding copyright ownership.
 + The ASF licenses this file to You under the Apache License, Version 2.0
 + (the "License"); you may not use this file except in compliance with
 + the License.  You may obtain a copy of the License at
 +
 +     http://www.apache.org/licenses/LICENSE-2.0
 +
 + Unless required by applicable law or agreed to in writing, software
 + distributed under the License is distributed on an "AS IS" BASIS,
 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + See the License for the specific language governing permissions and
 + limitations under the License.
 +-->
 +<project name="Nutch" default="deploy-core" basedir=".">
 +
 +  <target name="deploy-core">
 +    <ant target="compile-core" inheritall="false" dir="../.."/>
 +    <ant target="deploy"/>
 +  </target>
 +
 +  <!-- ====================================================== -->
 +  <!-- Build & deploy all the plugin jars.                    -->
 +  <!-- ====================================================== -->
 +  <target name="deploy">
 +     <ant dir="creativecommons" target="deploy"/>
 +     <ant dir="feed" target="deploy"/>
 +     <ant dir="headings" target="deploy"/>
 +     <ant dir="index-basic" target="deploy"/>
 +     <ant dir="index-anchor" target="deploy"/>
 +     <ant dir="index-geoip" target="deploy"/>
 +     <ant dir="index-more" target="deploy"/>
 +     <ant dir="index-replace" target="deploy"/>
 +     <ant dir="index-static" target="deploy"/>
 +     <ant dir="index-metadata" target="deploy"/>
 +     <ant dir="index-links" target="deploy"/>
 +     <ant dir="mimetype-filter" target="deploy"/>
 +     <ant dir="indexer-cloudsearch" target="deploy"/>
 +     <ant dir="indexer-dummy" target="deploy"/>
 +     <ant dir="indexer-elastic" target="deploy"/>
 +     <ant dir="indexer-solr" target="deploy"/>
 +     <ant dir="language-identifier" target="deploy"/>
 +     <ant dir="lib-http" target="deploy"/>
 +     <ant dir="lib-nekohtml" target="deploy"/>
 +     <ant dir="lib-regex-filter" target="deploy"/>
 +     <ant dir="lib-xml" target="deploy"/>
 +     <ant dir="microformats-reltag" target="deploy"/>
 +     <ant dir="nutch-extensionpoints" target="deploy"/>
 +     <ant dir="protocol-file" target="deploy"/>
 +     <ant dir="protocol-ftp" target="deploy"/>
 +     <ant dir="protocol-http" target="deploy"/>
 +     <ant dir="protocol-httpclient" target="deploy"/>
 +     <ant dir="lib-htmlunit" target="deploy"/>
 +     <ant dir="protocol-htmlunit" target="deploy" />
 +     <ant dir="lib-selenium" target="deploy"/>
 +     <ant dir="protocol-selenium" target="deploy" />
 +     <ant dir="protocol-interactiveselenium" target="deploy" />
 +     <ant dir="parse-ext" target="deploy"/>
 +     <ant dir="parse-js" target="deploy"/>
 +     <ant dir="parse-html" target="deploy"/>
 +     <ant dir="parse-metatags" target="deploy"/>
 +     <ant dir="parse-swf" target="deploy"/>
 +     <ant dir="parse-tika" target="deploy"/>
 +     <ant dir="parse-zip" target="deploy"/>
 +     <ant dir="scoring-depth" target="deploy"/>
 +     <ant dir="scoring-opic" target="deploy"/>
 +     <ant dir="scoring-link" target="deploy"/>
 +     <ant dir="scoring-similarity" target="deploy"/>
 +     <ant dir="subcollection" target="deploy"/>
 +     <ant dir="tld" target="deploy"/>
 +     <ant dir="urlfilter-automaton" target="deploy"/>
 +     <ant dir="urlfilter-domain" target="deploy" />
 +     <ant dir="urlfilter-domainblacklist" target="deploy" />
 +     <ant dir="urlfilter-prefix" target="deploy"/>
 +     <ant dir="urlfilter-regex" target="deploy"/>
 +     <ant dir="urlfilter-suffix" target="deploy"/>
 +     <ant dir="urlfilter-validator" target="deploy"/>
 +     <ant dir="urlfilter-ignoreexempt" target="deploy"/>
 +     <ant dir="parsefilter-naivebayes" target="deploy"/>
 +     <ant dir="parsefilter-regex" target="deploy"/>
 +     <ant dir="urlmeta" target="deploy"/>
 +     <ant dir="urlnormalizer-ajax" target="deploy"/>
 +     <ant dir="urlnormalizer-basic" target="deploy"/>
 +     <ant dir="urlnormalizer-host" target="deploy"/>
 +     <ant dir="urlnormalizer-pass" target="deploy"/>
 +     <ant dir="urlnormalizer-protocol" target="deploy"/>
 +     <ant dir="urlnormalizer-querystring" target="deploy"/>
 +     <ant dir="urlnormalizer-regex" target="deploy"/>
 +     <ant dir="urlnormalizer-slash" target="deploy"/>
 +  </target>
 +
 +  <!-- ====================================================== -->
 +  <!-- Test all of the plugins.                               -->
 +  <!-- ====================================================== -->
 +  <target name="test">
 +    <parallel threadCount="2">
 +     <ant dir="creativecommons" target="test"/>
 +     <ant dir="index-basic" target="test"/>
 +     <ant dir="index-anchor" target="test"/>
 +     <ant dir="index-geoip" target="test"/>
 +     <ant dir="index-more" target="test"/>
 +     <ant dir="index-static" target="test"/>
 +     <ant dir="index-replace" target="test"/>
 +     <ant dir="index-links" target="test"/>
 +     <ant dir="mimetype-filter" target="test"/>
++     <ant dir="indexer-elastic" target="test"/>
 +     <ant dir="language-identifier" target="test"/>
 +     <ant dir="lib-http" target="test"/>
 +     <ant dir="protocol-file" target="test"/>
 +     <ant dir="protocol-http" target="test"/>
 +     <ant dir="protocol-httpclient" target="test"/>
 +     <!--ant dir="parse-ext" target="test"/-->
 +     <ant dir="feed" target="test"/>
 +     <ant dir="parse-html" target="test"/>
 +     <ant dir="parse-metatags" target="test"/>
 +     <ant dir="parse-swf" target="test"/>
 +     <ant dir="parse-tika" target="test"/>
 +     <ant dir="parse-zip" target="test"/>
 +     <ant dir="parsefilter-regex" target="test"/>
 +     <ant dir="subcollection" target="test"/>
 +     <ant dir="urlfilter-automaton" target="test"/>
 +     <ant dir="urlfilter-domain" target="test"/>
 +     <ant dir="urlfilter-domainblacklist" target="test"/>
 +     <ant dir="urlfilter-prefix" target="test"/>
 +     <ant dir="urlfilter-regex" target="test"/>
 +     <ant dir="urlfilter-suffix" target="test"/>
 +     <ant dir="urlfilter-validator" target="test"/>
 +     <ant dir="urlfilter-ignoreexempt" target="test"/>
 +     <ant dir="urlnormalizer-ajax" target="test"/>
 +     <ant dir="urlnormalizer-basic" target="test"/>
 +     <ant dir="urlnormalizer-host" target="test"/>
 +     <ant dir="urlnormalizer-pass" target="test"/>
 +     <ant dir="urlnormalizer-protocol" target="test"/>
 +     <ant dir="urlnormalizer-querystring" target="test"/>
 +     <ant dir="urlnormalizer-regex" target="test"/>
 +     <ant dir="urlnormalizer-slash" target="test"/>
 +    </parallel>
 +  </target>
 +
 +  <!-- ====================================================== -->
 +  <!-- Clean all of the plugins.                              -->
 +  <!-- ====================================================== -->
 +  <target name="clean">
 +    <ant dir="creativecommons" target="clean"/>
 +    <ant dir="feed" target="clean"/>
 +    <ant dir="headings" target="clean"/>
 +    <ant dir="index-basic" target="clean"/>
 +    <ant dir="index-anchor" target="clean"/>
 +    <ant dir="index-geoip" target="clean"/>
 +    <ant dir="index-more" target="clean"/>
 +    <ant dir="index-static" target="clean"/>
 +    <ant dir="index-replace" target="clean"/>
 +    <ant dir="index-metadata" target="clean"/>
 +    <ant dir="index-links" target="clean"/>
 +    <ant dir="mimetype-filter" target="clean"/>
 +    <ant dir="indexer-cloudsearch" target="clean"/>
 +    <ant dir="indexer-dummy" target="clean"/>
 +    <ant dir="indexer-elastic" target="clean"/>
 +    <ant dir="indexer-solr" target="clean"/>
 +    <ant dir="language-identifier" target="clean"/>
 +    <!-- <ant dir="lib-commons-httpclient" target="clean"/> -->
 +    <ant dir="lib-http" target="clean"/>
 +    <!-- <ant dir="lib-lucene-analyzers" target="clean"/>-->
 +    <ant dir="lib-nekohtml" target="clean"/>
 +    <ant dir="lib-regex-filter" target="clean"/>
 +    <ant dir="lib-xml" target="clean"/>
 +    <ant dir="microformats-reltag" target="clean"/>
 +    <ant dir="nutch-extensionpoints" target="clean"/>
 +    <ant dir="protocol-file" target="clean"/>
 +    <ant dir="protocol-ftp" target="clean"/>
 +    <ant dir="protocol-http" target="clean"/>
 +    <ant dir="protocol-httpclient" target="clean"/>
 +    <ant dir="lib-htmlunit" target="clean"/>
 +    <ant dir="protocol-htmlunit" target="clean" />
 +    <ant dir="lib-selenium" target="clean"/>
 +    <ant dir="protocol-selenium" target="clean" />
 +    <ant dir="protocol-interactiveselenium" target="clean" />
 +    <ant dir="parse-ext" target="clean"/>
 +    <ant dir="parse-js" target="clean"/>
 +    <ant dir="parse-html" target="clean"/>
 +    <ant dir="parse-metatags" target="clean"/>
 +    <ant dir="parse-swf" target="clean"/>
 +    <ant dir="parse-tika" target="clean"/>
 +    <ant dir="parse-zip" target="clean"/>
 +    <ant dir="parsefilter-regex" target="clean"/>
 +    <ant dir="scoring-depth" target="clean"/>
 +    <ant dir="scoring-opic" target="clean"/>
 +    <ant dir="scoring-link" target="clean"/>
 +    <ant dir="scoring-similarity" target="clean"/>
 +    <ant dir="subcollection" target="clean"/>
 +    <ant dir="tld" target="clean"/>
 +    <ant dir="urlfilter-automaton" target="clean"/>
 +    <ant dir="urlfilter-domain" target="clean" />
 +    <ant dir="urlfilter-domainblacklist" target="clean" />
 +    <ant dir="urlfilter-prefix" target="clean"/>
 +    <ant dir="urlfilter-regex" target="clean"/>
 +    <ant dir="urlfilter-suffix" target="clean"/>
 +    <ant dir="urlfilter-validator" target="clean"/>
 +    <ant dir="urlfilter-ignoreexempt" target="clean"/>
 +    <ant dir="parsefilter-naivebayes" target="clean" />
 +    <ant dir="urlmeta" target="clean"/>
 +    <ant dir="urlnormalizer-ajax" target="clean"/>
 +    <ant dir="urlnormalizer-basic" target="clean"/>
 +    <ant dir="urlnormalizer-host" target="clean"/>
 +    <ant dir="urlnormalizer-pass" target="clean"/>
 +    <ant dir="urlnormalizer-protocol" target="clean"/>
 +    <ant dir="urlnormalizer-querystring" target="clean"/>
 +    <ant dir="urlnormalizer-regex" target="clean"/>
 +    <ant dir="urlnormalizer-slash" target="clean"/>
 +  </target>
 +</project>

http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/build.xml
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/build.xml
index 38955ff,0000000..6955f61
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/build.xml
+++ b/nutch-plugins/indexer-elastic/build.xml
@@@ -1,22 -1,0 +1,35 @@@
 +<?xml version="1.0"?>
 +<!--
 + Licensed to the Apache Software Foundation (ASF) under one or more
 + contributor license agreements.  See the NOTICE file distributed with
 + this work for additional information regarding copyright ownership.
 + The ASF licenses this file to You under the Apache License, Version 2.0
 + (the "License"); you may not use this file except in compliance with
 + the License.  You may obtain a copy of the License at
 +
 +     http://www.apache.org/licenses/LICENSE-2.0
 +
 + Unless required by applicable law or agreed to in writing, software
 + distributed under the License is distributed on an "AS IS" BASIS,
 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + See the License for the specific language governing permissions and
 + limitations under the License.
 +-->
 +<project name="indexer-elastic" default="jar-core">
 +
 +  <import file="../build-plugin.xml" />
 +
++  <!-- Add compilation dependencies to classpath -->
++  <path id="plugin.deps">
++    <pathelement location="${build.dir}/test/conf"/>
++  </path>
++
++  <!-- Deploy Unit test dependencies -->
++  <target name="deps-test">
++    <copy toDir="${build.test}">
++      <fileset dir="${src.test}" excludes="**/*.java"/>
++    </copy>
++  </target>
++
++
 +</project>

http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/plugin.xml
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/plugin.xml
index d99a665,0000000..401e342
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/plugin.xml
+++ b/nutch-plugins/indexer-elastic/plugin.xml
@@@ -1,71 -1,0 +1,70 @@@
 +<?xml version="1.0" encoding="UTF-8"?>
 +<!--
 +  Licensed to the Apache Software Foundation (ASF) under one or more
 +  contributor license agreements.  See the NOTICE file distributed with
 +  this work for additional information regarding copyright ownership.
 +  The ASF licenses this file to You under the Apache License, Version 2.0
 +  (the "License"); you may not use this file except in compliance with
 +  the License.  You may obtain a copy of the License at
-   
++
 +  http://www.apache.org/licenses/LICENSE-2.0
-   
++
 +  Unless required by applicable law or agreed to in writing, software
 +  distributed under the License is distributed on an "AS IS" BASIS,
 +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +  See the License for the specific language governing permissions and
 +  limitations under the License.
 +-->
 +<plugin id="indexer-elastic" name="ElasticIndexWriter" version="1.0.0"
 +  provider-name="nutch.apache.org">
 +
 +  <runtime>
 +    <library name="indexer-elastic.jar">
 +      <export name="*" />
 +    </library>
 +    <library name="elasticsearch-2.3.3.jar"/>
 +    <library name="commons-cli-1.3.1.jar"/>
 +    <library name="compress-lzf-1.0.2.jar"/>
 +    <library name="guava-18.0.jar"/>
 +    <library name="HdrHistogram-2.1.6.jar"/>
 +    <library name="hppc-0.7.1.jar"/>
-     <library name="indexer-elastic.jar"/>
 +    <library name="jackson-core-2.6.6.jar"/>
 +    <library name="jackson-dataformat-cbor-2.6.6.jar"/>
 +    <library name="jackson-dataformat-smile-2.6.6.jar"/>
 +    <library name="jackson-dataformat-yaml-2.6.6.jar"/>
 +    <library name="joda-convert-1.2.jar"/>
 +    <library name="joda-time-2.8.2.jar"/>
 +    <library name="jsr166e-1.1.0.jar"/>
 +    <library name="lucene-analyzers-common-5.5.0.jar"/>
 +    <library name="lucene-backward-codecs-5.5.0.jar"/>
 +    <library name="lucene-core-5.5.0.jar"/>
 +    <library name="lucene-grouping-5.5.0.jar"/>
 +    <library name="lucene-highlighter-5.5.0.jar"/>
 +    <library name="lucene-join-5.5.0.jar"/>
 +    <library name="lucene-memory-5.5.0.jar"/>
 +    <library name="lucene-misc-5.5.0.jar"/>
 +    <library name="lucene-queries-5.5.0.jar"/>
 +    <library name="lucene-queryparser-5.5.0.jar"/>
 +    <library name="lucene-sandbox-5.5.0.jar"/>
 +    <library name="lucene-spatial-5.5.0.jar"/>
 +    <library name="lucene-spatial3d-5.5.0.jar"/>
 +    <library name="lucene-suggest-5.5.0.jar"/>
 +    <library name="netty-3.10.5.Final.jar"/>
 +    <library name="securesm-1.0.jar"/>
 +    <library name="snakeyaml-1.15.jar"/>
 +    <library name="spatial4j-0.5.jar"/>
 +    <library name="t-digest-3.0.jar"/>
 +  </runtime>
 +
 +  <requires>
 +    <import plugin="nutch-extensionpoints" />
 +  </requires>
 +
 +  <extension id="org.apache.nutch.indexer.elastic"
 +    name="Elasticsearch Index Writer"
 +    point="org.apache.nutch.indexer.IndexWriter">
 +    <implementation id="ElasticIndexWriter"
 +      class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter" />
 +  </extension>
 +
 +</plugin>

http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
index b0e70c8,0000000..29f36c7
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
+++ b/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
@@@ -1,28 -1,0 +1,31 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nutch.indexwriter.elastic;
 +
 +public interface ElasticConstants {
 +  public static final String ELASTIC_PREFIX = "elastic.";
 +
-   public static final String HOST = ELASTIC_PREFIX + "host";
++  public static final String HOSTS = ELASTIC_PREFIX + "host";
 +  public static final String PORT = ELASTIC_PREFIX + "port";
 +  public static final String CLUSTER = ELASTIC_PREFIX + "cluster";
 +  public static final String INDEX = ELASTIC_PREFIX + "index";
 +  public static final String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
 +  public static final String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
++  public static final String EXPONENTIAL_BACKOFF_MILLIS = ELASTIC_PREFIX + "exponential.backoff.millis";
++  public static final String EXPONENTIAL_BACKOFF_RETRIES = ELASTIC_PREFIX + "exponential.backoff.retries";
++  public static final String BULK_CLOSE_TIMEOUT = ELASTIC_PREFIX + "bulk.close.timeout";
 +}
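
The keys above are read from the Hadoop/Nutch configuration (nutch-site.xml or the JobConf passed to the writer). A minimal sketch, for illustration only, of setting them programmatically with the defaults used by ElasticIndexWriter; the cluster and host values here are placeholders, not part of the commit:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.nutch.indexwriter.elastic.ElasticConstants;
  import org.apache.nutch.util.NutchConfiguration;

  public class ElasticConfSketch {
    static Configuration build() {
      Configuration conf = NutchConfiguration.create();
      conf.set(ElasticConstants.CLUSTER, "my-cluster");               // elastic.cluster (placeholder)
      conf.set(ElasticConstants.HOSTS, "localhost");                  // elastic.host (placeholder, comma-separated)
      conf.setInt(ElasticConstants.PORT, 9300);                       // elastic.port
      conf.set(ElasticConstants.INDEX, "nutch");                      // elastic.index
      conf.setInt(ElasticConstants.MAX_BULK_DOCS, 250);               // elastic.max.bulk.docs
      conf.setInt(ElasticConstants.MAX_BULK_LENGTH, 2500500);         // elastic.max.bulk.size (bytes)
      conf.setInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS, 100);  // initial backoff delay
      conf.setInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES, 10);  // max backoff retries
      conf.setLong(ElasticConstants.BULK_CLOSE_TIMEOUT, 600);         // seconds to wait on close
      return conf;
    }
  }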

http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
index 9367e41,0000000..00b96f1
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
+++ b/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
@@@ -1,279 -1,0 +1,261 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.nutch.indexwriter.elastic;
 +
 +import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
 +
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.util.HashMap;
 +import java.util.Map;
++import java.util.concurrent.TimeUnit;
 +
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.mapred.JobConf;
 +import org.apache.nutch.indexer.IndexWriter;
 +import org.apache.nutch.indexer.NutchDocument;
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.ListenableActionFuture;
- import org.elasticsearch.action.bulk.BulkItemResponse;
- import org.elasticsearch.action.bulk.BulkRequestBuilder;
 +import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.delete.DeleteRequestBuilder;
- import org.elasticsearch.action.index.IndexRequestBuilder;
++import org.elasticsearch.action.bulk.BulkRequest;
++import org.elasticsearch.action.bulk.BackoffPolicy;
++import org.elasticsearch.action.bulk.BulkProcessor;
++import org.elasticsearch.action.delete.DeleteRequest;
++import org.elasticsearch.action.index.IndexRequest;
 +import org.elasticsearch.client.Client;
 +import org.elasticsearch.client.transport.TransportClient;
++import org.elasticsearch.common.unit.ByteSizeUnit;
++import org.elasticsearch.common.unit.ByteSizeValue;
++import org.elasticsearch.common.unit.TimeValue;
 +import org.elasticsearch.common.settings.Settings;
- import org.elasticsearch.common.settings.Settings.Builder;
 +import org.elasticsearch.common.transport.InetSocketTransportAddress;
 +import org.elasticsearch.node.Node;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
++ * Sends NutchDocuments to a configured Elasticsearch index.
 + */
 +public class ElasticIndexWriter implements IndexWriter {
 +  public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
 +
++  private static final int DEFAULT_PORT = 9300;
 +  private static final int DEFAULT_MAX_BULK_DOCS = 250;
 +  private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
++  private static final int DEFAULT_EXP_BACKOFF_MILLIS = 100;
++  private static final int DEFAULT_EXP_BACKOFF_RETRIES = 10;
++  private static final int DEFAULT_BULK_CLOSE_TIMEOUT = 600;
++  private static final String DEFAULT_INDEX = "nutch";
 +
++  private String defaultIndex;
 +  private Client client;
 +  private Node node;
-   private String defaultIndex;
++  private BulkProcessor bulkProcessor;
 +
-   private Configuration config;
++  private long bulkCloseTimeout;
 +
-   private BulkRequestBuilder bulk;
-   private ListenableActionFuture<BulkResponse> execute;
-   private int port = -1;
-   private String host = null;
-   private String clusterName = null;
-   private int maxBulkDocs;
-   private int maxBulkLength;
-   private long indexedDocs = 0;
-   private int bulkDocs = 0;
-   private int bulkLength = 0;
-   private boolean createNewBulk = false;
++  private Configuration config;
 +
 +  @Override
 +  public void open(JobConf job, String name) throws IOException {
-     clusterName = job.get(ElasticConstants.CLUSTER);
++    bulkCloseTimeout = job.getLong(ElasticConstants.BULK_CLOSE_TIMEOUT,
++        DEFAULT_BULK_CLOSE_TIMEOUT);
++    defaultIndex = job.get(ElasticConstants.INDEX, DEFAULT_INDEX);
++
++    int maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
++        DEFAULT_MAX_BULK_DOCS);
++    int maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
++        DEFAULT_MAX_BULK_LENGTH);
++    int expBackoffMillis = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
++        DEFAULT_EXP_BACKOFF_MILLIS);
++    int expBackoffRetries = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
++        DEFAULT_EXP_BACKOFF_RETRIES);
++
++    client = makeClient(job);
++
++    LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", maxBulkDocs, maxBulkLength);
++    bulkProcessor = BulkProcessor.builder(client, bulkProcessorListener())
++      .setBulkActions(maxBulkDocs)
++      .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
++      .setConcurrentRequests(1)
++      .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
++          TimeValue.timeValueMillis(expBackoffMillis), expBackoffRetries))
++      .build();
++  }
 +
-     host = job.get(ElasticConstants.HOST);
-     port = job.getInt(ElasticConstants.PORT, 9300);
++  /** Generates a TransportClient or NodeClient */
++  protected Client makeClient(Configuration conf) throws IOException {
++    String clusterName = conf.get(ElasticConstants.CLUSTER);
++    String[] hosts = conf.getStrings(ElasticConstants.HOSTS);
++    int port = conf.getInt(ElasticConstants.PORT, DEFAULT_PORT);
 +
-     Builder settingsBuilder = Settings.builder();
++    Settings.Builder settingsBuilder = Settings.settingsBuilder();
 +
 +    BufferedReader reader = new BufferedReader(
-         job.getConfResourceAsReader("elasticsearch.conf"));
++        conf.getConfResourceAsReader("elasticsearch.conf"));
 +    String line;
 +    String parts[];
- 
 +    while ((line = reader.readLine()) != null) {
 +      if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
 +        line.trim();
 +        parts = line.split("=");
 +
 +        if (parts.length == 2) {
 +          settingsBuilder.put(parts[0].trim(), parts[1].trim());
 +        }
 +      }
 +    }
 +
++    // Set the cluster name and build the settings
 +    if (StringUtils.isNotBlank(clusterName))
 +      settingsBuilder.put("cluster.name", clusterName);
 +
-     // Set the cluster name and build the settings
 +    Settings settings = settingsBuilder.build();
 +
++    Client client = null;
++
 +    // Prefer TransportClient
-     if (host != null && port > 1) {
-       TransportClient transportClient = TransportClient.builder()
-           .settings(settings).build()
-           .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
++    if (hosts != null && port > 1) {
++      TransportClient transportClient = TransportClient.builder().settings(settings).build();
++      for (String host: hosts)
++        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
 +      client = transportClient;
 +    } else if (clusterName != null) {
 +      node = nodeBuilder().settings(settings).client(true).node();
 +      client = node.client();
 +    }
 +
-     bulk = client.prepareBulk();
-     defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
-     maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
-         DEFAULT_MAX_BULK_DOCS);
-     maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
-         DEFAULT_MAX_BULK_LENGTH);
++    return client;
++  }
++
++  /** Generates a default BulkProcessor.Listener */
++  protected BulkProcessor.Listener bulkProcessorListener() {
++    return new BulkProcessor.Listener() {
++      @Override
++      public void beforeBulk(long executionId, BulkRequest request) { }
++
++      @Override
++      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
++        throw new RuntimeException(failure);
++      }
++
++      @Override
++      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
++        if (response.hasFailures()) {
++          LOG.warn("Failures occurred during bulk request");
++        }
++      }
++    };
 +  }
 +
 +  @Override
 +  public void write(NutchDocument doc) throws IOException {
 +    String id = (String) doc.getFieldValue("id");
 +    String type = doc.getDocumentMeta().get("type");
 +    if (type == null)
 +      type = "doc";
-     IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
 +
++    // Add each field of this doc to the index source
 +    Map<String, Object> source = new HashMap<String, Object>();
- 
-     // Loop through all fields of this doc
 +    for (String fieldName : doc.getFieldNames()) {
-       if (doc.getField(fieldName).getValues().size() > 1) {
++      if (doc.getFieldValue(fieldName) != null) {
 +        source.put(fieldName, doc.getFieldValue(fieldName));
-         // Loop through the values to keep track of the size of this
-         // document
-         for (Object value : doc.getField(fieldName).getValues()) {
-           bulkLength += value.toString().length();
-         }
-       } else {
-         if (doc.getFieldValue(fieldName) != null) {
-           source.put(fieldName, doc.getFieldValue(fieldName));
-           bulkLength += doc.getFieldValue(fieldName).toString().length();
-         }
 +      }
 +    }
-     request.setSource(source);
- 
-     // Add this indexing request to a bulk request
-     bulk.add(request);
-     indexedDocs++;
-     bulkDocs++;
- 
-     if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
-       LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
-           + bulkLength + ", total docs = " + indexedDocs
-           + ", last doc in bulk = '" + id + "']");
-       // Flush the bulk of indexing requests
-       createNewBulk = true;
-       commit();
-     }
++
++    IndexRequest request = new IndexRequest(defaultIndex, type, id).source(source);
++    bulkProcessor.add(request);
 +  }
 +
 +  @Override
 +  public void delete(String key) throws IOException {
-     try {
-       DeleteRequestBuilder builder = client.prepareDelete();
-       builder.setIndex(defaultIndex);
-       builder.setType("doc");
-       builder.setId(key);
-       builder.execute().actionGet();
-     } catch (ElasticsearchException e) {
-       throw makeIOException(e);
-     }
-   }
- 
-   public static IOException makeIOException(ElasticsearchException e) {
-     final IOException ioe = new IOException();
-     ioe.initCause(e);
-     return ioe;
++    DeleteRequest request = new DeleteRequest(defaultIndex, "doc", key);
++    bulkProcessor.add(request);
 +  }
 +
 +  @Override
 +  public void update(NutchDocument doc) throws IOException {
 +    write(doc);
 +  }
 +
 +  @Override
 +  public void commit() throws IOException {
-     if (execute != null) {
-       // wait for previous to finish
-       long beforeWait = System.currentTimeMillis();
-       BulkResponse actionGet = execute.actionGet();
-       if (actionGet.hasFailures()) {
-         for (BulkItemResponse item : actionGet) {
-           if (item.isFailed()) {
-             throw new RuntimeException("First failure in bulk: "
-                 + item.getFailureMessage());
-           }
-         }
-       }
-       long msWaited = System.currentTimeMillis() - beforeWait;
-       LOG.info("Previous took in ms " + actionGet.getTookInMillis()
-           + ", including wait " + msWaited);
-       execute = null;
-     }
-     if (bulk != null) {
-       if (bulkDocs > 0) {
-         // start a flush, note that this is an asynchronous call
-         execute = bulk.execute();
-       }
-       bulk = null;
-     }
-     if (createNewBulk) {
-       // Prepare a new bulk request
-       bulk = client.prepareBulk();
-       bulkDocs = 0;
-       bulkLength = 0;
-     }
++    bulkProcessor.flush();
 +  }
 +
 +  @Override
 +  public void close() throws IOException {
-     // Flush pending requests
-     LOG.info("Processing remaining requests [docs = " + bulkDocs
-         + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
-     createNewBulk = false;
-     commit();
-     // flush one more time to finalize the last bulk
-     LOG.info("Processing to finalize last execute");
-     createNewBulk = false;
-     commit();
- 
-     // Close
++    // Close BulkProcessor (automatically flushes)
++    try {
++      bulkProcessor.awaitClose(bulkCloseTimeout, TimeUnit.SECONDS);
++    } catch (InterruptedException e) {
++      LOG.warn("interrupted while waiting for BulkProcessor to complete ({})", e.getMessage());
++    }
++
 +    client.close();
 +    if (node != null) {
 +      node.close();
 +    }
 +  }
 +
 +  @Override
 +  public String describe() {
 +    StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
 +    sb.append("\t").append(ElasticConstants.CLUSTER)
 +        .append(" : elastic prefix cluster\n");
-     sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
++    sb.append("\t").append(ElasticConstants.HOSTS).append(" : hostname\n");
 +    sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
 +    sb.append("\t").append(ElasticConstants.INDEX)
 +        .append(" : elastic index command \n");
 +    sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS)
-         .append(" : elastic bulk index doc counts. (default 250) \n");
++        .append(" : elastic bulk index doc counts. (default ")
++        .append(DEFAULT_MAX_BULK_DOCS).append(")\n");
 +    sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH)
-         .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
++        .append(" : elastic bulk index length in bytes. (default ")
++        .append(DEFAULT_MAX_BULK_LENGTH).append(")\n");
++    sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS)
++        .append(" : elastic bulk exponential backoff initial delay in milliseconds. (default ")
++        .append(DEFAULT_EXP_BACKOFF_MILLIS).append(")\n");
++    sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES)
++        .append(" : elastic bulk exponential backoff max retries. (default ")
++        .append(DEFAULT_EXP_BACKOFF_RETRIES).append(")\n");
++    sb.append("\t").append(ElasticConstants.BULK_CLOSE_TIMEOUT)
++        .append(" : elastic timeout for the last bulk in seconds. (default ")
++        .append(DEFAULT_BULK_CLOSE_TIMEOUT).append(")\n");
 +    return sb.toString();
 +  }
 +
 +  @Override
 +  public void setConf(Configuration conf) {
 +    config = conf;
 +    String cluster = conf.get(ElasticConstants.CLUSTER);
-     String host = conf.get(ElasticConstants.HOST);
++    String hosts = conf.get(ElasticConstants.HOSTS);
 +
-     if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) {
++    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(hosts)) {
 +      String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml ";
 +      message += "\n" + describe();
 +      LOG.error(message);
 +      throw new RuntimeException(message);
 +    }
 +  }
 +
 +  @Override
 +  public Configuration getConf() {
 +    return config;
 +  }
 +}
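
Condensed, the change above replaces the hand-rolled bulk bookkeeping (bulkDocs/bulkLength counters and manual flushes) with an Elasticsearch BulkProcessor. A minimal sketch of that wiring, for illustration, using the default limits; the index/type/id values and the source map are placeholders:

  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.TimeUnit;
  import org.elasticsearch.action.bulk.BackoffPolicy;
  import org.elasticsearch.action.bulk.BulkProcessor;
  import org.elasticsearch.action.delete.DeleteRequest;
  import org.elasticsearch.action.index.IndexRequest;
  import org.elasticsearch.client.Client;
  import org.elasticsearch.common.unit.ByteSizeUnit;
  import org.elasticsearch.common.unit.ByteSizeValue;
  import org.elasticsearch.common.unit.TimeValue;

  public class BulkProcessorSketch {
    static void run(Client client, BulkProcessor.Listener listener) throws InterruptedException {
      BulkProcessor bulk = BulkProcessor.builder(client, listener)
          .setBulkActions(250)                                          // elastic.max.bulk.docs
          .setBulkSize(new ByteSizeValue(2500500, ByteSizeUnit.BYTES))  // elastic.max.bulk.size
          .setConcurrentRequests(1)
          .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
              TimeValue.timeValueMillis(100), 10))                      // backoff delay / retries
          .build();

      Map<String, Object> source = new HashMap<String, Object>();
      source.put("id", "some-id");                                      // placeholder document field

      bulk.add(new IndexRequest("nutch", "doc", "some-id").source(source)); // write()
      bulk.add(new DeleteRequest("nutch", "doc", "some-id"));               // delete()
      bulk.flush();                                                         // commit()
      bulk.awaitClose(600, TimeUnit.SECONDS);                               // close(), elastic.bulk.close.timeout
    }
  }

With this arrangement, size- and count-based flushing plus exponential-backoff retries are handled by the BulkProcessor itself, so commit() reduces to flush() and close() to awaitClose().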

http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
index eec0080,0000000..d70bc62
mode 100644,000000..100644
--- a/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
+++ b/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
@@@ -1,97 -1,0 +1,99 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nutch.indexwriter.solr;
 +
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.apache.hadoop.mapred.JobConf;
++import org.apache.http.impl.client.SystemDefaultHttpClient;
 +import org.apache.solr.client.solrj.SolrClient;
 +import org.apache.solr.client.solrj.impl.HttpSolrClient;
 +import org.apache.solr.client.solrj.impl.CloudSolrClient;
 +
 +import java.net.MalformedURLException;
 +
 +public class SolrUtils {
 +
 +  public static Logger LOG = LoggerFactory.getLogger(SolrUtils.class);
++  private static HttpClient HTTP_CLIENT = new SystemDefaultHttpClient();
 +
 +  /**
 +   *
 +   *
 +   * @param JobConf
 +   * @return SolrClient
 +   */
 +  public static ArrayList<SolrClient> getSolrClients(JobConf job) throws MalformedURLException {
 +    String[] urls = job.getStrings(SolrConstants.SERVER_URL);
 +    String[] zkHostString = job.getStrings(SolrConstants.ZOOKEEPER_HOSTS);
 +    ArrayList<SolrClient> solrClients = new ArrayList<SolrClient>();
 +    
 +    if (zkHostString != null && zkHostString.length > 0) {
 +      for (int i = 0; i < zkHostString.length; i++) {
 +        CloudSolrClient sc = getCloudSolrClient(zkHostString[i]);
 +        sc.setDefaultCollection(job.get(SolrConstants.COLLECTION));
 +        solrClients.add(sc);
 +      }
 +    } else {
 +      for (int i = 0; i < urls.length; i++) {
-         SolrClient sc = new HttpSolrClient(urls[i]);
++        SolrClient sc = new HttpSolrClient(urls[i], HTTP_CLIENT);
 +        solrClients.add(sc);
 +      }
 +    }
 +
 +    return solrClients;
 +  }
 +
 +  public static CloudSolrClient getCloudSolrClient(String url) throws MalformedURLException {
-     CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','));
++    CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','), HTTP_CLIENT);
 +    sc.setParallelUpdates(true);
 +    sc.connect();
 +    return sc;
 +  }
 +
 +  public static SolrClient getHttpSolrClient(String url) throws MalformedURLException {
-     SolrClient sc =new HttpSolrClient(url);
++    SolrClient sc =new HttpSolrClient(url, HTTP_CLIENT);
 +    return sc;
 +  }
 +  
 +  public static String stripNonCharCodepoints(String input) {
 +    StringBuilder retval = new StringBuilder();
 +    char ch;
 +
 +    for (int i = 0; i < input.length(); i++) {
 +      ch = input.charAt(i);
 +
 +      // Strip all non-characters
 +      // http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]
 +      // and non-printable control characters except tabulator, new line and
 +      // carriage return
 +      if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step 0x10000
 +          ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range
 +          (ch <= 0xfdd0 || ch >= 0xfdef) && // 0xfdd0 - 0xfdef
 +          (ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) {
 +
 +        retval.append(ch);
 +      }
 +    }
 +
 +    return retval.toString();
 +  }
 +
 +}
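
For illustration, a hypothetical caller of the refactored helper (the Solr URL is a placeholder); every client returned now shares the single SystemDefaultHttpClient instance, whether the job names plain Solr URLs or ZooKeeper hosts for SolrCloud:

  import java.net.MalformedURLException;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.nutch.indexwriter.solr.SolrConstants;
  import org.apache.nutch.indexwriter.solr.SolrUtils;
  import org.apache.nutch.util.NutchConfiguration;
  import org.apache.solr.client.solrj.SolrClient;

  public class SolrClientsSketch {
    static void run() throws MalformedURLException {
      JobConf job = new JobConf(NutchConfiguration.create());
      job.set(SolrConstants.SERVER_URL, "http://localhost:8983/solr/nutch"); // placeholder URL
      for (SolrClient client : SolrUtils.getSolrClients(job)) {
        // use the client for indexing; all instances share the same HTTP client
      }
    }
  }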