You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2016/02/03 14:51:10 UTC
svn commit: r1728313 - in /nutch/trunk: ./ src/plugin/indexer-solr/
src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/
Author: markus
Date: Wed Feb 3 13:51:10 2016
New Revision: 1728313
URL: http://svn.apache.org/viewvc?rev=1728313&view=rev
Log:
NUTCH-2197 Add Solr 5 cloud indexer support
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/src/plugin/indexer-solr/ivy.xml
nutch/trunk/src/plugin/indexer-solr/plugin.xml
nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java
nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java
nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Wed Feb 3 13:51:10 2016
@@ -1,5 +1,7 @@
Nutch Change Log
+* NUTCH-2197 Add Solr 5 cloud indexer support (Jurian Broertjes via markus)
+
* NUTCH-2206 Provide example scoring.similarity.stopword.file (sujen)
* NUTCH-2204 Remove junit lib from runtime (snagel)
Modified: nutch/trunk/src/plugin/indexer-solr/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/ivy.xml?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-solr/ivy.xml (original)
+++ nutch/trunk/src/plugin/indexer-solr/ivy.xml Wed Feb 3 13:51:10 2016
@@ -36,9 +36,9 @@
</publications>
<dependencies>
- <dependency org="org.apache.solr" name="solr-solrj" rev="4.10.2" conf="*->default"/>
- <dependency org="org.apache.httpcomponents" name="httpclient" rev="4.3.1" conf="*->default"/>
- <dependency org="org.apache.httpcomponents" name="httpmime" rev="4.3.1" conf="*->default"/>
+ <dependency org="org.apache.solr" name="solr-solrj" rev="5.4.1"/>
+ <dependency org="org.apache.httpcomponents" name="httpcore" rev="4.4.1" conf="*->default"/>
+ <dependency org="org.apache.httpcomponents" name="httpmime" rev="4.4.1" conf="*->default"/>
</dependencies>
</ivy-module>
Modified: nutch/trunk/src/plugin/indexer-solr/plugin.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/plugin.xml?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-solr/plugin.xml (original)
+++ nutch/trunk/src/plugin/indexer-solr/plugin.xml Wed Feb 3 13:51:10 2016
@@ -22,17 +22,16 @@
<library name="indexer-solr.jar">
<export name="*" />
</library>
- <library name="commons-codec-1.9.jar"/>
- <library name="commons-io-2.3.jar"/>
- <library name="commons-logging-1.1.3.jar"/>
- <library name="httpclient-4.3.1.jar"/>
- <library name="httpcore-4.3.jar"/>
- <library name="httpmime-4.3.1.jar"/>
- <library name="noggit-0.5.jar"/>
- <library name="slf4j-api-1.7.6.jar"/>
- <library name="solr-solrj-4.10.2.jar"/>
- <library name="wstx-asl-3.2.7.jar"/>
- <library name="zookeeper-3.4.6.jar"/>
+ <library name="commons-io-2.4.jar"/>
+ <library name="httpclient-4.4.1.jar"/>
+ <library name="httpcore-4.4.1.jar"/>
+ <library name="httpmime-4.4.1.jar"/>
+ <library name="noggit-0.6.jar"/>
+ <library name="slf4j-api-1.7.7.jar"/>
+ <library name="solr-solrj-5.4.1.jar"/>
+ <library name="stax2-api-3.1.4.jar"/>
+ <library name="woodstox-core-asl-4.4.1.jar"/>
+ <library name="zookeeper-3.4.6.jar"/>
</runtime>
<requires>
Modified: nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java (original)
+++ nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java Wed Feb 3 13:51:10 2016
@@ -17,7 +17,6 @@
package org.apache.nutch.indexwriter.solr;
public interface SolrConstants {
-
public static final String SOLR_PREFIX = "solr.";
public static final String SERVER_URL = SOLR_PREFIX + "server.url";
@@ -31,13 +30,23 @@ public interface SolrConstants {
public static final String USERNAME = SOLR_PREFIX + "auth.username";
public static final String PASSWORD = SOLR_PREFIX + "auth.password";
-
- public static final String SERVER_TYPE = SOLR_PREFIX + "server.type";
-
- public static final String ZOOKEEPER_URL = SOLR_PREFIX + "zookeeper.url";
-
- public static final String LOADBALANCE_URLS = SOLR_PREFIX + "loadbalance.urls";
-
+
+ public static final String COLLECTION = SOLR_PREFIX + "collection";
+
+ public static final String ZOOKEEPER_HOSTS = SOLR_PREFIX + "zookeeper.hosts";
+
+ public static final String ID_FIELD = "id";
+
+ public static final String URL_FIELD = "url";
+
+ public static final String BOOST_FIELD = "boost";
+
+ public static final String TIMESTAMP_FIELD = "tstamp";
+
+ public static final String DIGEST_FIELD = "digest";
+
+
+
@Deprecated
public static final String COMMIT_INDEX = SOLR_PREFIX + "commit.index";
Modified: nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java (original)
+++ nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java Wed Feb 3 13:51:10 2016
@@ -17,6 +17,7 @@
package org.apache.nutch.indexwriter.solr;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -28,21 +29,33 @@ import org.apache.nutch.indexer.IndexWri
import org.apache.nutch.indexer.IndexerMapReduce;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexer.NutchField;
-import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.DateUtil;
+import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.util.NutchConfiguration;
+
+// Workaround for URL-encoded URLs not being removed from the index: delete() URL-decodes the key first.
+import java.net.URLDecoder;
+
public class SolrIndexWriter implements IndexWriter {
public static final Logger LOG = LoggerFactory
.getLogger(SolrIndexWriter.class);
- private SolrServer solr;
+ private List<SolrClient> solrClients;
private SolrMappingReader solrMapping;
private ModifiableSolrParams params;
@@ -50,18 +63,24 @@ public class SolrIndexWriter implements
private final List<SolrInputDocument> inputDocs = new ArrayList<SolrInputDocument>();
+ private final List<SolrInputDocument> updateDocs = new ArrayList<SolrInputDocument>();
+
+ private final List<String> deleteIds = new ArrayList<String>();
+
private int batchSize;
private int numDeletes = 0;
+ private int totalAdds = 0;
+ private int totalDeletes = 0;
+ private int totalUpdates = 0;
private boolean delete = false;
public void open(JobConf job, String name) throws IOException {
- SolrServer server = SolrUtils.getSolrServer(job);
- init(server, job);
+ solrClients = SolrUtils.getSolrClients(job);
+ init(solrClients, job);
}
// package protected for tests
- void init(SolrServer server, JobConf job) throws IOException {
- solr = server;
+ void init(List<SolrClient> solrClients, JobConf job) throws IOException {
batchSize = job.getInt(SolrConstants.COMMIT_SIZE, 1000);
solrMapping = SolrMappingReader.getInstance(job);
delete = job.getBoolean(IndexerMapReduce.INDEXER_DELETE, false);
@@ -81,13 +100,38 @@ public class SolrIndexWriter implements
}
public void delete(String key) throws IOException {
+ try {
+ key = URLDecoder.decode(key, "UTF8");
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("Error decoding: " + key);
+ throw new IOException("UnsupportedEncodingException for " + key);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Could not decode: " + key + ", it probably wasn't encoded in the first place..");
+ }
+
+ // escape solr hash separator
+ key = key.replaceAll("!", "\\!");
+
if (delete) {
- try {
- solr.deleteById(key);
- numDeletes++;
- } catch (final SolrServerException e) {
- throw makeIOException(e);
+ deleteIds.add(key);
+ totalDeletes++;
+ }
+
+ if (deleteIds.size() >= batchSize) {
+ push();
+ }
+
+ }
+
+ public void deleteByQuery(String query) throws IOException {
+ try {
+ LOG.info("SolrWriter: deleting " + query);
+ for (SolrClient solrClient : solrClients) {
+ solrClient.deleteByQuery(query);
}
+ } catch (final SolrServerException e) {
+ LOG.error("Error deleting: " + deleteIds);
+ throw makeIOException(e);
}
}
@@ -98,6 +142,7 @@ public class SolrIndexWriter implements
public void write(NutchDocument doc) throws IOException {
final SolrInputDocument inputDoc = new SolrInputDocument();
+
for (final Entry<String, NutchField> e : doc) {
for (final Object val : e.getValue().getValues()) {
// normalise the string representation for a Date
@@ -122,48 +167,65 @@ public class SolrIndexWriter implements
inputDoc.setDocumentBoost(doc.getWeight());
inputDocs.add(inputDoc);
+ totalAdds++;
+
if (inputDocs.size() + numDeletes >= batchSize) {
+ push();
+ }
+ }
+
+ public void close() throws IOException {
+ commit();
+
+ for (SolrClient solrClient : solrClients) {
+ solrClient.close();
+ }
+ }
+
+ @Override
+ public void commit() throws IOException {
+ push();
+ try {
+ for (SolrClient solrClient : solrClients) {
+ solrClient.commit();
+ }
+ } catch (final SolrServerException e) {
+ LOG.error("Failed to commit solr connection: " + e.getMessage()); // FIXME
+ }
+ }
+
+ public void push() throws IOException {
+ if (inputDocs.size() > 0) {
try {
LOG.info("Indexing " + Integer.toString(inputDocs.size())
- + " documents");
+ + "/" + Integer.toString(totalAdds) + " documents");
LOG.info("Deleting " + Integer.toString(numDeletes) + " documents");
numDeletes = 0;
UpdateRequest req = new UpdateRequest();
req.add(inputDocs);
+ req.setAction(AbstractUpdateRequest.ACTION.OPTIMIZE, false, false);
req.setParams(params);
- req.process(solr);
+ for (SolrClient solrClient : solrClients) {
+ NamedList res = solrClient.request(req);
+ }
} catch (final SolrServerException e) {
throw makeIOException(e);
}
inputDocs.clear();
}
- }
- public void close() throws IOException {
- try {
- if (!inputDocs.isEmpty()) {
- LOG.info("Indexing " + Integer.toString(inputDocs.size())
- + " documents");
- if (numDeletes > 0) {
- LOG.info("Deleting " + Integer.toString(numDeletes) + " documents");
+ if (deleteIds.size() > 0) {
+ try {
+ LOG.info("SolrIndexer: deleting " + Integer.toString(deleteIds.size())
+ + "/" + Integer.toString(totalDeletes) + " documents");
+ for (SolrClient solrClient : solrClients) {
+ solrClient.deleteById(deleteIds);
}
- UpdateRequest req = new UpdateRequest();
- req.add(inputDocs);
- req.setParams(params);
- req.process(solr);
- inputDocs.clear();
+ } catch (final SolrServerException e) {
+ LOG.error("Error deleting: " + deleteIds);
+ throw makeIOException(e);
}
- } catch (final SolrServerException e) {
- throw makeIOException(e);
- }
- }
-
- @Override
- public void commit() throws IOException {
- try {
- solr.commit();
- } catch (SolrServerException e) {
- throw makeIOException(e);
+ deleteIds.clear();
}
}
@@ -182,9 +244,10 @@ public class SolrIndexWriter implements
public void setConf(Configuration conf) {
config = conf;
String serverURL = conf.get(SolrConstants.SERVER_URL);
- if (serverURL == null) {
- String message = "Missing Solr URL. Should be set via -D "
- + SolrConstants.SERVER_URL;
+ String zkHosts = conf.get(SolrConstants.ZOOKEEPER_HOSTS);
+ if (serverURL == null && zkHosts == null) {
+ String message = "Missing SOLR URL and Zookeeper URL. Either on should be set via -D "
+ + SolrConstants.SERVER_URL + " or -D " + SolrConstants.ZOOKEEPER_HOSTS;
message += "\n" + describe();
LOG.error(message);
throw new RuntimeException(message);
@@ -192,20 +255,17 @@ public class SolrIndexWriter implements
}
public String describe() {
- StringBuffer sb = new StringBuffer("SolrIndexWriter\n");
- sb.append("\t").append(SolrConstants.SERVER_TYPE)
- .append(" : Type of SolrServer to communicate with (default 'http' however options include 'cloud', 'lb' and 'concurrent')\n");
+ StringBuffer sb = new StringBuffer("SOLRIndexWriter\n");
sb.append("\t").append(SolrConstants.SERVER_URL)
- .append(" : URL of the Solr instance (mandatory)\n");
- sb.append("\t").append(SolrConstants.ZOOKEEPER_URL)
- .append(" : URL of the Zookeeper URL (mandatory if 'cloud' value for solr.server.type)\n");
- sb.append("\t").append(SolrConstants.LOADBALANCE_URLS)
- .append(" : Comma-separated string of Solr server strings to be used (madatory if 'lb' value for solr.server.type)\n");
+ .append(" : URL of the SOLR instance\n");
+ sb.append("\t").append(SolrConstants.ZOOKEEPER_HOSTS)
+ .append(" : URL of the Zookeeper quorum\n");
+ sb.append("\t").append(SolrConstants.COMMIT_SIZE)
+ .append(" : buffer size when sending to SOLR (default 1000)\n");
sb.append("\t")
.append(SolrConstants.MAPPING_FILE)
- .append(" : name of the mapping file for fields (default solrindex-mapping.xml)\n");
- sb.append("\t").append(SolrConstants.COMMIT_SIZE)
- .append(" : buffer size when sending to Solr (default 1000)\n");
+ .append(
+ " : name of the mapping file for fields (default solrindex-mapping.xml)\n");
sb.append("\t").append(SolrConstants.USE_AUTH)
.append(" : use authentication (default false)\n");
sb.append("\t").append(SolrConstants.USERNAME)
Modified: nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java (original)
+++ nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java Wed Feb 3 13:51:10 2016
@@ -16,20 +16,15 @@
*/
package org.apache.nutch.indexwriter.solr;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
+
+import java.util.ArrayList;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.solr.client.solrj.impl.CloudSolrServer;
-import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer;
-import org.apache.solr.client.solrj.impl.HttpSolrServer;
-import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
-import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
import java.net.MalformedURLException;
@@ -37,64 +32,45 @@ public class SolrUtils {
public static Logger LOG = LoggerFactory.getLogger(SolrUtils.class);
- private static SolrServer server;
-
- public static SolrServer getSolrServer(JobConf job)
- throws MalformedURLException {
-
- boolean auth = job.getBoolean(SolrConstants.USE_AUTH, false);
-
- CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- // Check for username/password
- if (auth) {
- String username = job.get(SolrConstants.USERNAME);
- LOG.info("Authenticating as: " + username);
- AuthScope scope = new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT,
- AuthScope.ANY_REALM, AuthScope.ANY_SCHEME);
- credentialsProvider.setCredentials(scope,
- new UsernamePasswordCredentials(username, job.get(SolrConstants.PASSWORD)));
- }
- CloseableHttpClient client =
- HttpClientBuilder.create().setDefaultCredentialsProvider(credentialsProvider).build();
-
- String solrServer = job.get(SolrConstants.SERVER_TYPE, "http");
- String zkHost = job.get(SolrConstants.ZOOKEEPER_URL, null);
- String solrServerUrl = job.get(SolrConstants.SERVER_URL);
-
- switch (solrServer) {
- case "cloud":
- server = new CloudSolrServer(zkHost);
- LOG.debug("CloudSolrServer selected as indexing server.");
- break;
- case "concurrent":
- server = new ConcurrentUpdateSolrServer(solrServerUrl, client, 1000, 10);
- LOG.debug("ConcurrentUpdateSolrServer selected as indexing server.");
- break;
- case "http":
- if (auth) {
- server = new HttpSolrServer(solrServerUrl, client);
- } else {
- server = new HttpSolrServer(solrServerUrl);
+ /**
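+ * Builds the Solr clients used for indexing: one CloudSolrClient per configured
+ * ZooKeeper host string, or otherwise one HttpSolrClient per configured server URL.
+ * @param job job configuration supplying the Solr server URLs, ZooKeeper hosts and collection
+ * @return list of SolrClient instances, one per ZooKeeper host string or server URL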
+ */
+ public static ArrayList<SolrClient> getSolrClients(JobConf job) throws MalformedURLException {
+ String[] urls = job.getStrings(SolrConstants.SERVER_URL);
+ String[] zkHostString = job.getStrings(SolrConstants.ZOOKEEPER_HOSTS);
+ ArrayList<SolrClient> solrClients = new ArrayList<SolrClient>();
+
+ if (zkHostString != null && zkHostString.length > 0) {
+ for (int i = 0; i < zkHostString.length; i++) {
+ CloudSolrClient sc = getCloudSolrClient(zkHostString[i]);
+ sc.setDefaultCollection(job.get(SolrConstants.COLLECTION));
+ solrClients.add(sc);
}
- LOG.debug("HttpSolrServer selected as indexing server.");
- break;
- case "lb":
- String[] lbServerString = job.get(SolrConstants.LOADBALANCE_URLS).split(",");
- server = new LBHttpSolrServer(client, lbServerString);
- LOG.debug("LBHttpSolrServer selected as indexing server.");
- break;
- default:
- if (auth) {
- server = new HttpSolrServer(solrServerUrl, client);
- } else {
- server = new HttpSolrServer(solrServerUrl);
+ } else {
+ for (int i = 0; i < urls.length; i++) {
+ SolrClient sc = new HttpSolrClient(urls[i]);
+ solrClients.add(sc);
}
- LOG.debug("HttpSolrServer selected as indexing server.");
- break;
}
- return server;
+
+ return solrClients;
+ }
+
+ public static CloudSolrClient getCloudSolrClient(String url) throws MalformedURLException {
+ CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','));
+ sc.setParallelUpdates(true);
+ sc.connect();
+ return sc;
}
+ public static SolrClient getHttpSolrClient(String url) throws MalformedURLException {
+ SolrClient sc =new HttpSolrClient(url);
+ return sc;
+ }
+
public static String stripNonCharCodepoints(String input) {
StringBuilder retval = new StringBuilder();
char ch;
@@ -117,4 +93,5 @@ public class SolrUtils {
return retval.toString();
}
-}
\ No newline at end of file
+
+}