You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by le...@apache.org on 2014/01/16 23:05:25 UTC

svn commit: r1558929 - in /nutch/branches/2.x: ./ conf/ src/plugin/ src/plugin/indexer-elastic/ src/plugin/indexer-elastic/src/ src/plugin/indexer-elastic/src/java/ src/plugin/indexer-elastic/src/java/org/ src/plugin/indexer-elastic/src/java/org/apache...

Author: lewismc
Date: Thu Jan 16 22:05:24 2014
New Revision: 1558929

URL: http://svn.apache.org/r1558929
Log:
NUTCH-1655 Indexer Plugin for Elastic Search

Added:
    nutch/branches/2.x/conf/elasticsearch.conf
    nutch/branches/2.x/src/plugin/indexer-elastic/
    nutch/branches/2.x/src/plugin/indexer-elastic/build.xml
    nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml
    nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml
    nutch/branches/2.x/src/plugin/indexer-elastic/src/
    nutch/branches/2.x/src/plugin/indexer-elastic/src/java/
    nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/
    nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/
    nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/
    nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/
    nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/
    nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
    nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
Modified:
    nutch/branches/2.x/CHANGES.txt
    nutch/branches/2.x/build.xml
    nutch/branches/2.x/conf/nutch-default.xml
    nutch/branches/2.x/src/plugin/build.xml

Modified: nutch/branches/2.x/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/CHANGES.txt?rev=1558929&r1=1558928&r2=1558929&view=diff
==============================================================================
--- nutch/branches/2.x/CHANGES.txt (original)
+++ nutch/branches/2.x/CHANGES.txt Thu Jan 16 22:05:24 2014
@@ -2,6 +2,8 @@ Nutch Change Log
 
 Current Development
 
+* NUTCH-1655 Indexer Plugin for Elastic Search (Talat UYARER via lewismc)
+
 * NUTCH-1699 Tika Parser - Image Parse Bug (Mehmet Zahid Yüzügüldü, snagel via lewismc)
 
 * NUTCH-1568 port pluggable indexing architecture to 2.x (Talat UYARER via lewismc)

Modified: nutch/branches/2.x/build.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/build.xml?rev=1558929&r1=1558928&r2=1558929&view=diff
==============================================================================
--- nutch/branches/2.x/build.xml (original)
+++ nutch/branches/2.x/build.xml Thu Jan 16 22:05:24 2014
@@ -933,6 +933,7 @@
         <source path="${basedir}/src/plugin/feed/src/java/" />
         <source path="${basedir}/src/plugin/feed/src/test/" /> -->
         <source path="${basedir}/src/plugin/indexer-solr/src/java/" />
+      	<source path="${basedir}/src/plugin/indexer-elastic/src/java/" />
       	<source path="${basedir}/src/plugin/index-anchor/src/java/" />
         <source path="${basedir}/src/plugin/index-anchor/src/test/" />
         <source path="${basedir}/src/plugin/index-basic/src/java/" />

Added: nutch/branches/2.x/conf/elasticsearch.conf
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/conf/elasticsearch.conf?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/conf/elasticsearch.conf (added)
+++ nutch/branches/2.x/conf/elasticsearch.conf Thu Jan 16 22:05:24 2014
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Settings for Elasticsearch indexer plugin
+# Format: key=value\n

Modified: nutch/branches/2.x/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/conf/nutch-default.xml?rev=1558929&r1=1558928&r2=1558929&view=diff
==============================================================================
--- nutch/branches/2.x/conf/nutch-default.xml (original)
+++ nutch/branches/2.x/conf/nutch-default.xml Thu Jan 16 22:05:24 2014
@@ -1149,10 +1149,33 @@
 </property>
 
 <!-- elasticsearch index properties -->
+<property>
+  <name>elastic.host</name>
+  <value></value>
+  <description>The hostname to send documents to using TransportClient.
+  Either host and port, or cluster, must be defined.
+  </description>
+</property>
+
+<property>
+  <name>elastic.port</name>
+  <value>9300</value>
+  <description>
+  The port to connect to using TransportClient.
+  </description>
+</property>
+
+<property>
+  <name>elastic.cluster</name>
+  <value></value>
+  <description>The cluster name to discover. Either host and port, or
+  cluster, must be defined.
+  </description>
+</property>
 
 <property>
   <name>elastic.index</name>
-  <value>index</value>
+  <value>nutch</value>
   <description>
   The name of the elasticsearch index. Will normally be autocreated if it
   doesn't exist.
@@ -1161,18 +1184,20 @@
 
 <property>
   <name>elastic.max.bulk.docs</name>
-  <value>500</value>
+  <value>250</value>
   <description>
-  The number of docs in the batch that will trigger a flush to elasticsearch.
+  The number of docs in the batch that will trigger a flush to
+  elasticsearch.
   </description>
 </property>
 
 <property>
   <name>elastic.max.bulk.size</name>
-  <value>5001001</value>
-  <description> 
-  The total length of all indexed text in a batch that will trigger a flush to
-  elasticsearch, by checking after every document for excess of this amount.
+  <value>2500500</value>
+  <description>
+  The total length of all indexed text in a batch that will trigger a
+  flush to elasticsearch, by checking after every document for excess 
+  of this amount.
   </description>
 </property>
 

Modified: nutch/branches/2.x/src/plugin/build.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/build.xml?rev=1558929&r1=1558928&r2=1558929&view=diff
==============================================================================
--- nutch/branches/2.x/src/plugin/build.xml (original)
+++ nutch/branches/2.x/src/plugin/build.xml Thu Jan 16 22:05:24 2014
@@ -31,6 +31,7 @@
      <ant dir="index-basic" target="deploy"/>
      <ant dir="index-more" target="deploy"/>
      <ant dir="indexer-solr" target="deploy"/>
+     <ant dir="indexer-elastic" target="deploy"/>
      <ant dir="language-identifier" target="deploy"/>
      <ant dir="lib-http" target="deploy"/>
      <ant dir="lib-nekohtml" target="deploy"/>
@@ -112,6 +113,7 @@
     <ant dir="index-basic" target="clean"/>
     <ant dir="index-more" target="clean"/>
     <ant dir="indexer-solr" target="clean"/>
+    <ant dir="indexer-elastic" target="clean"/>
     <ant dir="language-identifier" target="clean"/>
     <ant dir="lib-http" target="clean"/>
     <ant dir="lib-nekohtml" target="clean"/>

Added: nutch/branches/2.x/src/plugin/indexer-elastic/build.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/build.xml?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/build.xml (added)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/build.xml Thu Jan 16 22:05:24 2014
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-elastic" default="jar-core">
+
+  <import file="../build-plugin.xml" />
+
+</project>

Added: nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml (added)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml Thu Jan 16 22:05:24 2014
@@ -0,0 +1,35 @@
+<?xml version="1.0" ?>
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+license agreements. See the NOTICE file distributed with this work for additional 
+information regarding copyright ownership. The ASF licenses this file to 
+You under the Apache License, Version 2.0 (the "License"); you may not use 
+this file except in compliance with the License. You may obtain a copy of 
+the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+by applicable law or agreed to in writing, software distributed under the 
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+OF ANY KIND, either express or implied. See the License for the specific 
+language governing permissions and limitations under the License. -->
+
+<ivy-module version="1.0">
+  <info organisation="org.apache.nutch" module="${ant.project.name}">
+    <license name="Apache 2.0" />
+    <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org" />
+    <description>Apache Nutch</description>
+  </info>
+
+  <configurations>
+    <include file="../../..//ivy/ivy-configurations.xml" />
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name -->
+    <artifact conf="master" />
+  </publications>
+
+  <dependencies>
+    <dependency org="org.elasticsearch" name="elasticsearch"
+      rev="0.90.1" conf="*->default" />
+  </dependencies>
+
+</ivy-module>

Added: nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml (added)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml Thu Jan 16 22:05:24 2014
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<plugin id="indexer-elastic" name="ElasticIndexWriter" version="1.0.0"
+  provider-name="nutch.apache.org">
+
+<runtime>
+    <library name="indexer-elastic.jar">
+      <export name="*" />
+    </library>
+    
+    <library name="elasticsearch-0.90.1.jar"/>
+    <library name="jna-3.3.0.jar"/>
+    <library name="jts-1.12.jar"/>
+    <library name="log4j-1.2.17.jar"/>
+    <library name="lucene-codecs-4.3.0.jar"/>
+    <library name="lucene-core-4.3.0.jar"/>
+    <library name="lucene-grouping-4.3.0.jar"/>
+    <library name="lucene-highlighter-4.3.0.jar"/>
+    <library name="lucene-join-4.3.0.jar"/>
+    <library name="lucene-memory-4.3.0.jar"/>
+    <library name="lucene-queries-4.3.0.jar"/>
+    <library name="lucene-queryparser-4.3.0.jar"/>
+    <library name="lucene-sandbox-4.3.0.jar"/>
+    <library name="lucene-spatial-4.3.0.jar"/>
+    <library name="lucene-suggest-4.3.0.jar"/>
+    <library name="spatial4j-0.3.jar"/>
+  </runtime>
+
+  <requires>
+    <import plugin="nutch-extensionpoints" />
+  </requires>
+
+  <extension id="org.apache.nutch.indexer.elastic"
+    name="Elasticsearch Index Writer"
+    point="org.apache.nutch.indexer.IndexWriter">
+    <implementation id="ElasticIndexWriter"
+      class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter" />
+  </extension>
+
+</plugin>

Added: nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java (added)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java Thu Jan 16 22:05:24 2014
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.elastic;
+
+/**
+ * Names of the configuration properties read by the Elasticsearch index
+ * writer. Every key is built from the shared {@code elastic.} prefix.
+ */
+public interface ElasticConstants {
+  /** Prefix shared by all property names below. */
+  String ELASTIC_PREFIX = "elastic.";
+
+  /** Property holding the host name to connect to. */
+  String HOST = ELASTIC_PREFIX + "host";
+  /** Property holding the port to connect to. */
+  String PORT = ELASTIC_PREFIX + "port";
+  /** Property holding the cluster name. */
+  String CLUSTER = ELASTIC_PREFIX + "cluster";
+  /** Property holding the name of the target index. */
+  String INDEX = ELASTIC_PREFIX + "index";
+  /** Property holding the number of documents that triggers a bulk flush. */
+  String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
+  /** Property holding the accumulated text length that triggers a bulk flush. */
+  String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
+}

Added: nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java?rev=1558929&view=auto
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java (added)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java Thu Jan 16 22:05:24 2014
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.nutch.indexwriter.elastic;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.io.BufferedReader;
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.IndexWriter;
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.ImmutableSettings.Builder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
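+/**
+ * Index writer that sends Nutch documents to Elasticsearch in bulk requests,
+ * connecting either through a TransportClient (host and port) or through a
+ * client node that joins the configured cluster.
+ */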
+public class ElasticIndexWriter implements IndexWriter {
+  // Logger shared by all instances of this writer.
+  public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
+
+  // Fallback thresholds used by open() when the corresponding properties
+  // are not present in the configuration.
+  private static final int DEFAULT_MAX_BULK_DOCS = 250;
+  private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
+
+  // Client used to build the bulk, index and delete requests.
+  private Client client;
+  // Only assigned when open() falls back to a client node; closed in close().
+  private Node node;
+  // Name of the index that documents are written to and deleted from.
+  private String defaultIndex;
+
+  // Configuration handed in through setConf() and returned by getConf().
+  private Configuration config;
+
+  // Bulk request currently being filled by write().
+  private BulkRequestBuilder bulk;
+  // Future of the bulk request started by commit(); awaited on the next commit().
+  private ListenableActionFuture<BulkResponse> execute;
+  // Connection parameters read from the configuration in open().
+  private int port = -1;
+  private String host = null;
+  private String clusterName = null;
+  // Thresholds that make write() flush the current bulk.
+  private int maxBulkDocs;
+  private int maxBulkLength;
+  // Number of documents passed to write() since this instance was created.
+  private long indexedDocs = 0;
+  // Document count and accumulated text length of the current bulk.
+  private int bulkDocs = 0;
+  private int bulkLength = 0;
+  // Tells commit() whether to prepare a fresh bulk request after flushing.
+  private boolean createNewBulk = false;
+
+  @Override
+  public void open(Configuration job) throws IOException {
+    clusterName = job.get(ElasticConstants.CLUSTER);
+    host = job.get(ElasticConstants.HOST);
+    port = job.getInt(ElasticConstants.PORT, -1);
+
+    Builder settingsBuilder = ImmutableSettings.settingsBuilder();
+    
+    BufferedReader reader = new BufferedReader(job.getConfResourceAsReader("elasticsearch.conf"));
+    String line;
+    String parts[];
+
+    while ((line = reader.readLine()) != null) {
+      if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
+        line.trim();
+        parts = line.split("=");
+
+        if (parts.length == 2) {
+          settingsBuilder.put(parts[0].trim(), parts[1].trim());
+        }
+      }
+    }
+
+    // Set the cluster name and build the settings
+    Settings settings = settingsBuilder.put("cluster.name", clusterName).build();
+    
+    // Prefer TransportClient
+    if (host != null && port > 1) {
+      client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(host, port));
+    } else if (clusterName != null) {
+      node = nodeBuilder().settings(settings).client(true).node();
+      client = node.client();
+    }
+
+    bulk = client.prepareBulk();
+    defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
+    maxBulkDocs = job.getInt(
+            ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
+    maxBulkLength = job.getInt(
+            ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+  }
+  /**
+   * Adds a document to the current bulk request and flushes the bulk once
+   * the configured document count or text length threshold is reached.
+   * <p>
+   * The document id is taken from the <code>url</code> field and the type
+   * from the <code>type</code> document meta entry (<code>doc</code> when
+   * absent). Multi-valued fields are indexed with all of their values.
+   *
+   * @param doc document to index
+   * @throws IOException if flushing the bulk fails
+   */
+  @Override
+  public void write(NutchDocument doc) throws IOException {
+    String id = (String)doc.getFieldValue("url");
+    String type = doc.getDocumentMeta().get("type");
+    if (type == null) type = "doc";
+    IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
+
+    Map<String, Object> source = new HashMap<String, Object>();
+
+    // Loop through all fields of this doc
+    for (String fieldName : doc.getFieldNames()) {
+      if (doc.getFieldValues(fieldName).size() > 1) {
+        // Multi-valued field: index every value, not only the first one
+        source.put(fieldName, doc.getFieldValues(fieldName));
+        // Loop through the values to keep track of the size of this document
+        for (Object value : doc.getFieldValues(fieldName)) {
+          if (value != null) {
+            bulkLength += value.toString().length();
+          }
+        }
+      } else {
+        Object value = doc.getFieldValue(fieldName);
+        // A field without a value has nothing to index or to measure
+        if (value == null) {
+          continue;
+        }
+        source.put(fieldName, value);
+        bulkLength += value.toString().length();
+      }
+    }
+    request.setSource(source);
+
+    // Add this indexing request to a bulk request
+    bulk.add(request);
+    indexedDocs++;
+    bulkDocs++;
+
+    if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
+      LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
+              + bulkLength + ", total docs = " + indexedDocs
+              + ", last doc in bulk = '" + id + "']");
+      // Flush the bulk of indexing requests
+      createNewBulk = true;
+      commit();
+    }
+  }
+
+
+  /**
+   * Deletes the document with the given id, using type <code>doc</code>,
+   * from the configured index. The request is executed right away and the
+   * call waits for its result.
+   *
+   * @param key id of the document to delete
+   * @throws IOException wrapping any ElasticSearchException that is raised
+   */
+  @Override
+  public void delete(String key) throws IOException {
+    try {
+      DeleteRequestBuilder deletion = client.prepareDelete();
+      deletion.setIndex(defaultIndex);
+      deletion.setType("doc");
+      deletion.setId(key);
+      deletion.execute().actionGet();
+    } catch (ElasticSearchException failure) {
+      throw makeIOException(failure);
+    }
+  }
+
+  /**
+   * Wraps an Elasticsearch failure in an {@link IOException}, keeping the
+   * original exception as the cause and carrying over its message so the
+   * wrapper is meaningful when logged on its own.
+   *
+   * @param e the Elasticsearch exception to wrap
+   * @return an IOException whose cause is <code>e</code>
+   */
+  public static IOException makeIOException(ElasticSearchException e) {
+    return new IOException(e.getMessage(), e);
+  }
+
+  /**
+   * Updates a document by indexing it again; simply delegates to
+   * {@link #write(NutchDocument)}.
+   *
+   * @param doc document to index
+   * @throws IOException if the delegated write fails
+   */
+  @Override
+  public void update(NutchDocument doc) throws IOException {
+    this.write(doc);
+  }
+
+  /**
+   * Flushes the current bulk request asynchronously.
+   * <p>
+   * First waits for the bulk started by the previous call, if any, and fails
+   * on its first failed item. Then starts the current bulk when it holds at
+   * least one document. A new bulk is prepared only when
+   * <code>createNewBulk</code> is set.
+   *
+   * @throws RuntimeException if the previously started bulk reported a
+   *           failed item
+   */
+  @Override
+  public void commit() throws IOException {
+    if (execute != null) {
+      // Detach the in-flight request before inspecting it, so a failed bulk
+      // is reported once instead of on every later commit()
+      ListenableActionFuture<BulkResponse> pending = execute;
+      execute = null;
+      // wait for previous to finish
+      long beforeWait = System.currentTimeMillis();
+      BulkResponse actionGet = pending.actionGet();
+      if (actionGet.hasFailures()) {
+        for (BulkItemResponse item : actionGet) {
+          if (item.isFailed()) {
+            throw new RuntimeException("First failure in bulk: "
+                    + item.getFailureMessage());
+          }
+        }
+      }
+      long msWaited = System.currentTimeMillis() - beforeWait;
+      LOG.info("Previous took in ms " + actionGet.getTookInMillis()
+              + ", including wait " + msWaited);
+    }
+    if (bulk != null) {
+      if (bulkDocs > 0) {
+        // start a flush, note that this is an asynchronous call
+        execute = bulk.execute();
+      }
+      bulk = null;
+    }
+    if (createNewBulk) {
+      // Prepare a new bulk request
+      bulk = client.prepareBulk();
+      bulkDocs = 0;
+      bulkLength = 0;
+    }
+  }
+
+  /**
+   * Flushes the remaining documents and releases the client and, when one
+   * was started, the client node.
+   * <p>
+   * commit() is called twice: the first call starts the last bulk, the
+   * second one waits for it to finish. The client and node are closed even
+   * when flushing fails.
+   *
+   * @throws IOException if flushing the pending requests fails
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      // Flush pending requests
+      LOG.info("Processing remaining requests [docs = " + bulkDocs
+              + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
+      createNewBulk = false;
+      commit();
+      // flush one more time to finalize the last bulk
+      LOG.info("Processing to finalize last execute");
+      commit();
+    } finally {
+      // Release the connection regardless of the outcome of the flush;
+      // client is null when open() did not complete
+      try {
+        if (client != null) {
+          client.close();
+        }
+      } finally {
+        if (node != null) {
+          node.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a human-readable list of the configuration properties this
+   * writer reads, one tab-indented property per line.
+   *
+   * @return the description text
+   */
+  @Override
+  public String describe() {
+    final String indent = "\t";
+    StringBuilder text = new StringBuilder("ElasticIndexWriter\n");
+    text.append(indent + ElasticConstants.CLUSTER + " : elastic prefix cluster\n");
+    text.append(indent + ElasticConstants.HOST + " : hostname\n");
+    text.append(indent + ElasticConstants.PORT + " : port\n");
+    text.append(indent + ElasticConstants.INDEX + " : elastic index command \n");
+    text.append(indent + ElasticConstants.MAX_BULK_DOCS + " : elastic bulk index doc counts. (default 250) \n");
+    text.append(indent + ElasticConstants.MAX_BULK_LENGTH + " : elastic bulk index length. (default 2500500 ~2.5MB)\n");
+    return text.toString();
+  }
+
+  /**
+   * Stores the configuration and checks that a cluster name is present.
+   *
+   * @param conf configuration to use
+   * @throws RuntimeException if <code>elastic.cluster</code> is not set
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    config = conf;
+    if (conf.get(ElasticConstants.CLUSTER) != null) {
+      return;
+    }
+    // Report the missing property together with the list of known settings
+    String message = "Missing elastic.cluster. Should be set in nutch-site.xml "
+            + "\n" + describe();
+    LOG.error(message);
+    throw new RuntimeException(message);
+  }
+    
+  /**
+   * Returns the configuration previously stored by setConf().
+   *
+   * @return the stored configuration, or null if none was set
+   */
+  @Override
+  public Configuration getConf() {
+    return this.config;
+  }
+}