You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by le...@apache.org on 2014/01/16 23:05:25 UTC
svn commit: r1558929 - in /nutch/branches/2.x: ./ conf/ src/plugin/
src/plugin/indexer-elastic/ src/plugin/indexer-elastic/src/
src/plugin/indexer-elastic/src/java/ src/plugin/indexer-elastic/src/java/org/
src/plugin/indexer-elastic/src/java/org/apache...
Author: lewismc
Date: Thu Jan 16 22:05:24 2014
New Revision: 1558929
URL: http://svn.apache.org/r1558929
Log:
NUTCH-1655 Indexer Plugin for Elastic Search
Added:
nutch/branches/2.x/conf/elasticsearch.conf
nutch/branches/2.x/src/plugin/indexer-elastic/
nutch/branches/2.x/src/plugin/indexer-elastic/build.xml
nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml
nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml
nutch/branches/2.x/src/plugin/indexer-elastic/src/
nutch/branches/2.x/src/plugin/indexer-elastic/src/java/
nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/
nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/
nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/
nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/
nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/
nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
Modified:
nutch/branches/2.x/CHANGES.txt
nutch/branches/2.x/build.xml
nutch/branches/2.x/conf/nutch-default.xml
nutch/branches/2.x/src/plugin/build.xml
Modified: nutch/branches/2.x/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/CHANGES.txt?rev=1558929&r1=1558928&r2=1558929&view=diff
==============================================================================
--- nutch/branches/2.x/CHANGES.txt (original)
+++ nutch/branches/2.x/CHANGES.txt Thu Jan 16 22:05:24 2014
@@ -2,6 +2,8 @@ Nutch Change Log
Current Development
+* NUTCH-1655 Indexer Plugin for Elastic Search (Talat UYARER via lewismc)
+
* NUTCH-1699 Tika Parser - Image Parse Bug (Mehmet Zahid Yüzügüldü, snagel via lewismc)
* NUTCH-1568 port pluggable indexing architecture to 2.x (Talat UYARER via lewismc)
Modified: nutch/branches/2.x/build.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/build.xml?rev=1558929&r1=1558928&r2=1558929&view=diff
==============================================================================
--- nutch/branches/2.x/build.xml (original)
+++ nutch/branches/2.x/build.xml Thu Jan 16 22:05:24 2014
@@ -933,6 +933,7 @@
<source path="${basedir}/src/plugin/feed/src/java/" />
<source path="${basedir}/src/plugin/feed/src/test/" /> -->
<source path="${basedir}/src/plugin/indexer-solr/src/java/" />
+ <source path="${basedir}/src/plugin/indexer-elastic/src/java/" />
<source path="${basedir}/src/plugin/index-anchor/src/java/" />
<source path="${basedir}/src/plugin/index-anchor/src/test/" />
<source path="${basedir}/src/plugin/index-basic/src/java/" />
Added: nutch/branches/2.x/conf/elasticsearch.conf
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/conf/elasticsearch.conf?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/conf/elasticsearch.conf (added)
+++ nutch/branches/2.x/conf/elasticsearch.conf Thu Jan 16 22:05:24 2014
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Settings for Elasticsearch indexer plugin
+# Format: key=value\n
Modified: nutch/branches/2.x/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/conf/nutch-default.xml?rev=1558929&r1=1558928&r2=1558929&view=diff
==============================================================================
--- nutch/branches/2.x/conf/nutch-default.xml (original)
+++ nutch/branches/2.x/conf/nutch-default.xml Thu Jan 16 22:05:24 2014
@@ -1149,10 +1149,33 @@
</property>
<!-- elasticsearch index properties -->
+<property>
+ <name>elastic.host</name>
+ <value></value>
+ <description>The hostname to send documents to using TransportClient.
+ Either host and port must be defined or cluster.
+ </description>
+</property>
+
+<property>
+ <name>elastic.port</name>
+ <value>9300</value>
+ <description>
+ The port to connect to using TransportClient.
+ </description>
+</property>
+
+<property>
+ <name>elastic.cluster</name>
+ <value></value>
+ <description>The cluster name to discover. Either host and port must
+ be defined or cluster.
+ </description>
+</property>
<property>
<name>elastic.index</name>
- <value>index</value>
+ <value>nutch</value>
<description>
The name of the elasticsearch index. Will normally be autocreated if it
doesn't exist.
@@ -1161,18 +1184,20 @@
<property>
<name>elastic.max.bulk.docs</name>
- <value>500</value>
+ <value>250</value>
<description>
- The number of docs in the batch that will trigger a flush to elasticsearch.
+ The number of docs in the batch that will trigger a flush to
+ elasticsearch.
</description>
</property>
<property>
<name>elastic.max.bulk.size</name>
- <value>5001001</value>
- <description>
- The total length of all indexed text in a batch that will trigger a flush to
- elasticsearch, by checking after every document for excess of this amount.
+ <value>2500500</value>
+ <description>
+ The total length of all indexed text in a batch that will trigger a
+ flush to elasticsearch, by checking after every document for excess
+ of this amount.
</description>
</property>
Modified: nutch/branches/2.x/src/plugin/build.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/build.xml?rev=1558929&r1=1558928&r2=1558929&view=diff
==============================================================================
--- nutch/branches/2.x/src/plugin/build.xml (original)
+++ nutch/branches/2.x/src/plugin/build.xml Thu Jan 16 22:05:24 2014
@@ -31,6 +31,7 @@
<ant dir="index-basic" target="deploy"/>
<ant dir="index-more" target="deploy"/>
<ant dir="indexer-solr" target="deploy"/>
+ <ant dir="indexer-elastic" target="deploy"/>
<ant dir="language-identifier" target="deploy"/>
<ant dir="lib-http" target="deploy"/>
<ant dir="lib-nekohtml" target="deploy"/>
@@ -112,6 +113,7 @@
<ant dir="index-basic" target="clean"/>
<ant dir="index-more" target="clean"/>
<ant dir="indexer-solr" target="clean"/>
+ <ant dir="indexer-elastic" target="clean"/>
<ant dir="language-identifier" target="clean"/>
<ant dir="lib-http" target="clean"/>
<ant dir="lib-nekohtml" target="clean"/>
Added: nutch/branches/2.x/src/plugin/indexer-elastic/build.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/build.xml?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/build.xml (added)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/build.xml Thu Jan 16 22:05:24 2014
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-elastic" default="jar-core">
+
+ <import file="../build-plugin.xml" />
+
+</project>
Added: nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml (added)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml Thu Jan 16 22:05:24 2014
@@ -0,0 +1,35 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more contributor
+license agreements. See the NOTICE file distributed with this work for additional
+information regarding copyright ownership. The ASF licenses this file to
+You under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of
+the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+by applicable law or agreed to in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+OF ANY KIND, either express or implied. See the License for the specific
+language governing permissions and limitations under the License. -->
+
+<ivy-module version="1.0">
+ <info organisation="org.apache.nutch" module="${ant.project.name}">
+ <license name="Apache 2.0" />
+ <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org" />
+ <description>Apache Nutch</description>
+ </info>
+
+ <configurations>
+ <include file="../../..//ivy/ivy-configurations.xml" />
+ </configurations>
+
+ <publications>
+ <!--get the artifact from our module name -->
+ <artifact conf="master" />
+ </publications>
+
+ <dependencies>
+ <dependency org="org.elasticsearch" name="elasticsearch"
+ rev="0.90.1" conf="*->default" />
+ </dependencies>
+
+</ivy-module>
Added: nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml (added)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml Thu Jan 16 22:05:24 2014
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<plugin id="indexer-elastic" name="ElasticIndexWriter" version="1.0.0"
+ provider-name="nutch.apache.org">
+
+<runtime>
+ <library name="indexer-elastic.jar">
+ <export name="*" />
+ </library>
+
+ <library name="elasticsearch-0.90.1.jar"/>
+ <library name="jna-3.3.0.jar"/>
+ <library name="jts-1.12.jar"/>
+ <library name="log4j-1.2.17.jar"/>
+ <library name="lucene-codecs-4.3.0.jar"/>
+ <library name="lucene-core-4.3.0.jar"/>
+ <library name="lucene-grouping-4.3.0.jar"/>
+ <library name="lucene-highlighter-4.3.0.jar"/>
+ <library name="lucene-join-4.3.0.jar"/>
+ <library name="lucene-memory-4.3.0.jar"/>
+ <library name="lucene-queries-4.3.0.jar"/>
+ <library name="lucene-queryparser-4.3.0.jar"/>
+ <library name="lucene-sandbox-4.3.0.jar"/>
+ <library name="lucene-spatial-4.3.0.jar"/>
+ <library name="lucene-suggest-4.3.0.jar"/>
+ <library name="spatial4j-0.3.jar"/>
+ </runtime>
+
+ <requires>
+ <import plugin="nutch-extensionpoints" />
+ </requires>
+
+ <extension id="org.apache.nutch.indexer.elastic"
+ name="Elasticsearch Index Writer"
+ point="org.apache.nutch.indexer.IndexWriter">
+ <implementation id="ElasticIndexWriter"
+ class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter" />
+ </extension>
+
+</plugin>
Added: nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java (added)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java Thu Jan 16 22:05:24 2014
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.elastic;
+
+/**
+ * Names of the configuration properties understood by the Elasticsearch
+ * index writer. Every key starts with the {@code elastic.} prefix.
+ */
+public interface ElasticConstants {
+ /** Prefix shared by all property names declared below. */
+ String ELASTIC_PREFIX = "elastic.";
+
+ /** Property holding the host name to connect to. */
+ String HOST = ELASTIC_PREFIX + "host";
+
+ /** Property holding the port to connect to. */
+ String PORT = ELASTIC_PREFIX + "port";
+
+ /** Property holding the cluster name. */
+ String CLUSTER = ELASTIC_PREFIX + "cluster";
+
+ /** Property holding the name of the target index. */
+ String INDEX = ELASTIC_PREFIX + "index";
+
+ /** Property holding the number of documents that triggers a bulk flush. */
+ String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
+
+ /** Property holding the accumulated text length that triggers a bulk flush. */
+ String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
+}
Added: nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java (added)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java Thu Jan 16 22:05:24 2014
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexwriter.elastic;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.io.BufferedReader;
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.IndexWriter;
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.ImmutableSettings.Builder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
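+/**
+ * {@link IndexWriter} implementation that sends Nutch documents to
+ * Elasticsearch. Documents are collected into bulk requests, which are
+ * flushed once a configurable number of documents or amount of indexed text
+ * has accumulated.
+ */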
+public class ElasticIndexWriter implements IndexWriter {
+ public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
+
+ private static final int DEFAULT_MAX_BULK_DOCS = 250; // used when elastic.max.bulk.docs is not configured
+ private static final int DEFAULT_MAX_BULK_LENGTH = 2500500; // used when elastic.max.bulk.size is not configured
+
+ private Client client; // assigned in open(); issues every index, delete and bulk request
+ private Node node; // assigned only when open() starts a client node instead of a TransportClient
+ private String defaultIndex; // index name read from elastic.index in open()
+
+ private Configuration config; // stored by setConf(), handed back by getConf()
+
+ private BulkRequestBuilder bulk; // bulk request that write() is currently filling
+ private ListenableActionFuture<BulkResponse> execute; // result handle of the bulk most recently started by commit()
+ private int port = -1; // elastic.port, read in open()
+ private String host = null; // elastic.host, read in open()
+ private String clusterName = null; // elastic.cluster, read in open()
+ private int maxBulkDocs; // document-count threshold checked by write()
+ private int maxBulkLength; // text-length threshold checked by write()
+ private long indexedDocs = 0; // running total of written documents; appears in log messages only
+ private int bulkDocs = 0; // documents added to the current bulk
+ private int bulkLength = 0; // summed length of the field values added to the current bulk
+ private boolean createNewBulk = false; // set to true by write() and to false by close() before they call commit()
+
+ /**
+  * Connects to Elasticsearch and prepares the first bulk request.
+  * <p>
+  * Extra client settings are read from the {@code elasticsearch.conf}
+  * configuration resource ({@code key=value} lines, {@code #} starts a
+  * comment). A {@link TransportClient} is used when both a host and a port
+  * are configured; otherwise, when a cluster name is configured, a client
+  * node is started that joins the cluster.
+  *
+  * @param job configuration carrying the {@code elastic.*} properties
+  * @throws IOException if the settings resource cannot be read, or if
+  *         neither host/port nor a cluster name is configured
+  */
+ @Override
+ public void open(Configuration job) throws IOException {
+   clusterName = job.get(ElasticConstants.CLUSTER);
+   host = job.get(ElasticConstants.HOST);
+   port = job.getInt(ElasticConstants.PORT, -1);
+
+   Builder settingsBuilder = ImmutableSettings.settingsBuilder();
+   loadElasticSettings(job, settingsBuilder);
+
+   // Only override cluster.name when one is actually configured; pushing a
+   // null or blank name would either fail or mask the value read from
+   // elasticsearch.conf.
+   if (StringUtils.isNotBlank(clusterName)) {
+     settingsBuilder.put("cluster.name", clusterName);
+   }
+   Settings settings = settingsBuilder.build();
+
+   // Prefer TransportClient
+   if (StringUtils.isNotBlank(host) && port > 0) {
+     client = new TransportClient(settings)
+         .addTransportAddress(new InetSocketTransportAddress(host, port));
+   } else if (clusterName != null) {
+     node = nodeBuilder().settings(settings).client(true).node();
+     client = node.client();
+   } else {
+     // Fail with a clear message instead of a NullPointerException below.
+     throw new IOException("Cannot connect to Elasticsearch: configure either "
+         + ElasticConstants.HOST + " and " + ElasticConstants.PORT + ", or "
+         + ElasticConstants.CLUSTER);
+   }
+
+   bulk = client.prepareBulk();
+   defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
+   maxBulkDocs = job.getInt(
+       ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
+   maxBulkLength = job.getInt(
+       ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+ }
+
+ /**
+  * Copies the {@code key=value} pairs of the {@code elasticsearch.conf}
+  * resource into the given settings builder. Blank lines, lines starting
+  * with {@code #} and lines without both a key and a value are skipped; the
+  * value may itself contain {@code =}.
+  */
+ private static void loadElasticSettings(Configuration job, Builder settingsBuilder)
+     throws IOException {
+   java.io.Reader resource = job.getConfResourceAsReader("elasticsearch.conf");
+   if (resource == null) {
+     LOG.info("elasticsearch.conf not found, using default client settings");
+     return;
+   }
+   BufferedReader reader = new BufferedReader(resource);
+   try {
+     String line;
+     while ((line = reader.readLine()) != null) {
+       // Trim before testing for a comment so indented lines are handled too.
+       line = line.trim();
+       if (line.length() == 0 || line.startsWith("#")) {
+         continue;
+       }
+       // Split on the first '=' only, so values may contain '='.
+       int separator = line.indexOf('=');
+       if (separator <= 0) {
+         continue;
+       }
+       String key = line.substring(0, separator).trim();
+       String value = line.substring(separator + 1).trim();
+       if (key.length() > 0 && value.length() > 0) {
+         settingsBuilder.put(key, value);
+       }
+     }
+   } finally {
+     reader.close();
+   }
+ }
+
+ @Override
+ public void write(NutchDocument doc) throws IOException {
+ String id = (String)doc.getFieldValue("url");
+ String type = doc.getDocumentMeta().get("type");
+ if (type == null) type = "doc";
+ IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
+
+ Map<String, Object> source = new HashMap<String, Object>();
+
+ // Loop through all fields of this doc
+ for (String fieldName : doc.getFieldNames()) {
+ if (doc.getFieldValues(fieldName).size() > 1) {
+ source.put(fieldName, doc.getFieldValue(fieldName));
+ // Loop through the values to keep track of the size of this document
+ for (Object value : doc.getFieldValues(fieldName)) {
+ bulkLength += value.toString().length();
+ }
+ } else {
+ source.put(fieldName, doc.getFieldValue(fieldName));
+ bulkLength += doc.getFieldValue(fieldName).toString().length();
+ }
+ }
+ request.setSource(source);
+
+ // Add this indexing request to a bulk request
+ bulk.add(request);
+ indexedDocs++;
+ bulkDocs++;
+
+ if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
+ LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
+ + bulkLength + ", total docs = " + indexedDocs
+ + ", last doc in bulk = '" + id + "']");
+ // Flush the bulk of indexing requests
+ createNewBulk = true;
+ commit();
+ }
+ }
+
+
+ /**
+  * Deletes the document with the given id and type {@code "doc"} from the
+  * default index, waiting for the request to complete.
+  *
+  * @param key id of the document to remove
+  * @throws IOException wrapping any {@link ElasticSearchException} raised
+  *         by the request
+  */
+ @Override
+ public void delete(String key) throws IOException {
+   try {
+     client.prepareDelete()
+         .setIndex(defaultIndex)
+         .setType("doc")
+         .setId(key)
+         .execute()
+         .actionGet();
+   } catch (ElasticSearchException failure) {
+     throw makeIOException(failure);
+   }
+ }
+
+ /**
+  * Wraps an Elasticsearch failure in a message-less {@link IOException}
+  * whose cause is the given exception.
+  *
+  * @param e the failure to wrap
+  * @return a new {@code IOException} with {@code e} as its cause
+  */
+ public static IOException makeIOException(ElasticSearchException e) {
+   return (IOException) new IOException().initCause(e);
+ }
+
+ /**
+  * Updates a document by writing it again; see {@link #write(NutchDocument)}.
+  *
+  * @param doc the document to (re-)index
+  * @throws IOException if writing the document fails
+  */
+ @Override
+ public void update(NutchDocument doc) throws IOException {
+   this.write(doc);
+ }
+
+ /**
+  * Waits for the previously started bulk request, if any, and then starts
+  * the current bulk asynchronously when it holds at least one document.
+  * <p>
+  * A fresh, empty bulk is always prepared afterwards, regardless of the
+  * {@code createNewBulk} flag, so the writer stays usable when this method
+  * is called between writes.
+  *
+  * @throws RuntimeException if the previous bulk reported a failed item;
+  *         the message names the first failure
+  * @throws IOException declared by the interface
+  */
+ @Override
+ public void commit() throws IOException {
+   if (execute != null) {
+     // Detach the handle first: a failed bulk is then reported once instead
+     // of being re-thrown by every later commit.
+     ListenableActionFuture<BulkResponse> pending = execute;
+     execute = null;
+
+     // wait for previous to finish
+     long beforeWait = System.currentTimeMillis();
+     BulkResponse response = pending.actionGet();
+     if (response.hasFailures()) {
+       for (BulkItemResponse item : response) {
+         if (item.isFailed()) {
+           throw new RuntimeException("First failure in bulk: "
+               + item.getFailureMessage());
+         }
+       }
+     }
+     long msWaited = System.currentTimeMillis() - beforeWait;
+     LOG.info("Previous took in ms " + response.getTookInMillis()
+         + ", including wait " + msWaited);
+   }
+
+   if (bulk != null && bulkDocs > 0) {
+     // start a flush, note that this is an asynchronous call
+     execute = bulk.execute();
+   }
+
+   // Prepare a new bulk request; leaving the builder null here would make
+   // the next write() fail with a NullPointerException.
+   bulk = client.prepareBulk();
+   bulkDocs = 0;
+   bulkLength = 0;
+ }
+
+ /**
+  * Flushes the remaining documents, waits for the last bulk request and
+  * releases the Elasticsearch client and, if one was started, the node.
+  * <p>
+  * The client and node are closed even when flushing fails, so a failed
+  * bulk does not leak connections or threads.
+  *
+  * @throws IOException declared by the interface; propagated from {@link #commit()}
+  */
+ @Override
+ public void close() throws IOException {
+   try {
+     // Flush pending requests
+     LOG.info("Processing remaining requests [docs = " + bulkDocs
+         + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
+     createNewBulk = false;
+     commit();
+     // flush one more time to finalize the last bulk
+     LOG.info("Processing to finalize last execute");
+     createNewBulk = false;
+     commit();
+   } finally {
+     // Close
+     try {
+       if (client != null) {
+         client.close();
+       }
+     } finally {
+       if (node != null) {
+         node.close();
+       }
+     }
+   }
+ }
+
+ @Override
+ public String describe() {
+ StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
+ sb.append("\t").append(ElasticConstants.CLUSTER).append(" : elastic prefix cluster\n");
+ sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
+ sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
+ sb.append("\t").append(ElasticConstants.INDEX).append(" : elastic index command \n");
+ sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS).append(" : elastic bulk index doc counts. (default 250) \n");
+ sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH).append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
+ return sb.toString();
+ }
+
+ /**
+  * Stores the configuration and verifies that a connection target is
+  * configured. As documented for the {@code elastic.*} properties, either a
+  * host (with port) or a cluster name is sufficient, so the check only
+  * fails when both {@code elastic.cluster} and {@code elastic.host} are
+  * missing or blank.
+  *
+  * @param conf the configuration to use
+  * @throws RuntimeException if neither a cluster name nor a host is set
+  */
+ @Override
+ public void setConf(Configuration conf) {
+   config = conf;
+   String cluster = conf.get(ElasticConstants.CLUSTER);
+   String hostName = conf.get(ElasticConstants.HOST);
+   if (StringUtils.isBlank(cluster) && StringUtils.isBlank(hostName)) {
+     String message = "Missing elastic.cluster and elastic.host. "
+         + "At least one of them should be set in nutch-site.xml ";
+     message += "\n" + describe();
+     LOG.error(message);
+     throw new RuntimeException(message);
+   }
+ }
+
+ /**
+  * Returns the configuration last passed to {@link #setConf(Configuration)}.
+  *
+  * @return the stored configuration, or {@code null} if none was set
+  */
+ @Override
+ public Configuration getConf() {
+   return this.config;
+ }
+}