You are viewing a plain text version of this content. The canonical link for it was provided as a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@nutch.apache.org by ma...@apache.org on 2016/02/03 14:51:10 UTC

svn commit: r1728313 - in /nutch/trunk: ./ src/plugin/indexer-solr/ src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/

Author: markus
Date: Wed Feb  3 13:51:10 2016
New Revision: 1728313

URL: http://svn.apache.org/viewvc?rev=1728313&view=rev
Log:
NUTCH-2197 Add Solr 5 cloud indexer support

Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/src/plugin/indexer-solr/ivy.xml
    nutch/trunk/src/plugin/indexer-solr/plugin.xml
    nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java
    nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java
    nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java

Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Wed Feb  3 13:51:10 2016
@@ -1,5 +1,7 @@
 Nutch Change Log
 
+* NUTCH-2197 Add Solr 5 cloud indexer support (Jurian Broertjes via markus)
+
 * NUTCH-2206 Provide example scoring.similarity.stopword.file (sujen)
 
 * NUTCH-2204 Remove junit lib from runtime (snagel)

Modified: nutch/trunk/src/plugin/indexer-solr/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/ivy.xml?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-solr/ivy.xml (original)
+++ nutch/trunk/src/plugin/indexer-solr/ivy.xml Wed Feb  3 13:51:10 2016
@@ -36,9 +36,9 @@
   </publications>
 
   <dependencies>
-    <dependency org="org.apache.solr" name="solr-solrj" rev="4.10.2" conf="*->default"/>
-    <dependency org="org.apache.httpcomponents" name="httpclient" rev="4.3.1"  conf="*->default"/>
-   <dependency org="org.apache.httpcomponents" name="httpmime" rev="4.3.1"  conf="*->default"/>
+    <dependency org="org.apache.solr" name="solr-solrj" rev="5.4.1"/>
+    <dependency org="org.apache.httpcomponents" name="httpcore" rev="4.4.1" conf="*->default"/>
+    <dependency org="org.apache.httpcomponents" name="httpmime" rev="4.4.1" conf="*->default"/>
   </dependencies>
   
 </ivy-module>

Modified: nutch/trunk/src/plugin/indexer-solr/plugin.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/plugin.xml?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-solr/plugin.xml (original)
+++ nutch/trunk/src/plugin/indexer-solr/plugin.xml Wed Feb  3 13:51:10 2016
@@ -22,17 +22,16 @@
     <library name="indexer-solr.jar">
       <export name="*" />
     </library>
-      <library name="commons-codec-1.9.jar"/>
-      <library name="commons-io-2.3.jar"/>
-      <library name="commons-logging-1.1.3.jar"/>
-      <library name="httpclient-4.3.1.jar"/>
-      <library name="httpcore-4.3.jar"/>
-      <library name="httpmime-4.3.1.jar"/>
-      <library name="noggit-0.5.jar"/>
-      <library name="slf4j-api-1.7.6.jar"/>
-      <library name="solr-solrj-4.10.2.jar"/>
-      <library name="wstx-asl-3.2.7.jar"/>
-      <library name="zookeeper-3.4.6.jar"/>
+      <library name="commons-io-2.4.jar"/>
+      <library name="httpclient-4.4.1.jar"/>
+      <library name="httpcore-4.4.1.jar"/>
+      <library name="httpmime-4.4.1.jar"/>
+      <library name="noggit-0.6.jar"/>
+      <library name="slf4j-api-1.7.7.jar"/>
+      <library name="solr-solrj-5.4.1.jar"/>
+      <library name="stax2-api-3.1.4.jar"/>
+      <library name="woodstox-core-asl-4.4.1.jar"/>
+      <library name="zookeeper-3.4.6.jar"/> 
   </runtime>
 
   <requires>

Modified: nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java (original)
+++ nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java Wed Feb  3 13:51:10 2016
@@ -17,7 +17,6 @@
 package org.apache.nutch.indexwriter.solr;
 
 public interface SolrConstants {
-  
   public static final String SOLR_PREFIX = "solr.";
 
   public static final String SERVER_URL = SOLR_PREFIX + "server.url";
@@ -31,13 +30,23 @@ public interface SolrConstants {
   public static final String USERNAME = SOLR_PREFIX + "auth.username";
 
   public static final String PASSWORD = SOLR_PREFIX + "auth.password";
-  
-  public static final String SERVER_TYPE = SOLR_PREFIX + "server.type";
-  
-  public static final String ZOOKEEPER_URL = SOLR_PREFIX + "zookeeper.url";
-  
-  public static final String LOADBALANCE_URLS = SOLR_PREFIX + "loadbalance.urls";
-  
+
+  public static final String COLLECTION = SOLR_PREFIX + "collection";
+
+  public static final String ZOOKEEPER_HOSTS = SOLR_PREFIX + "zookeeper.hosts";
+
+  public static final String ID_FIELD = "id";
+
+  public static final String URL_FIELD = "url";
+
+  public static final String BOOST_FIELD = "boost";
+
+  public static final String TIMESTAMP_FIELD = "tstamp";
+
+  public static final String DIGEST_FIELD = "digest";
+
+
+
   @Deprecated
   public static final String COMMIT_INDEX = SOLR_PREFIX + "commit.index";
 

Modified: nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java (original)
+++ nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java Wed Feb  3 13:51:10 2016
@@ -17,6 +17,7 @@
 package org.apache.nutch.indexwriter.solr;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -28,21 +29,33 @@ import org.apache.nutch.indexer.IndexWri
 import org.apache.nutch.indexer.IndexerMapReduce;
 import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.indexer.NutchField;
-import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.DateUtil;
+import org.apache.solr.common.util.NamedList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.util.NutchConfiguration;
+
+// WORK AROUND FOR NOT REMOVING URL ENCODED URLS!!!
+import java.net.URLDecoder;
+
 public class SolrIndexWriter implements IndexWriter {
 
   public static final Logger LOG = LoggerFactory
       .getLogger(SolrIndexWriter.class);
 
-  private SolrServer solr;
+  private List<SolrClient> solrClients;
   private SolrMappingReader solrMapping;
   private ModifiableSolrParams params;
 
@@ -50,18 +63,24 @@ public class SolrIndexWriter implements
 
   private final List<SolrInputDocument> inputDocs = new ArrayList<SolrInputDocument>();
 
+  private final List<SolrInputDocument> updateDocs = new ArrayList<SolrInputDocument>();
+    
+  private final List<String> deleteIds = new ArrayList<String>();
+
   private int batchSize;
   private int numDeletes = 0;
+  private int totalAdds = 0;
+  private int totalDeletes = 0;
+  private int totalUpdates = 0;
   private boolean delete = false;
 
   public void open(JobConf job, String name) throws IOException {
-    SolrServer server = SolrUtils.getSolrServer(job);
-    init(server, job);
+    solrClients = SolrUtils.getSolrClients(job);
+    init(solrClients, job);
   }
 
   // package protected for tests
-  void init(SolrServer server, JobConf job) throws IOException {
-    solr = server;
+  void init(List<SolrClient> solrClients, JobConf job) throws IOException {
     batchSize = job.getInt(SolrConstants.COMMIT_SIZE, 1000);
     solrMapping = SolrMappingReader.getInstance(job);
     delete = job.getBoolean(IndexerMapReduce.INDEXER_DELETE, false);
@@ -81,13 +100,38 @@ public class SolrIndexWriter implements
   }
 
   public void delete(String key) throws IOException {
+    try {
+      key = URLDecoder.decode(key, "UTF8");
+    } catch (UnsupportedEncodingException e) {
+      LOG.error("Error decoding: " + key);
+      throw new IOException("UnsupportedEncodingException for " + key);
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Could not decode: " + key + ", it probably wasn't encoded in the first place..");
+    }
+    
+    // escape solr hash separator
+    key = key.replaceAll("!", "\\!");
+    
     if (delete) {
-      try {
-        solr.deleteById(key);
-        numDeletes++;
-      } catch (final SolrServerException e) {
-        throw makeIOException(e);
+      deleteIds.add(key);
+      totalDeletes++;
+    }
+    
+    if (deleteIds.size() >= batchSize) {
+      push();
+    }
+
+  }
+
+  public void deleteByQuery(String query) throws IOException {
+    try {
+      LOG.info("SolrWriter: deleting " + query);
+      for (SolrClient solrClient : solrClients) {
+        solrClient.deleteByQuery(query);
       }
+    } catch (final SolrServerException e) {
+      LOG.error("Error deleting: " + deleteIds);
+      throw makeIOException(e);
     }
   }
 
@@ -98,6 +142,7 @@ public class SolrIndexWriter implements
 
   public void write(NutchDocument doc) throws IOException {
     final SolrInputDocument inputDoc = new SolrInputDocument();
+
     for (final Entry<String, NutchField> e : doc) {
       for (final Object val : e.getValue().getValues()) {
         // normalise the string representation for a Date
@@ -122,48 +167,65 @@ public class SolrIndexWriter implements
 
     inputDoc.setDocumentBoost(doc.getWeight());
     inputDocs.add(inputDoc);
+    totalAdds++;
+
     if (inputDocs.size() + numDeletes >= batchSize) {
+      push();
+    }
+  }
+
+  public void close() throws IOException {
+    commit();
+
+    for (SolrClient solrClient : solrClients) {
+      solrClient.close();
+    }
+  }
+
+  @Override
+  public void commit() throws IOException {
+    push();
+    try {
+      for (SolrClient solrClient : solrClients) {
+        solrClient.commit();
+      }
+    } catch (final SolrServerException e) {
+      LOG.error("Failed to commit solr connection: " + e.getMessage()); // FIXME
+    }
+  }
+    
+  public void push() throws IOException {
+    if (inputDocs.size() > 0) {
       try {
         LOG.info("Indexing " + Integer.toString(inputDocs.size())
-            + " documents");
+            + "/" + Integer.toString(totalAdds) + " documents");
         LOG.info("Deleting " + Integer.toString(numDeletes) + " documents");
         numDeletes = 0;
         UpdateRequest req = new UpdateRequest();
         req.add(inputDocs);
+        req.setAction(AbstractUpdateRequest.ACTION.OPTIMIZE, false, false);
         req.setParams(params);
-        req.process(solr);
+        for (SolrClient solrClient : solrClients) {
+          NamedList res = solrClient.request(req);
+        }
       } catch (final SolrServerException e) {
         throw makeIOException(e);
       }
       inputDocs.clear();
     }
-  }
 
-  public void close() throws IOException {
-    try {
-      if (!inputDocs.isEmpty()) {
-        LOG.info("Indexing " + Integer.toString(inputDocs.size())
-            + " documents");
-        if (numDeletes > 0) {
-          LOG.info("Deleting " + Integer.toString(numDeletes) + " documents");
+    if (deleteIds.size() > 0) {
+      try {
+        LOG.info("SolrIndexer: deleting " + Integer.toString(deleteIds.size()) 
+            + "/" + Integer.toString(totalDeletes) + " documents");
+        for (SolrClient solrClient : solrClients) {
+          solrClient.deleteById(deleteIds);
         }
-        UpdateRequest req = new UpdateRequest();
-        req.add(inputDocs);
-        req.setParams(params);
-        req.process(solr);
-        inputDocs.clear();
+      } catch (final SolrServerException e) {
+        LOG.error("Error deleting: " + deleteIds);
+        throw makeIOException(e);
       }
-    } catch (final SolrServerException e) {
-      throw makeIOException(e);
-    }
-  }
-
-  @Override
-  public void commit() throws IOException {
-    try {
-      solr.commit();
-    } catch (SolrServerException e) {
-      throw makeIOException(e);
+      deleteIds.clear();
     }
   }
 
@@ -182,9 +244,10 @@ public class SolrIndexWriter implements
   public void setConf(Configuration conf) {
     config = conf;
     String serverURL = conf.get(SolrConstants.SERVER_URL);
-    if (serverURL == null) {
-      String message = "Missing Solr URL. Should be set via -D "
-          + SolrConstants.SERVER_URL;
+    String zkHosts = conf.get(SolrConstants.ZOOKEEPER_HOSTS);
+    if (serverURL == null && zkHosts == null) {
+      String message = "Missing SOLR URL and Zookeeper URL. Either on should be set via -D "
+          + SolrConstants.SERVER_URL + " or -D " + SolrConstants.ZOOKEEPER_HOSTS;
       message += "\n" + describe();
       LOG.error(message);
       throw new RuntimeException(message);
@@ -192,20 +255,17 @@ public class SolrIndexWriter implements
   }
 
   public String describe() {
-    StringBuffer sb = new StringBuffer("SolrIndexWriter\n");
-    sb.append("\t").append(SolrConstants.SERVER_TYPE)
-        .append(" : Type of SolrServer to communicate with (default 'http' however options include 'cloud', 'lb' and 'concurrent')\n");
+    StringBuffer sb = new StringBuffer("SOLRIndexWriter\n");
     sb.append("\t").append(SolrConstants.SERVER_URL)
-        .append(" : URL of the Solr instance (mandatory)\n");
-    sb.append("\t").append(SolrConstants.ZOOKEEPER_URL)
-        .append(" : URL of the Zookeeper URL (mandatory if 'cloud' value for solr.server.type)\n");
-    sb.append("\t").append(SolrConstants.LOADBALANCE_URLS)
-        .append(" : Comma-separated string of Solr server strings to be used (madatory if 'lb' value for solr.server.type)\n");
+        .append(" : URL of the SOLR instance\n");
+    sb.append("\t").append(SolrConstants.ZOOKEEPER_HOSTS)
+        .append(" : URL of the Zookeeper quorum\n");
+    sb.append("\t").append(SolrConstants.COMMIT_SIZE)
+        .append(" : buffer size when sending to SOLR (default 1000)\n");
     sb.append("\t")
         .append(SolrConstants.MAPPING_FILE)
-        .append(" : name of the mapping file for fields (default solrindex-mapping.xml)\n");
-    sb.append("\t").append(SolrConstants.COMMIT_SIZE)
-        .append(" : buffer size when sending to Solr (default 1000)\n");
+        .append(
+            " : name of the mapping file for fields (default solrindex-mapping.xml)\n");
     sb.append("\t").append(SolrConstants.USE_AUTH)
         .append(" : use authentication (default false)\n");
     sb.append("\t").append(SolrConstants.USERNAME)

Modified: nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java?rev=1728313&r1=1728312&r2=1728313&view=diff
==============================================================================
--- nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java (original)
+++ nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java Wed Feb  3 13:51:10 2016
@@ -16,20 +16,15 @@
  */
 package org.apache.nutch.indexwriter.solr;
 
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
+
+import java.util.ArrayList;
+import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.solr.client.solrj.impl.CloudSolrServer;
-import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer;
-import org.apache.solr.client.solrj.impl.HttpSolrServer;
-import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
-import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 
 import java.net.MalformedURLException;
 
@@ -37,64 +32,45 @@ public class SolrUtils {
 
   public static Logger LOG = LoggerFactory.getLogger(SolrUtils.class);
 
-  private static SolrServer server;
-
-  public static SolrServer getSolrServer(JobConf job)
-      throws MalformedURLException {
-
-    boolean auth = job.getBoolean(SolrConstants.USE_AUTH, false);
-
-    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-    // Check for username/password
-    if (auth) {
-      String username = job.get(SolrConstants.USERNAME);
-      LOG.info("Authenticating as: " + username);
-      AuthScope scope = new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT,
-          AuthScope.ANY_REALM, AuthScope.ANY_SCHEME);
-      credentialsProvider.setCredentials(scope, 
-          new UsernamePasswordCredentials(username, job.get(SolrConstants.PASSWORD)));
-    }
-    CloseableHttpClient client = 
-        HttpClientBuilder.create().setDefaultCredentialsProvider(credentialsProvider).build();
-
-    String solrServer = job.get(SolrConstants.SERVER_TYPE, "http");
-    String zkHost = job.get(SolrConstants.ZOOKEEPER_URL, null);
-    String solrServerUrl = job.get(SolrConstants.SERVER_URL);
-
-    switch (solrServer) {
-    case "cloud":
-      server = new CloudSolrServer(zkHost);
-      LOG.debug("CloudSolrServer selected as indexing server.");
-      break;
-    case "concurrent":
-      server = new ConcurrentUpdateSolrServer(solrServerUrl, client, 1000, 10);
-      LOG.debug("ConcurrentUpdateSolrServer selected as indexing server.");
-      break;
-    case "http":
-      if (auth) {
-        server = new HttpSolrServer(solrServerUrl, client);
-      } else {
-        server = new HttpSolrServer(solrServerUrl);
+  /**
+   *
+   *
+   * @param JobConf
+   * @return SolrClient
+   */
+  public static ArrayList<SolrClient> getSolrClients(JobConf job) throws MalformedURLException {
+    String[] urls = job.getStrings(SolrConstants.SERVER_URL);
+    String[] zkHostString = job.getStrings(SolrConstants.ZOOKEEPER_HOSTS);
+    ArrayList<SolrClient> solrClients = new ArrayList<SolrClient>();
+    
+    if (zkHostString != null && zkHostString.length > 0) {
+      for (int i = 0; i < zkHostString.length; i++) {
+        CloudSolrClient sc = getCloudSolrClient(zkHostString[i]);
+        sc.setDefaultCollection(job.get(SolrConstants.COLLECTION));
+        solrClients.add(sc);
       }
-      LOG.debug("HttpSolrServer selected as indexing server.");
-      break;
-    case "lb":
-      String[] lbServerString = job.get(SolrConstants.LOADBALANCE_URLS).split(",");
-      server = new LBHttpSolrServer(client, lbServerString);
-      LOG.debug("LBHttpSolrServer selected as indexing server.");
-      break;
-    default:
-      if (auth) {
-        server = new HttpSolrServer(solrServerUrl, client);
-      } else {
-        server = new HttpSolrServer(solrServerUrl);
+    } else {
+      for (int i = 0; i < urls.length; i++) {
+        SolrClient sc = new HttpSolrClient(urls[i]);
+        solrClients.add(sc);
       }
-      LOG.debug("HttpSolrServer selected as indexing server.");
-      break;
     }
-    return server;
+
+    return solrClients;
+  }
+
+  public static CloudSolrClient getCloudSolrClient(String url) throws MalformedURLException {
+    CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','));
+    sc.setParallelUpdates(true);
+    sc.connect();
+    return sc;
   }
 
+  public static SolrClient getHttpSolrClient(String url) throws MalformedURLException {
+    SolrClient sc =new HttpSolrClient(url);
+    return sc;
+  }
+  
   public static String stripNonCharCodepoints(String input) {
     StringBuilder retval = new StringBuilder();
     char ch;
@@ -117,4 +93,5 @@ public class SolrUtils {
 
     return retval.toString();
   }
-}
\ No newline at end of file
+
+}