You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 21:32:59 UTC
[4/5] nutch git commit: Merge branch 'master' into NUTCH-2293
Merge branch 'master' into NUTCH-2293
Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/2175c767
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/2175c767
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/2175c767
Branch: refs/heads/NUTCH-2292
Commit: 2175c767c64f92b77ebe497750241e4740acea9b
Parents: 0bf453e 80afa31
Author: Thamme Gowda <th...@apache.org>
Authored: Sat Jul 16 14:32:19 2016 -0700
Committer: Thamme Gowda <th...@apache.org>
Committed: Sat Jul 16 14:32:19 2016 -0700
----------------------------------------------------------------------
build.xml | 1 +
conf/nutch-default.xml | 25 +-
nutch-plugins/build.xml | 1 +
nutch-plugins/indexer-elastic/build.xml | 13 +
nutch-plugins/indexer-elastic/plugin.xml | 5 +-
.../indexwriter/elastic/ElasticConstants.java | 5 +-
.../indexwriter/elastic/ElasticIndexWriter.java | 236 +++++++++----------
.../nutch/indexwriter/solr/SolrUtils.java | 8 +-
.../src/test/conf/nutch-site-test.xml | 57 +++++
.../elastic/TestElasticIndexWriter.java | 221 +++++++++++++++++
10 files changed, 436 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/build.xml
----------------------------------------------------------------------
diff --cc nutch-plugins/build.xml
index 75ae2e7,0000000..20ef870
mode 100755,000000..100755
--- a/nutch-plugins/build.xml
+++ b/nutch-plugins/build.xml
@@@ -1,213 -1,0 +1,214 @@@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="Nutch" default="deploy-core" basedir=".">
+
+ <target name="deploy-core">
+ <ant target="compile-core" inheritall="false" dir="../.."/>
+ <ant target="deploy"/>
+ </target>
+
+ <!-- ====================================================== -->
+ <!-- Build & deploy all the plugin jars. -->
+ <!-- ====================================================== -->
+ <target name="deploy">
+ <ant dir="creativecommons" target="deploy"/>
+ <ant dir="feed" target="deploy"/>
+ <ant dir="headings" target="deploy"/>
+ <ant dir="index-basic" target="deploy"/>
+ <ant dir="index-anchor" target="deploy"/>
+ <ant dir="index-geoip" target="deploy"/>
+ <ant dir="index-more" target="deploy"/>
+ <ant dir="index-replace" target="deploy"/>
+ <ant dir="index-static" target="deploy"/>
+ <ant dir="index-metadata" target="deploy"/>
+ <ant dir="index-links" target="deploy"/>
+ <ant dir="mimetype-filter" target="deploy"/>
+ <ant dir="indexer-cloudsearch" target="deploy"/>
+ <ant dir="indexer-dummy" target="deploy"/>
+ <ant dir="indexer-elastic" target="deploy"/>
+ <ant dir="indexer-solr" target="deploy"/>
+ <ant dir="language-identifier" target="deploy"/>
+ <ant dir="lib-http" target="deploy"/>
+ <ant dir="lib-nekohtml" target="deploy"/>
+ <ant dir="lib-regex-filter" target="deploy"/>
+ <ant dir="lib-xml" target="deploy"/>
+ <ant dir="microformats-reltag" target="deploy"/>
+ <ant dir="nutch-extensionpoints" target="deploy"/>
+ <ant dir="protocol-file" target="deploy"/>
+ <ant dir="protocol-ftp" target="deploy"/>
+ <ant dir="protocol-http" target="deploy"/>
+ <ant dir="protocol-httpclient" target="deploy"/>
+ <ant dir="lib-htmlunit" target="deploy"/>
+ <ant dir="protocol-htmlunit" target="deploy" />
+ <ant dir="lib-selenium" target="deploy"/>
+ <ant dir="protocol-selenium" target="deploy" />
+ <ant dir="protocol-interactiveselenium" target="deploy" />
+ <ant dir="parse-ext" target="deploy"/>
+ <ant dir="parse-js" target="deploy"/>
+ <ant dir="parse-html" target="deploy"/>
+ <ant dir="parse-metatags" target="deploy"/>
+ <ant dir="parse-swf" target="deploy"/>
+ <ant dir="parse-tika" target="deploy"/>
+ <ant dir="parse-zip" target="deploy"/>
+ <ant dir="scoring-depth" target="deploy"/>
+ <ant dir="scoring-opic" target="deploy"/>
+ <ant dir="scoring-link" target="deploy"/>
+ <ant dir="scoring-similarity" target="deploy"/>
+ <ant dir="subcollection" target="deploy"/>
+ <ant dir="tld" target="deploy"/>
+ <ant dir="urlfilter-automaton" target="deploy"/>
+ <ant dir="urlfilter-domain" target="deploy" />
+ <ant dir="urlfilter-domainblacklist" target="deploy" />
+ <ant dir="urlfilter-prefix" target="deploy"/>
+ <ant dir="urlfilter-regex" target="deploy"/>
+ <ant dir="urlfilter-suffix" target="deploy"/>
+ <ant dir="urlfilter-validator" target="deploy"/>
+ <ant dir="urlfilter-ignoreexempt" target="deploy"/>
+ <ant dir="parsefilter-naivebayes" target="deploy"/>
+ <ant dir="parsefilter-regex" target="deploy"/>
+ <ant dir="urlmeta" target="deploy"/>
+ <ant dir="urlnormalizer-ajax" target="deploy"/>
+ <ant dir="urlnormalizer-basic" target="deploy"/>
+ <ant dir="urlnormalizer-host" target="deploy"/>
+ <ant dir="urlnormalizer-pass" target="deploy"/>
+ <ant dir="urlnormalizer-protocol" target="deploy"/>
+ <ant dir="urlnormalizer-querystring" target="deploy"/>
+ <ant dir="urlnormalizer-regex" target="deploy"/>
+ <ant dir="urlnormalizer-slash" target="deploy"/>
+ </target>
+
+ <!-- ====================================================== -->
+ <!-- Test all of the plugins. -->
+ <!-- ====================================================== -->
+ <target name="test">
+ <parallel threadCount="2">
+ <ant dir="creativecommons" target="test"/>
+ <ant dir="index-basic" target="test"/>
+ <ant dir="index-anchor" target="test"/>
+ <ant dir="index-geoip" target="test"/>
+ <ant dir="index-more" target="test"/>
+ <ant dir="index-static" target="test"/>
+ <ant dir="index-replace" target="test"/>
+ <ant dir="index-links" target="test"/>
+ <ant dir="mimetype-filter" target="test"/>
++ <ant dir="indexer-elastic" target="test"/>
+ <ant dir="language-identifier" target="test"/>
+ <ant dir="lib-http" target="test"/>
+ <ant dir="protocol-file" target="test"/>
+ <ant dir="protocol-http" target="test"/>
+ <ant dir="protocol-httpclient" target="test"/>
+ <!--ant dir="parse-ext" target="test"/-->
+ <ant dir="feed" target="test"/>
+ <ant dir="parse-html" target="test"/>
+ <ant dir="parse-metatags" target="test"/>
+ <ant dir="parse-swf" target="test"/>
+ <ant dir="parse-tika" target="test"/>
+ <ant dir="parse-zip" target="test"/>
+ <ant dir="parsefilter-regex" target="test"/>
+ <ant dir="subcollection" target="test"/>
+ <ant dir="urlfilter-automaton" target="test"/>
+ <ant dir="urlfilter-domain" target="test"/>
+ <ant dir="urlfilter-domainblacklist" target="test"/>
+ <ant dir="urlfilter-prefix" target="test"/>
+ <ant dir="urlfilter-regex" target="test"/>
+ <ant dir="urlfilter-suffix" target="test"/>
+ <ant dir="urlfilter-validator" target="test"/>
+ <ant dir="urlfilter-ignoreexempt" target="test"/>
+ <ant dir="urlnormalizer-ajax" target="test"/>
+ <ant dir="urlnormalizer-basic" target="test"/>
+ <ant dir="urlnormalizer-host" target="test"/>
+ <ant dir="urlnormalizer-pass" target="test"/>
+ <ant dir="urlnormalizer-protocol" target="test"/>
+ <ant dir="urlnormalizer-querystring" target="test"/>
+ <ant dir="urlnormalizer-regex" target="test"/>
+ <ant dir="urlnormalizer-slash" target="test"/>
+ </parallel>
+ </target>
+
+ <!-- ====================================================== -->
+ <!-- Clean all of the plugins. -->
+ <!-- ====================================================== -->
+ <target name="clean">
+ <ant dir="creativecommons" target="clean"/>
+ <ant dir="feed" target="clean"/>
+ <ant dir="headings" target="clean"/>
+ <ant dir="index-basic" target="clean"/>
+ <ant dir="index-anchor" target="clean"/>
+ <ant dir="index-geoip" target="clean"/>
+ <ant dir="index-more" target="clean"/>
+ <ant dir="index-static" target="clean"/>
+ <ant dir="index-replace" target="clean"/>
+ <ant dir="index-metadata" target="clean"/>
+ <ant dir="index-links" target="clean"/>
+ <ant dir="mimetype-filter" target="clean"/>
+ <ant dir="indexer-cloudsearch" target="clean"/>
+ <ant dir="indexer-dummy" target="clean"/>
+ <ant dir="indexer-elastic" target="clean"/>
+ <ant dir="indexer-solr" target="clean"/>
+ <ant dir="language-identifier" target="clean"/>
+ <!-- <ant dir="lib-commons-httpclient" target="clean"/> -->
+ <ant dir="lib-http" target="clean"/>
+ <!-- <ant dir="lib-lucene-analyzers" target="clean"/>-->
+ <ant dir="lib-nekohtml" target="clean"/>
+ <ant dir="lib-regex-filter" target="clean"/>
+ <ant dir="lib-xml" target="clean"/>
+ <ant dir="microformats-reltag" target="clean"/>
+ <ant dir="nutch-extensionpoints" target="clean"/>
+ <ant dir="protocol-file" target="clean"/>
+ <ant dir="protocol-ftp" target="clean"/>
+ <ant dir="protocol-http" target="clean"/>
+ <ant dir="protocol-httpclient" target="clean"/>
+ <ant dir="lib-htmlunit" target="clean"/>
+ <ant dir="protocol-htmlunit" target="clean" />
+ <ant dir="lib-selenium" target="clean"/>
+ <ant dir="protocol-selenium" target="clean" />
+ <ant dir="protocol-interactiveselenium" target="clean" />
+ <ant dir="parse-ext" target="clean"/>
+ <ant dir="parse-js" target="clean"/>
+ <ant dir="parse-html" target="clean"/>
+ <ant dir="parse-metatags" target="clean"/>
+ <ant dir="parse-swf" target="clean"/>
+ <ant dir="parse-tika" target="clean"/>
+ <ant dir="parse-zip" target="clean"/>
+ <ant dir="parsefilter-regex" target="clean"/>
+ <ant dir="scoring-depth" target="clean"/>
+ <ant dir="scoring-opic" target="clean"/>
+ <ant dir="scoring-link" target="clean"/>
+ <ant dir="scoring-similarity" target="clean"/>
+ <ant dir="subcollection" target="clean"/>
+ <ant dir="tld" target="clean"/>
+ <ant dir="urlfilter-automaton" target="clean"/>
+ <ant dir="urlfilter-domain" target="clean" />
+ <ant dir="urlfilter-domainblacklist" target="clean" />
+ <ant dir="urlfilter-prefix" target="clean"/>
+ <ant dir="urlfilter-regex" target="clean"/>
+ <ant dir="urlfilter-suffix" target="clean"/>
+ <ant dir="urlfilter-validator" target="clean"/>
+ <ant dir="urlfilter-ignoreexempt" target="clean"/>
+ <ant dir="parsefilter-naivebayes" target="clean" />
+ <ant dir="urlmeta" target="clean"/>
+ <ant dir="urlnormalizer-ajax" target="clean"/>
+ <ant dir="urlnormalizer-basic" target="clean"/>
+ <ant dir="urlnormalizer-host" target="clean"/>
+ <ant dir="urlnormalizer-pass" target="clean"/>
+ <ant dir="urlnormalizer-protocol" target="clean"/>
+ <ant dir="urlnormalizer-querystring" target="clean"/>
+ <ant dir="urlnormalizer-regex" target="clean"/>
+ <ant dir="urlnormalizer-slash" target="clean"/>
+ </target>
+</project>
http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/build.xml
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/build.xml
index 38955ff,0000000..6955f61
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/build.xml
+++ b/nutch-plugins/indexer-elastic/build.xml
@@@ -1,22 -1,0 +1,35 @@@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-elastic" default="jar-core">
+
+ <import file="../build-plugin.xml" />
+
++ <!-- Add compilation dependencies to classpath -->
++ <path id="plugin.deps">
++ <pathelement location="${build.dir}/test/conf"/>
++ </path>
++
++ <!-- Deploy Unit test dependencies -->
++ <target name="deps-test">
++ <copy toDir="${build.test}">
++ <fileset dir="${src.test}" excludes="**/*.java"/>
++ </copy>
++ </target>
++
++
+</project>
http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/plugin.xml
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/plugin.xml
index d99a665,0000000..401e342
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/plugin.xml
+++ b/nutch-plugins/indexer-elastic/plugin.xml
@@@ -1,71 -1,0 +1,70 @@@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
-
++
+ http://www.apache.org/licenses/LICENSE-2.0
-
++
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<plugin id="indexer-elastic" name="ElasticIndexWriter" version="1.0.0"
+ provider-name="nutch.apache.org">
+
+ <runtime>
+ <library name="indexer-elastic.jar">
+ <export name="*" />
+ </library>
+ <library name="elasticsearch-2.3.3.jar"/>
+ <library name="commons-cli-1.3.1.jar"/>
+ <library name="compress-lzf-1.0.2.jar"/>
+ <library name="guava-18.0.jar"/>
+ <library name="HdrHistogram-2.1.6.jar"/>
+ <library name="hppc-0.7.1.jar"/>
- <library name="indexer-elastic.jar"/>
+ <library name="jackson-core-2.6.6.jar"/>
+ <library name="jackson-dataformat-cbor-2.6.6.jar"/>
+ <library name="jackson-dataformat-smile-2.6.6.jar"/>
+ <library name="jackson-dataformat-yaml-2.6.6.jar"/>
+ <library name="joda-convert-1.2.jar"/>
+ <library name="joda-time-2.8.2.jar"/>
+ <library name="jsr166e-1.1.0.jar"/>
+ <library name="lucene-analyzers-common-5.5.0.jar"/>
+ <library name="lucene-backward-codecs-5.5.0.jar"/>
+ <library name="lucene-core-5.5.0.jar"/>
+ <library name="lucene-grouping-5.5.0.jar"/>
+ <library name="lucene-highlighter-5.5.0.jar"/>
+ <library name="lucene-join-5.5.0.jar"/>
+ <library name="lucene-memory-5.5.0.jar"/>
+ <library name="lucene-misc-5.5.0.jar"/>
+ <library name="lucene-queries-5.5.0.jar"/>
+ <library name="lucene-queryparser-5.5.0.jar"/>
+ <library name="lucene-sandbox-5.5.0.jar"/>
+ <library name="lucene-spatial-5.5.0.jar"/>
+ <library name="lucene-spatial3d-5.5.0.jar"/>
+ <library name="lucene-suggest-5.5.0.jar"/>
+ <library name="netty-3.10.5.Final.jar"/>
+ <library name="securesm-1.0.jar"/>
+ <library name="snakeyaml-1.15.jar"/>
+ <library name="spatial4j-0.5.jar"/>
+ <library name="t-digest-3.0.jar"/>
+ </runtime>
+
+ <requires>
+ <import plugin="nutch-extensionpoints" />
+ </requires>
+
+ <extension id="org.apache.nutch.indexer.elastic"
+ name="Elasticsearch Index Writer"
+ point="org.apache.nutch.indexer.IndexWriter">
+ <implementation id="ElasticIndexWriter"
+ class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter" />
+ </extension>
+
+</plugin>
http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
index b0e70c8,0000000..29f36c7
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
+++ b/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
@@@ -1,28 -1,0 +1,31 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.elastic;
+
+public interface ElasticConstants {
+ public static final String ELASTIC_PREFIX = "elastic.";
+
- public static final String HOST = ELASTIC_PREFIX + "host";
++ public static final String HOSTS = ELASTIC_PREFIX + "host";
+ public static final String PORT = ELASTIC_PREFIX + "port";
+ public static final String CLUSTER = ELASTIC_PREFIX + "cluster";
+ public static final String INDEX = ELASTIC_PREFIX + "index";
+ public static final String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
+ public static final String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
++ public static final String EXPONENTIAL_BACKOFF_MILLIS = ELASTIC_PREFIX + "exponential.backoff.millis";
++ public static final String EXPONENTIAL_BACKOFF_RETRIES = ELASTIC_PREFIX + "exponential.backoff.retries";
++ public static final String BULK_CLOSE_TIMEOUT = ELASTIC_PREFIX + "bulk.close.timeout";
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
index 9367e41,0000000..00b96f1
mode 100644,000000..100644
--- a/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
+++ b/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
@@@ -1,279 -1,0 +1,261 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexwriter.elastic;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
++import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.NutchDocument;
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.ListenableActionFuture;
- import org.elasticsearch.action.bulk.BulkItemResponse;
- import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.delete.DeleteRequestBuilder;
- import org.elasticsearch.action.index.IndexRequestBuilder;
++import org.elasticsearch.action.bulk.BulkRequest;
++import org.elasticsearch.action.bulk.BackoffPolicy;
++import org.elasticsearch.action.bulk.BulkProcessor;
++import org.elasticsearch.action.delete.DeleteRequest;
++import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
++import org.elasticsearch.common.unit.ByteSizeUnit;
++import org.elasticsearch.common.unit.ByteSizeValue;
++import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.settings.Settings;
- import org.elasticsearch.common.settings.Settings.Builder;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
++ * Sends NutchDocuments to a configured Elasticsearch index.
+ */
+public class ElasticIndexWriter implements IndexWriter {
+ public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
+
++ private static final int DEFAULT_PORT = 9300;
+ private static final int DEFAULT_MAX_BULK_DOCS = 250;
+ private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
++ private static final int DEFAULT_EXP_BACKOFF_MILLIS = 100;
++ private static final int DEFAULT_EXP_BACKOFF_RETRIES = 10;
++ private static final int DEFAULT_BULK_CLOSE_TIMEOUT = 600;
++ private static final String DEFAULT_INDEX = "nutch";
+
++ private String defaultIndex;
+ private Client client;
+ private Node node;
- private String defaultIndex;
++ private BulkProcessor bulkProcessor;
+
- private Configuration config;
++ private long bulkCloseTimeout;
+
- private BulkRequestBuilder bulk;
- private ListenableActionFuture<BulkResponse> execute;
- private int port = -1;
- private String host = null;
- private String clusterName = null;
- private int maxBulkDocs;
- private int maxBulkLength;
- private long indexedDocs = 0;
- private int bulkDocs = 0;
- private int bulkLength = 0;
- private boolean createNewBulk = false;
++ private Configuration config;
+
+ @Override
+ public void open(JobConf job, String name) throws IOException {
- clusterName = job.get(ElasticConstants.CLUSTER);
++ bulkCloseTimeout = job.getLong(ElasticConstants.BULK_CLOSE_TIMEOUT,
++ DEFAULT_BULK_CLOSE_TIMEOUT);
++ defaultIndex = job.get(ElasticConstants.INDEX, DEFAULT_INDEX);
++
++ int maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
++ DEFAULT_MAX_BULK_DOCS);
++ int maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
++ DEFAULT_MAX_BULK_LENGTH);
++ int expBackoffMillis = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
++ DEFAULT_EXP_BACKOFF_MILLIS);
++ int expBackoffRetries = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
++ DEFAULT_EXP_BACKOFF_RETRIES);
++
++ client = makeClient(job);
++
++ LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", maxBulkDocs, maxBulkLength);
++ bulkProcessor = BulkProcessor.builder(client, bulkProcessorListener())
++ .setBulkActions(maxBulkDocs)
++ .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
++ .setConcurrentRequests(1)
++ .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
++ TimeValue.timeValueMillis(expBackoffMillis), expBackoffRetries))
++ .build();
++ }
+
- host = job.get(ElasticConstants.HOST);
- port = job.getInt(ElasticConstants.PORT, 9300);
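++ /** Generates a TransportClient (when hosts are configured and port > 1) or else a NodeClient (when a cluster name is set); returns null otherwise */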
++ protected Client makeClient(Configuration conf) throws IOException {
++ String clusterName = conf.get(ElasticConstants.CLUSTER);
++ String[] hosts = conf.getStrings(ElasticConstants.HOSTS);
++ int port = conf.getInt(ElasticConstants.PORT, DEFAULT_PORT);
+
- Builder settingsBuilder = Settings.builder();
++ Settings.Builder settingsBuilder = Settings.settingsBuilder();
+
+ BufferedReader reader = new BufferedReader(
- job.getConfResourceAsReader("elasticsearch.conf"));
++ conf.getConfResourceAsReader("elasticsearch.conf"));
+ String line;
+ String parts[];
-
+ while ((line = reader.readLine()) != null) {
+ if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
+ line.trim();
+ parts = line.split("=");
+
+ if (parts.length == 2) {
+ settingsBuilder.put(parts[0].trim(), parts[1].trim());
+ }
+ }
+ }
+
++ // Set the cluster name and build the settings
+ if (StringUtils.isNotBlank(clusterName))
+ settingsBuilder.put("cluster.name", clusterName);
+
- // Set the cluster name and build the settings
+ Settings settings = settingsBuilder.build();
+
++ Client client = null;
++
+ // Prefer TransportClient
- if (host != null && port > 1) {
- TransportClient transportClient = TransportClient.builder()
- .settings(settings).build()
- .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
++ if (hosts != null && port > 1) {
++ TransportClient transportClient = TransportClient.builder().settings(settings).build();
++ for (String host: hosts)
++ transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
+ client = transportClient;
+ } else if (clusterName != null) {
+ node = nodeBuilder().settings(settings).client(true).node();
+ client = node.client();
+ }
+
- bulk = client.prepareBulk();
- defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
- maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
- DEFAULT_MAX_BULK_DOCS);
- maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
- DEFAULT_MAX_BULK_LENGTH);
++ return client;
++ }
++
++ /** Generates a default BulkProcessor.Listener */
++ protected BulkProcessor.Listener bulkProcessorListener() {
++ return new BulkProcessor.Listener() {
++ @Override
++ public void beforeBulk(long executionId, BulkRequest request) { }
++
++ @Override
++ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
++ throw new RuntimeException(failure);
++ }
++
++ @Override
++ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
++ if (response.hasFailures()) {
++ LOG.warn("Failures occurred during bulk request");
++ }
++ }
++ };
+ }
+
+ @Override
+ public void write(NutchDocument doc) throws IOException {
+ String id = (String) doc.getFieldValue("id");
+ String type = doc.getDocumentMeta().get("type");
+ if (type == null)
+ type = "doc";
- IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
+
++ // Add each field of this doc to the index source
+ Map<String, Object> source = new HashMap<String, Object>();
-
- // Loop through all fields of this doc
+ for (String fieldName : doc.getFieldNames()) {
- if (doc.getField(fieldName).getValues().size() > 1) {
++ if (doc.getFieldValue(fieldName) != null) {
+ source.put(fieldName, doc.getFieldValue(fieldName));
- // Loop through the values to keep track of the size of this
- // document
- for (Object value : doc.getField(fieldName).getValues()) {
- bulkLength += value.toString().length();
- }
- } else {
- if (doc.getFieldValue(fieldName) != null) {
- source.put(fieldName, doc.getFieldValue(fieldName));
- bulkLength += doc.getFieldValue(fieldName).toString().length();
- }
+ }
+ }
- request.setSource(source);
-
- // Add this indexing request to a bulk request
- bulk.add(request);
- indexedDocs++;
- bulkDocs++;
-
- if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
- LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
- + bulkLength + ", total docs = " + indexedDocs
- + ", last doc in bulk = '" + id + "']");
- // Flush the bulk of indexing requests
- createNewBulk = true;
- commit();
- }
++
++ IndexRequest request = new IndexRequest(defaultIndex, type, id).source(source);
++ bulkProcessor.add(request);
+ }
+
+ @Override
+ public void delete(String key) throws IOException {
- try {
- DeleteRequestBuilder builder = client.prepareDelete();
- builder.setIndex(defaultIndex);
- builder.setType("doc");
- builder.setId(key);
- builder.execute().actionGet();
- } catch (ElasticsearchException e) {
- throw makeIOException(e);
- }
- }
-
- public static IOException makeIOException(ElasticsearchException e) {
- final IOException ioe = new IOException();
- ioe.initCause(e);
- return ioe;
++ DeleteRequest request = new DeleteRequest(defaultIndex, "doc", key);
++ bulkProcessor.add(request);
+ }
+
+ @Override
+ public void update(NutchDocument doc) throws IOException {
+ write(doc);
+ }
+
+ @Override
+ public void commit() throws IOException {
- if (execute != null) {
- // wait for previous to finish
- long beforeWait = System.currentTimeMillis();
- BulkResponse actionGet = execute.actionGet();
- if (actionGet.hasFailures()) {
- for (BulkItemResponse item : actionGet) {
- if (item.isFailed()) {
- throw new RuntimeException("First failure in bulk: "
- + item.getFailureMessage());
- }
- }
- }
- long msWaited = System.currentTimeMillis() - beforeWait;
- LOG.info("Previous took in ms " + actionGet.getTookInMillis()
- + ", including wait " + msWaited);
- execute = null;
- }
- if (bulk != null) {
- if (bulkDocs > 0) {
- // start a flush, note that this is an asynchronous call
- execute = bulk.execute();
- }
- bulk = null;
- }
- if (createNewBulk) {
- // Prepare a new bulk request
- bulk = client.prepareBulk();
- bulkDocs = 0;
- bulkLength = 0;
- }
++ bulkProcessor.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
- // Flush pending requests
- LOG.info("Processing remaining requests [docs = " + bulkDocs
- + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
- createNewBulk = false;
- commit();
- // flush one more time to finalize the last bulk
- LOG.info("Processing to finalize last execute");
- createNewBulk = false;
- commit();
-
- // Close
++ // Close BulkProcessor (automatically flushes)
++ try {
++ bulkProcessor.awaitClose(bulkCloseTimeout, TimeUnit.SECONDS);
++ } catch (InterruptedException e) {
++ LOG.warn("interrupted while waiting for BulkProcessor to complete ({})", e.getMessage());
++ }
++
+ client.close();
+ if (node != null) {
+ node.close();
+ }
+ }
+
+ @Override
+ public String describe() {
+ StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
+ sb.append("\t").append(ElasticConstants.CLUSTER)
+ .append(" : elastic prefix cluster\n");
- sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
++ sb.append("\t").append(ElasticConstants.HOSTS).append(" : hostname\n");
+ sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
+ sb.append("\t").append(ElasticConstants.INDEX)
+ .append(" : elastic index command \n");
+ sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS)
- .append(" : elastic bulk index doc counts. (default 250) \n");
++ .append(" : elastic bulk index doc counts. (default ")
++ .append(DEFAULT_MAX_BULK_DOCS).append(")\n");
+ sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH)
- .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
++ .append(" : elastic bulk index length in bytes. (default ")
++ .append(DEFAULT_MAX_BULK_LENGTH).append(")\n");
++ sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS)
++ .append(" : elastic bulk exponential backoff initial delay in milliseconds. (default ")
++ .append(DEFAULT_EXP_BACKOFF_MILLIS).append(")\n");
++ sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES)
++ .append(" : elastic bulk exponential backoff max retries. (default ")
++ .append(DEFAULT_EXP_BACKOFF_RETRIES).append(")\n");
++ sb.append("\t").append(ElasticConstants.BULK_CLOSE_TIMEOUT)
++ .append(" : elastic timeout for the last bulk in seconds. (default ")
++ .append(DEFAULT_BULK_CLOSE_TIMEOUT).append(")\n");
+ return sb.toString();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ config = conf;
+ String cluster = conf.get(ElasticConstants.CLUSTER);
- String host = conf.get(ElasticConstants.HOST);
++ String hosts = conf.get(ElasticConstants.HOSTS);
+
- if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) {
++ if (StringUtils.isBlank(cluster) && StringUtils.isBlank(hosts)) {
+ String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml ";
+ message += "\n" + describe();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return config;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
----------------------------------------------------------------------
diff --cc nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
index eec0080,0000000..d70bc62
mode 100644,000000..100644
--- a/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
+++ b/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
@@@ -1,97 -1,0 +1,99 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.solr;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.mapred.JobConf;
++import org.apache.http.impl.client.SystemDefaultHttpClient;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+
+import java.net.MalformedURLException;
+
+public class SolrUtils {
+
+ /** Logger for this class. */
+ public static Logger LOG = LoggerFactory.getLogger(SolrUtils.class);
+ // One HTTP client instance that is passed to every Solr client built by this class.
+ // The type is written fully qualified because this file imports only
+ // org.apache.http.impl.client.SystemDefaultHttpClient; the simple name HttpClient
+ // is not imported and would not resolve.
++ private static final org.apache.http.client.HttpClient HTTP_CLIENT = new SystemDefaultHttpClient();
+
+ /**
+ * Builds the list of Solr clients described by the job configuration.
+ * When {@code SolrConstants.ZOOKEEPER_HOSTS} holds at least one entry, one
+ * {@link CloudSolrClient} is created per entry through
+ * {@link #getCloudSolrClient(String)} and its default collection is set from
+ * {@code SolrConstants.COLLECTION}. Otherwise one {@link HttpSolrClient} is
+ * created for each entry of {@code SolrConstants.SERVER_URL}.
+ *
+ * NOTE(review): in the non-ZooKeeper branch {@code urls} is dereferenced
+ * without a null check; presumably {@code job.getStrings} yields null when
+ * the property is unset, which would end in a NullPointerException - confirm.
+ *
+ * @param job job configuration the Solr settings are read from
+ * @return the created clients, in configuration order
+ * @throws MalformedURLException declared by the signature
+ */
+ public static ArrayList<SolrClient> getSolrClients(JobConf job) throws MalformedURLException {
+ String[] urls = job.getStrings(SolrConstants.SERVER_URL);
+ String[] zkHostString = job.getStrings(SolrConstants.ZOOKEEPER_HOSTS);
+ ArrayList<SolrClient> solrClients = new ArrayList<SolrClient>();
+
+ if (zkHostString != null && zkHostString.length > 0) {
+ for (int i = 0; i < zkHostString.length; i++) {
+ CloudSolrClient sc = getCloudSolrClient(zkHostString[i]);
+ sc.setDefaultCollection(job.get(SolrConstants.COLLECTION));
+ solrClients.add(sc);
+ }
+ } else {
+ for (int i = 0; i < urls.length; i++) {
- SolrClient sc = new HttpSolrClient(urls[i]);
++ SolrClient sc = new HttpSolrClient(urls[i], HTTP_CLIENT);
+ solrClients.add(sc);
+ }
+ }
+
+ return solrClients;
+ }
+
+ /**
+ * Creates a {@link CloudSolrClient} that uses the shared {@code HTTP_CLIENT},
+ * enables parallel updates on it and connects it before returning.
+ *
+ * @param url host string for the client; every '|' in it is replaced by ','
+ *          before it is handed to the client constructor
+ * @return the connected client
+ * @throws MalformedURLException declared by the signature
+ */
+ public static CloudSolrClient getCloudSolrClient(String url) throws MalformedURLException {
- CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','));
++ CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','), HTTP_CLIENT);
+ sc.setParallelUpdates(true);
+ sc.connect();
+ return sc;
+ }
+
+ /**
+ * Creates an {@link HttpSolrClient} for the given URL that uses the shared
+ * {@code HTTP_CLIENT}.
+ *
+ * @param url URL passed to the {@link HttpSolrClient} constructor
+ * @return the new client
+ * @throws MalformedURLException declared by the signature
+ */
+ public static SolrClient getHttpSolrClient(String url) throws MalformedURLException {
- SolrClient sc =new HttpSolrClient(url);
++ SolrClient sc =new HttpSolrClient(url, HTTP_CLIENT);
+ return sc;
+ }
+
+ /**
+ * Returns a copy of the input without Unicode noncharacters and without
+ * control characters below U+0020, except tab, line feed and carriage
+ * return, which are kept.
+ *
+ * The text is walked by code point, not by char, so noncharacters in the
+ * supplementary planes (U+1FFFE, U+1FFFF, ... U+10FFFF), which arrive as
+ * surrogate pairs, are recognised as well. Unpaired surrogates are kept.
+ *
+ * @param input text to clean; must not be null
+ * @return the cleaned text, possibly empty
+ */
+ public static String stripNonCharCodepoints(String input) {
+   StringBuilder retval = new StringBuilder(input.length());
+
+   for (int i = 0; i < input.length();) {
+     int cp = input.codePointAt(i);
+     i += Character.charCount(cp);
+
+     // Noncharacters, see
+     // http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]
+     // The last two code points of every plane: U+nFFFE and U+nFFFF.
+     int low16 = cp & 0xffff;
+     boolean planeNonChar = low16 == 0xfffe || low16 == 0xffff;
+     // U+FDD0 - U+FDEF, both ends included.
+     boolean blockNonChar = cp >= 0xfdd0 && cp <= 0xfdef;
+     // Control characters other than tabulator, new line and carriage return.
+     boolean strippedControl = cp <= 0x1f && cp != 0x9 && cp != 0xa && cp != 0xd;
+
+     if (!planeNonChar && !blockNonChar && !strippedControl) {
+       retval.appendCodePoint(cp);
+     }
+   }
+
+   return retval.toString();
+ }
+
+}