You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@nutch.apache.org by jn...@apache.org on 2014/04/04 16:43:24 UTC
svn commit: r1584722 - in /nutch/trunk: CHANGES.txt ivy/ivy.xml
src/plugin/indexer-elastic/ivy.xml src/plugin/indexer-elastic/plugin.xml
src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
Author: jnioche
Date: Fri Apr 4 14:43:23 2014
New Revision: 1584722
URL: http://svn.apache.org/r1584722
Log:
NUTCH-1745 Upgraded to ElasticSearch 1.1.0
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/ivy/ivy.xml
nutch/trunk/src/plugin/indexer-elastic/ivy.xml
nutch/trunk/src/plugin/indexer-elastic/plugin.xml
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1584722&r1=1584721&r2=1584722&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Fri Apr 4 14:43:23 2014
@@ -2,6 +2,8 @@ Nutch Change Log
Nutch Current Development
+* NUTCH-1745 Upgrade to ElasticSearch 1.1.0 (jnioche)
+
* NUTCH-1645 Junit Test Case for Adaptive Fetch Schedule class (Yasin Kılınç, lufeng, Sertac TURKEL via snagel)
* NUTCH-1737 Upgrade to recent JUnit 4.x (lewismc)
Modified: nutch/trunk/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/ivy/ivy.xml?rev=1584722&r1=1584721&r2=1584722&view=diff
==============================================================================
--- nutch/trunk/ivy/ivy.xml (original)
+++ nutch/trunk/ivy/ivy.xml Fri Apr 4 14:43:23 2014
@@ -34,9 +34,6 @@
</publications>
<dependencies>
- <dependency org="org.elasticsearch" name="elasticsearch"
- rev="0.90.1" conf="*->default" />
-
<dependency org="org.slf4j" name="slf4j-api" rev="1.6.1"
conf="*->master" />
<dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.1"
Modified: nutch/trunk/src/plugin/indexer-elastic/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/ivy.xml?rev=1584722&r1=1584721&r2=1584722&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/ivy.xml (original)
+++ nutch/trunk/src/plugin/indexer-elastic/ivy.xml Fri Apr 4 14:43:23 2014
@@ -36,7 +36,7 @@
</publications>
<dependencies>
- <dependency org="org.elasticsearch" name="elasticsearch" rev="0.90.1"
+ <dependency org="org.elasticsearch" name="elasticsearch" rev="1.1.0"
conf="*->default"/>
</dependencies>
Modified: nutch/trunk/src/plugin/indexer-elastic/plugin.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/plugin.xml?rev=1584722&r1=1584721&r2=1584722&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/plugin.xml (original)
+++ nutch/trunk/src/plugin/indexer-elastic/plugin.xml Fri Apr 4 14:43:23 2014
@@ -23,22 +23,21 @@
<export name="*" />
</library>
- <library name="elasticsearch-0.90.1.jar"/>
- <library name="jna-3.3.0.jar"/>
- <library name="jts-1.12.jar"/>
- <library name="log4j-1.2.17.jar"/>
- <library name="lucene-codecs-4.3.0.jar"/>
- <library name="lucene-core-4.3.0.jar"/>
- <library name="lucene-grouping-4.3.0.jar"/>
- <library name="lucene-highlighter-4.3.0.jar"/>
- <library name="lucene-join-4.3.0.jar"/>
- <library name="lucene-memory-4.3.0.jar"/>
- <library name="lucene-queries-4.3.0.jar"/>
- <library name="lucene-queryparser-4.3.0.jar"/>
- <library name="lucene-sandbox-4.3.0.jar"/>
- <library name="lucene-spatial-4.3.0.jar"/>
- <library name="lucene-suggest-4.3.0.jar"/>
- <library name="spatial4j-0.3.jar"/>
+ <library name="elasticsearch-1.1.0.jar"/>
+ <library name="lucene-analyzers-common-4.7.0.jar"/>
+ <library name="lucene-codecs-4.7.0.jar"/>
+ <library name="lucene-core-4.7.0.jar"/>
+ <library name="lucene-grouping-4.7.0.jar"/>
+ <library name="lucene-highlighter-4.7.0.jar"/>
+ <library name="lucene-join-4.7.0.jar"/>
+ <library name="lucene-memory-4.7.0.jar"/>
+ <library name="lucene-misc-4.7.0.jar"/>
+ <library name="lucene-queries-4.7.0.jar"/>
+ <library name="lucene-queryparser-4.7.0.jar"/>
+ <library name="lucene-sandbox-4.7.0.jar"/>
+ <library name="lucene-spatial-4.7.0.jar"/>
+ <library name="lucene-suggest-4.7.0.jar"/>
+ <library name="spatial4j-0.4.1.jar"/>
</runtime>
<requires>
Modified: nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java?rev=1584722&r1=1584721&r2=1584722&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java (original)
+++ nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java Fri Apr 4 14:43:23 2014
@@ -14,36 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
package org.apache.nutch.indexwriter.elastic;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+import java.io.BufferedReader;
import java.io.IOException;
-import java.net.URL;
import java.util.HashMap;
import java.util.Map;
-import java.io.BufferedReader;
-import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexer.IndexWriter;
-import org.elasticsearch.ElasticSearchException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.node.Node;
import org.slf4j.Logger;
@@ -78,12 +75,15 @@ public class ElasticIndexWriter implemen
@Override
public void open(JobConf job, String name) throws IOException {
clusterName = job.get(ElasticConstants.CLUSTER);
+
host = job.get(ElasticConstants.HOST);
- port = job.getInt(ElasticConstants.PORT, -1);
+ port = job.getInt(ElasticConstants.PORT, 9300);
- Builder settingsBuilder = ImmutableSettings.settingsBuilder();
-
- BufferedReader reader = new BufferedReader(job.getConfResourceAsReader("elasticsearch.conf"));
+ Builder settingsBuilder = ImmutableSettings.settingsBuilder().classLoader(
+ Settings.class.getClassLoader());
+
+ BufferedReader reader = new BufferedReader(
+ job.getConfResourceAsReader("elasticsearch.conf"));
String line;
String parts[];
@@ -98,12 +98,16 @@ public class ElasticIndexWriter implemen
}
}
+ if (StringUtils.isNotBlank(clusterName))
+ settingsBuilder.put("cluster.name", clusterName);
+
// Set the cluster name and build the settings
- Settings settings = settingsBuilder.put("cluster.name", clusterName).build();
-
+ Settings settings = settingsBuilder.build();
+
// Prefer TransportClient
if (host != null && port > 1) {
- client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(host, port));
+ client = new TransportClient(settings)
+ .addTransportAddress(new InetSocketTransportAddress(host, port));
} else if (clusterName != null) {
node = nodeBuilder().settings(settings).client(true).node();
client = node.client();
@@ -111,17 +115,18 @@ public class ElasticIndexWriter implemen
bulk = client.prepareBulk();
defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
- maxBulkDocs = job.getInt(
- ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
- maxBulkLength = job.getInt(
- ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+ maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
+ DEFAULT_MAX_BULK_DOCS);
+ maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
+ DEFAULT_MAX_BULK_LENGTH);
}
@Override
public void write(NutchDocument doc) throws IOException {
- String id = (String)doc.getFieldValue("url");
+ String id = (String) doc.getFieldValue("url");
String type = doc.getDocumentMeta().get("type");
- if (type == null) type = "doc";
+ if (type == null)
+ type = "doc";
IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
Map<String, Object> source = new HashMap<String, Object>();
@@ -130,7 +135,8 @@ public class ElasticIndexWriter implemen
for (String fieldName : doc.getFieldNames()) {
if (doc.getField(fieldName).getValues().size() > 1) {
source.put(fieldName, doc.getFieldValue(fieldName));
- // Loop through the values to keep track of the size of this document
+ // Loop through the values to keep track of the size of this
+ // document
for (Object value : doc.getField(fieldName).getValues()) {
bulkLength += value.toString().length();
}
@@ -148,30 +154,28 @@ public class ElasticIndexWriter implemen
if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
- + bulkLength + ", total docs = " + indexedDocs
- + ", last doc in bulk = '" + id + "']");
+ + bulkLength + ", total docs = " + indexedDocs
+ + ", last doc in bulk = '" + id + "']");
// Flush the bulk of indexing requests
createNewBulk = true;
commit();
}
}
-
@Override
public void delete(String key) throws IOException {
- try{
- DeleteRequestBuilder builder = client.prepareDelete();
+ try {
+ DeleteRequestBuilder builder = client.prepareDelete();
builder.setIndex(defaultIndex);
builder.setType("doc");
builder.setId(key);
builder.execute().actionGet();
- }catch(ElasticSearchException e)
- {
+ } catch (ElasticsearchException e) {
throw makeIOException(e);
}
}
- public static IOException makeIOException(ElasticSearchException e) {
+ public static IOException makeIOException(ElasticsearchException e) {
final IOException ioe = new IOException();
ioe.initCause(e);
return ioe;
@@ -192,13 +196,13 @@ public class ElasticIndexWriter implemen
for (BulkItemResponse item : actionGet) {
if (item.isFailed()) {
throw new RuntimeException("First failure in bulk: "
- + item.getFailureMessage());
+ + item.getFailureMessage());
}
}
}
long msWaited = System.currentTimeMillis() - beforeWait;
LOG.info("Previous took in ms " + actionGet.getTookInMillis()
- + ", including wait " + msWaited);
+ + ", including wait " + msWaited);
execute = null;
}
if (bulk != null) {
@@ -220,7 +224,7 @@ public class ElasticIndexWriter implemen
public void close() throws IOException {
// Flush pending requests
LOG.info("Processing remaining requests [docs = " + bulkDocs
- + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
+ + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
createNewBulk = false;
commit();
// flush one more time to finalize the last bulk
@@ -238,12 +242,16 @@ public class ElasticIndexWriter implemen
@Override
public String describe() {
StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
- sb.append("\t").append(ElasticConstants.CLUSTER).append(" : elastic prefix cluster\n");
+ sb.append("\t").append(ElasticConstants.CLUSTER)
+ .append(" : elastic prefix cluster\n");
sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
- sb.append("\t").append(ElasticConstants.INDEX).append(" : elastic index command \n");
- sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS).append(" : elastic bulk index doc counts. (default 250) \n");
- sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH).append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
+ sb.append("\t").append(ElasticConstants.INDEX)
+ .append(" : elastic index command \n");
+ sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS)
+ .append(" : elastic bulk index doc counts. (default 250) \n");
+ sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH)
+ .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
return sb.toString();
}
@@ -251,16 +259,18 @@ public class ElasticIndexWriter implemen
public void setConf(Configuration conf) {
config = conf;
String cluster = conf.get(ElasticConstants.CLUSTER);
- if (cluster == null) {
- String message = "Missing elastic.cluster. Should be set in nutch-site.xml ";
- message+="\n"+describe();
+ String host = conf.get(ElasticConstants.HOST);
+
+ if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) {
+ String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml ";
+ message += "\n" + describe();
LOG.error(message);
throw new RuntimeException(message);
}
}
-
+
@Override
public Configuration getConf() {
return config;
}
-}
+}