You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by jn...@apache.org on 2014/04/04 16:43:24 UTC

svn commit: r1584722 - in /nutch/trunk: CHANGES.txt ivy/ivy.xml src/plugin/indexer-elastic/ivy.xml src/plugin/indexer-elastic/plugin.xml src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java

Author: jnioche
Date: Fri Apr  4 14:43:23 2014
New Revision: 1584722

URL: http://svn.apache.org/r1584722
Log:
NUTCH-1745 Upgraded to ElasticSearch 1.1.0

Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/ivy/ivy.xml
    nutch/trunk/src/plugin/indexer-elastic/ivy.xml
    nutch/trunk/src/plugin/indexer-elastic/plugin.xml
    nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java

Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1584722&r1=1584721&r2=1584722&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Fri Apr  4 14:43:23 2014
@@ -2,6 +2,8 @@ Nutch Change Log
 
 Nutch Current Development
 
+* NUTCH-1745 Upgrade to ElasticSearch 1.1.0 (jnioche)
+
 * NUTCH-1645 Junit Test Case for Adaptive Fetch Schedule class (Yasin Kılınç, lufeng, Sertac TURKEL via snagel)
 
 * NUTCH-1737 Upgrade to recent JUnit 4.x (lewismc)

Modified: nutch/trunk/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/ivy/ivy.xml?rev=1584722&r1=1584721&r2=1584722&view=diff
==============================================================================
--- nutch/trunk/ivy/ivy.xml (original)
+++ nutch/trunk/ivy/ivy.xml Fri Apr  4 14:43:23 2014
@@ -34,9 +34,6 @@
 	</publications>
 
 	<dependencies>
-		<dependency org="org.elasticsearch" name="elasticsearch"
-			rev="0.90.1" conf="*->default" />
-
 		<dependency org="org.slf4j" name="slf4j-api" rev="1.6.1"
 			conf="*->master" />
 		<dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.1"

Modified: nutch/trunk/src/plugin/indexer-elastic/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/ivy.xml?rev=1584722&r1=1584721&r2=1584722&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/ivy.xml (original)
+++ nutch/trunk/src/plugin/indexer-elastic/ivy.xml Fri Apr  4 14:43:23 2014
@@ -36,7 +36,7 @@
   </publications>
 
   <dependencies>
-        <dependency org="org.elasticsearch" name="elasticsearch" rev="0.90.1"
+        <dependency org="org.elasticsearch" name="elasticsearch" rev="1.1.0"
                     conf="*->default"/>
   </dependencies>
   

Modified: nutch/trunk/src/plugin/indexer-elastic/plugin.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/plugin.xml?rev=1584722&r1=1584721&r2=1584722&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/plugin.xml (original)
+++ nutch/trunk/src/plugin/indexer-elastic/plugin.xml Fri Apr  4 14:43:23 2014
@@ -23,22 +23,21 @@
       <export name="*" />
     </library>
     
-    <library name="elasticsearch-0.90.1.jar"/>
-    <library name="jna-3.3.0.jar"/>
-    <library name="jts-1.12.jar"/>
-    <library name="log4j-1.2.17.jar"/>
-    <library name="lucene-codecs-4.3.0.jar"/>
-    <library name="lucene-core-4.3.0.jar"/>
-    <library name="lucene-grouping-4.3.0.jar"/>
-    <library name="lucene-highlighter-4.3.0.jar"/>
-    <library name="lucene-join-4.3.0.jar"/>
-    <library name="lucene-memory-4.3.0.jar"/>
-    <library name="lucene-queries-4.3.0.jar"/>
-    <library name="lucene-queryparser-4.3.0.jar"/>
-    <library name="lucene-sandbox-4.3.0.jar"/>
-    <library name="lucene-spatial-4.3.0.jar"/>
-    <library name="lucene-suggest-4.3.0.jar"/>
-    <library name="spatial4j-0.3.jar"/>
+    <library name="elasticsearch-1.1.0.jar"/>
+    <library name="lucene-analyzers-common-4.7.0.jar"/>
+    <library name="lucene-codecs-4.7.0.jar"/>
+    <library name="lucene-core-4.7.0.jar"/>
+    <library name="lucene-grouping-4.7.0.jar"/>
+    <library name="lucene-highlighter-4.7.0.jar"/>
+    <library name="lucene-join-4.7.0.jar"/>
+    <library name="lucene-memory-4.7.0.jar"/>
+    <library name="lucene-misc-4.7.0.jar"/>
+    <library name="lucene-queries-4.7.0.jar"/>
+    <library name="lucene-queryparser-4.7.0.jar"/>
+    <library name="lucene-sandbox-4.7.0.jar"/>
+    <library name="lucene-spatial-4.7.0.jar"/>
+    <library name="lucene-suggest-4.7.0.jar"/>
+    <library name="spatial4j-0.4.1.jar"/>
   </runtime>
 
   <requires>

Modified: nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java?rev=1584722&r1=1584721&r2=1584722&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java (original)
+++ nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java Fri Apr  4 14:43:23 2014
@@ -14,36 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 package org.apache.nutch.indexwriter.elastic;
 
 import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
 
+import java.io.BufferedReader;
 import java.io.IOException;
-import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
-import java.io.BufferedReader;
-import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.indexer.IndexWriter;
-import org.elasticsearch.ElasticSearchException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.ImmutableSettings.Builder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.node.Node;
 import org.slf4j.Logger;
@@ -78,12 +75,15 @@ public class ElasticIndexWriter implemen
   @Override
   public void open(JobConf job, String name) throws IOException {
     clusterName = job.get(ElasticConstants.CLUSTER);
+
     host = job.get(ElasticConstants.HOST);
-    port = job.getInt(ElasticConstants.PORT, -1);
+    port = job.getInt(ElasticConstants.PORT, 9300);
 
-    Builder settingsBuilder = ImmutableSettings.settingsBuilder();
-    
-    BufferedReader reader = new BufferedReader(job.getConfResourceAsReader("elasticsearch.conf"));
+    Builder settingsBuilder = ImmutableSettings.settingsBuilder().classLoader(
+        Settings.class.getClassLoader());
+
+    BufferedReader reader = new BufferedReader(
+        job.getConfResourceAsReader("elasticsearch.conf"));
     String line;
     String parts[];
 
@@ -98,12 +98,16 @@ public class ElasticIndexWriter implemen
       }
     }
 
+    if (StringUtils.isNotBlank(clusterName))
+      settingsBuilder.put("cluster.name", clusterName);
+
     // Set the cluster name and build the settings
-    Settings settings = settingsBuilder.put("cluster.name", clusterName).build();
-    
+    Settings settings = settingsBuilder.build();
+
     // Prefer TransportClient
     if (host != null && port > 1) {
-      client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(host, port));
+      client = new TransportClient(settings)
+          .addTransportAddress(new InetSocketTransportAddress(host, port));
     } else if (clusterName != null) {
       node = nodeBuilder().settings(settings).client(true).node();
       client = node.client();
@@ -111,17 +115,18 @@ public class ElasticIndexWriter implemen
 
     bulk = client.prepareBulk();
     defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
-    maxBulkDocs = job.getInt(
-            ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
-    maxBulkLength = job.getInt(
-            ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+    maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
+        DEFAULT_MAX_BULK_DOCS);
+    maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
+        DEFAULT_MAX_BULK_LENGTH);
   }
 
   @Override
   public void write(NutchDocument doc) throws IOException {
-    String id = (String)doc.getFieldValue("url");
+    String id = (String) doc.getFieldValue("url");
     String type = doc.getDocumentMeta().get("type");
-    if (type == null) type = "doc";
+    if (type == null)
+      type = "doc";
     IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
 
     Map<String, Object> source = new HashMap<String, Object>();
@@ -130,7 +135,8 @@ public class ElasticIndexWriter implemen
     for (String fieldName : doc.getFieldNames()) {
       if (doc.getField(fieldName).getValues().size() > 1) {
         source.put(fieldName, doc.getFieldValue(fieldName));
-        // Loop through the values to keep track of the size of this document
+        // Loop through the values to keep track of the size of this
+        // document
         for (Object value : doc.getField(fieldName).getValues()) {
           bulkLength += value.toString().length();
         }
@@ -148,30 +154,28 @@ public class ElasticIndexWriter implemen
 
     if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
       LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
-              + bulkLength + ", total docs = " + indexedDocs
-              + ", last doc in bulk = '" + id + "']");
+          + bulkLength + ", total docs = " + indexedDocs
+          + ", last doc in bulk = '" + id + "']");
       // Flush the bulk of indexing requests
       createNewBulk = true;
       commit();
     }
   }
 
-
   @Override
   public void delete(String key) throws IOException {
-    try{
-      DeleteRequestBuilder builder =  client.prepareDelete();
+    try {
+      DeleteRequestBuilder builder = client.prepareDelete();
       builder.setIndex(defaultIndex);
       builder.setType("doc");
       builder.setId(key);
       builder.execute().actionGet();
-    }catch(ElasticSearchException e)
-    {
+    } catch (ElasticsearchException e) {
       throw makeIOException(e);
     }
   }
 
-  public static IOException makeIOException(ElasticSearchException e) {
+  public static IOException makeIOException(ElasticsearchException e) {
     final IOException ioe = new IOException();
     ioe.initCause(e);
     return ioe;
@@ -192,13 +196,13 @@ public class ElasticIndexWriter implemen
         for (BulkItemResponse item : actionGet) {
           if (item.isFailed()) {
             throw new RuntimeException("First failure in bulk: "
-                    + item.getFailureMessage());
+                + item.getFailureMessage());
           }
         }
       }
       long msWaited = System.currentTimeMillis() - beforeWait;
       LOG.info("Previous took in ms " + actionGet.getTookInMillis()
-              + ", including wait " + msWaited);
+          + ", including wait " + msWaited);
       execute = null;
     }
     if (bulk != null) {
@@ -220,7 +224,7 @@ public class ElasticIndexWriter implemen
   public void close() throws IOException {
     // Flush pending requests
     LOG.info("Processing remaining requests [docs = " + bulkDocs
-            + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
+        + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
     createNewBulk = false;
     commit();
     // flush one more time to finalize the last bulk
@@ -238,12 +242,16 @@ public class ElasticIndexWriter implemen
   @Override
   public String describe() {
     StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
-    sb.append("\t").append(ElasticConstants.CLUSTER).append(" : elastic prefix cluster\n");
+    sb.append("\t").append(ElasticConstants.CLUSTER)
+        .append(" : elastic prefix cluster\n");
     sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
     sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
-    sb.append("\t").append(ElasticConstants.INDEX).append(" : elastic index command \n");
-    sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS).append(" : elastic bulk index doc counts. (default 250) \n");
-    sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH).append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
+    sb.append("\t").append(ElasticConstants.INDEX)
+        .append(" : elastic index command \n");
+    sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS)
+        .append(" : elastic bulk index doc counts. (default 250) \n");
+    sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH)
+        .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
     return sb.toString();
   }
 
@@ -251,16 +259,18 @@ public class ElasticIndexWriter implemen
   public void setConf(Configuration conf) {
     config = conf;
     String cluster = conf.get(ElasticConstants.CLUSTER);
-    if (cluster == null) {
-      String message = "Missing elastic.cluster. Should be set in nutch-site.xml ";
-      message+="\n"+describe();
+    String host = conf.get(ElasticConstants.HOST);
+
+    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) {
+      String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml ";
+      message += "\n" + describe();
       LOG.error(message);
       throw new RuntimeException(message);
     }
   }
-    
+
   @Override
   public Configuration getConf() {
     return config;
   }
-} 
+}