You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by rm...@apache.org on 2013/10/21 20:58:44 UTC

svn commit: r1534320 [32/39] - in /lucene/dev/branches/lucene4956: ./ dev-tools/ dev-tools/idea/.idea/ dev-tools/idea/lucene/expressions/ dev-tools/idea/solr/contrib/velocity/ dev-tools/maven/ dev-tools/maven/lucene/ dev-tools/maven/lucene/expressions/...

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Mon Oct 21 18:58:24 2013
@@ -18,12 +18,14 @@
 package org.apache.solr.servlet;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -35,9 +37,11 @@ import org.apache.solr.common.util.Conte
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.ConfigSolr;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrConfig;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.handler.ContentStreamHandlerBase;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequestBase;
@@ -60,6 +64,7 @@ import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -131,12 +136,44 @@ public class SolrDispatchFilter implemen
     log.info("SolrDispatchFilter.init() done");
   }
 
+  private ConfigSolr loadConfigSolr(SolrResourceLoader loader) {
+
+    String solrxmlLocation = System.getProperty("solr.solrxml.location", "solrhome");
+
+    if (solrxmlLocation == null || "solrhome".equalsIgnoreCase(solrxmlLocation))
+      return ConfigSolr.fromSolrHome(loader, loader.getInstanceDir());
+
+    if ("zookeeper".equalsIgnoreCase(solrxmlLocation)) {
+      String zkHost = System.getProperty("zkHost");
+      log.info("Trying to read solr.xml from " + zkHost);
+      if (StringUtils.isEmpty(zkHost))
+        throw new SolrException(ErrorCode.SERVER_ERROR,
+            "Could not load solr.xml from zookeeper: zkHost system property not set");
+      SolrZkClient zkClient = new SolrZkClient(zkHost, 30000);
+      try {
+        if (!zkClient.exists("/solr.xml", true))
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Could not load solr.xml from zookeeper: node not found");
+        byte[] data = zkClient.getData("/solr.xml", null, null, true);
+        return ConfigSolr.fromInputStream(loader, new ByteArrayInputStream(data));
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Could not load solr.xml from zookeeper", e);
+      } finally {
+        zkClient.close();
+      }
+    }
+
+    throw new SolrException(ErrorCode.SERVER_ERROR,
+        "Bad solr.solrxml.location set: " + solrxmlLocation + " - should be 'solrhome' or 'zookeeper'");
+  }
+
   /**
    * Override this to change CoreContainer initialization
    * @return a CoreContainer to hold this server's cores
    */
   protected CoreContainer createCoreContainer() {
-    CoreContainer cores = new CoreContainer();
+    SolrResourceLoader loader = new SolrResourceLoader(SolrResourceLoader.locateSolrHome());
+    ConfigSolr config = loadConfigSolr(loader);
+    CoreContainer cores = new CoreContainer(loader, config);
     cores.load();
     return cores;
   }
@@ -165,7 +202,7 @@ public class SolrDispatchFilter implemen
     }
     
     if (this.cores == null) {
-      ((HttpServletResponse)response).sendError( 503, "Server is shutting down" );
+      ((HttpServletResponse)response).sendError( 503, "Server is shutting down or failed to initialize" );
       return;
     }
     CoreContainer cores = this.cores;

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java Mon Oct 21 18:58:24 2013
@@ -290,21 +290,6 @@ public final class ZookeeperInfoServlet 
           printZnode(json, path);
         }
 
-        /*
-        if (stat.getNumChildren() != 0)
-        {
-          writeKeyValue(json, "children_count",  stat.getNumChildren(), false );
-          out.println(", \"children_count\" : \"" + stat.getNumChildren() + "\"");
-        }
-        */
-
-        //if (stat.getDataLength() != 0)
-        if (data != null) {
-          String str = new BytesRef(data).utf8ToString();
-          //?? writeKeyValue(json, "content", str, false );
-          // Does nothing now, but on the assumption this will be used later we'll leave it in. If it comes out
-          // the catches below need to be restructured.
-        }
       } catch (IllegalArgumentException e) {
         // path doesn't exist (must have been removed)
         writeKeyValue(json, "warning", "(path gone)", false);
@@ -381,6 +366,16 @@ public final class ZookeeperInfoServlet 
         // Trickily, the call to zkClient.getData fills in the stat variable
         byte[] data = zkClient.getData(path, null, stat, true);
 
+        String dataStr = null;
+        String dataStrErr = null;
+        if (null != data) {
+          try {
+            dataStr = (new BytesRef(data)).utf8ToString();
+          } catch (Exception e) {
+            dataStrErr = "data is not parsable as a utf8 String: " + e.toString();
+          }
+        }
+
         json.writeString("znode");
         json.writeNameSeparator();
         json.startObject();
@@ -397,15 +392,18 @@ public final class ZookeeperInfoServlet 
         writeKeyValue(json, "ctime", time(stat.getCtime()), false);
         writeKeyValue(json, "cversion", stat.getCversion(), false);
         writeKeyValue(json, "czxid", stat.getCzxid(), false);
-        writeKeyValue(json, "dataLength", stat.getDataLength(), false);
         writeKeyValue(json, "ephemeralOwner", stat.getEphemeralOwner(), false);
         writeKeyValue(json, "mtime", time(stat.getMtime()), false);
         writeKeyValue(json, "mzxid", stat.getMzxid(), false);
         writeKeyValue(json, "pzxid", stat.getPzxid(), false);
+        writeKeyValue(json, "dataLength", stat.getDataLength(), false);
+        if (null != dataStrErr) {
+          writeKeyValue(json, "dataNote", dataStrErr, false);
+        }
         json.endObject();
 
-        if (data != null) {
-          writeKeyValue(json, "data", new BytesRef(data).utf8ToString(), false);
+        if (null != dataStr) {
+          writeKeyValue(json, "data", dataStr, false);
         }
         json.endObject();
       } catch (KeeperException e) {

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/spelling/SpellCheckCollator.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/spelling/SpellCheckCollator.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/spelling/SpellCheckCollator.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/spelling/SpellCheckCollator.java Mon Oct 21 18:58:24 2013
@@ -147,10 +147,14 @@ public class SpellCheckCollator {
           hits = (Integer) checkResponse.rsp.getToLog().get("hits");
         } catch (EarlyTerminatingCollectorException etce) {
           assert (docCollectionLimit > 0);
-          if (etce.getLastDocId() + 1 == maxDocId) {
-            hits = docCollectionLimit;
+          assert 0 < etce.getNumberScanned();
+          assert 0 < etce.getNumberCollected();
+
+          if (etce.getNumberScanned() == maxDocId) {
+            hits = etce.getNumberCollected();
           } else {
-            hits = maxDocId / ((etce.getLastDocId() + 1) / docCollectionLimit);
+            hits = (int) ( ((float)( maxDocId * etce.getNumberCollected() )) 
+                           / (float)etce.getNumberScanned() );
           }
         } catch (Exception e) {
           LOG.warn("Exception trying to re-query to check if a spell check possibility would return any hits.", e);

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java Mon Oct 21 18:58:24 2013
@@ -23,22 +23,24 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.BaseDirectory;
+import org.apache.lucene.store.BufferedIndexOutput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.NoLockFactory;
+import org.apache.lucene.util.IOUtils;
 import org.apache.solr.store.blockcache.CustomBufferedIndexInput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HdfsDirectory extends Directory {
+public class HdfsDirectory extends BaseDirectory {
   public static Logger LOG = LoggerFactory.getLogger(HdfsDirectory.class);
   
   public static final int BUFFER_SIZE = 8192;
@@ -62,7 +64,7 @@ public class HdfsDirectory extends Direc
         fileSystem.mkdirs(hdfsDirPath);
       }
     } catch (Exception e) {
-      IOUtils.closeQuietly(fileSystem);
+      org.apache.solr.util.IOUtils.closeQuietly(fileSystem);
       throw new RuntimeException("Problem creating directory: " + hdfsDirPath,
           e);
     }
@@ -108,7 +110,7 @@ public class HdfsDirectory extends Direc
   }
   
   private IndexInput openInput(String name, int bufferSize) throws IOException {
-    return new HdfsNormalIndexInput(name, getFileSystem(), new Path(
+    return new HdfsIndexInput(name, getFileSystem(), new Path(
         hdfsDirPath, name), BUFFER_SIZE);
   }
   
@@ -163,16 +165,16 @@ public class HdfsDirectory extends Direc
     return configuration;
   }
   
-  static class HdfsNormalIndexInput extends CustomBufferedIndexInput {
+  static class HdfsIndexInput extends CustomBufferedIndexInput {
     public static Logger LOG = LoggerFactory
-        .getLogger(HdfsNormalIndexInput.class);
+        .getLogger(HdfsIndexInput.class);
     
     private final Path path;
     private final FSDataInputStream inputStream;
     private final long length;
     private boolean clone = false;
     
-    public HdfsNormalIndexInput(String name, FileSystem fileSystem, Path path,
+    public HdfsIndexInput(String name, FileSystem fileSystem, Path path,
         int bufferSize) throws IOException {
       super(name);
       this.path = path;
@@ -185,12 +187,12 @@ public class HdfsDirectory extends Direc
     @Override
     protected void readInternal(byte[] b, int offset, int length)
         throws IOException {
-      inputStream.read(getFilePointer(), b, offset, length);
+      inputStream.readFully(getFilePointer(), b, offset, length);
     }
     
     @Override
     protected void seekInternal(long pos) throws IOException {
-      inputStream.seek(pos);
+
     }
     
     @Override
@@ -208,13 +210,13 @@ public class HdfsDirectory extends Direc
     
     @Override
     public IndexInput clone() {
-      HdfsNormalIndexInput clone = (HdfsNormalIndexInput) super.clone();
+      HdfsIndexInput clone = (HdfsIndexInput) super.clone();
       clone.clone = true;
       return clone;
     }
   }
   
-  static class HdfsIndexOutput extends IndexOutput {
+  static class HdfsIndexOutput extends BufferedIndexOutput {
     
     private HdfsFileWriter writer;
     
@@ -224,33 +226,26 @@ public class HdfsDirectory extends Direc
     
     @Override
     public void close() throws IOException {
-      writer.close();
-    }
-    
-    @Override
-    public void flush() throws IOException {
-      writer.flush();
+      IOException priorE = null;
+      try {
+        super.close();
+      } catch (IOException ioe) {
+        priorE = ioe;
+      } finally {
+        IOUtils.closeWhileHandlingException(priorE, writer);
+      }
     }
-    
+
     @Override
-    public long getFilePointer() {
-      return writer.getPosition();
+    protected void flushBuffer(byte[] b, int offset, int len)
+        throws IOException {
+      writer.writeBytes(b, offset, len);
     }
-    
+
     @Override
-    public long length() {
+    public long length() throws IOException {
       return writer.length();
     }
-    
-    @Override
-    public void writeByte(byte b) throws IOException {
-      writer.writeByte(b);
-    }
-    
-    @Override
-    public void writeBytes(byte[] b, int offset, int length) throws IOException {
-      writer.writeBytes(b, offset, length);
-    }
   }
   
   @Override

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java Mon Oct 21 18:58:24 2013
@@ -17,6 +17,7 @@ package org.apache.solr.store.hdfs;
  * limitations under the License.
  */
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.EnumSet;
 
@@ -31,7 +32,7 @@ import org.apache.lucene.store.DataOutpu
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HdfsFileWriter extends DataOutput {
+public class HdfsFileWriter extends DataOutput implements Closeable {
   public static Logger LOG = LoggerFactory.getLogger(HdfsFileWriter.class);
   
   public static final String HDFS_SYNC_BLOCK = "solr.hdfs.sync.block";

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Mon Oct 21 18:58:24 2013
@@ -17,6 +17,10 @@
 
 package org.apache.solr.update;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexDocument;
 import org.apache.lucene.index.Term;
@@ -28,11 +32,6 @@ import org.apache.solr.request.SolrQuery
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.schema.SchemaField;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
 /**
  *
  */
@@ -194,15 +193,17 @@ public class AddUpdateCommand extends Up
   private List<SolrInputDocument> flatten(SolrInputDocument root) {
     List<SolrInputDocument> unwrappedDocs = new ArrayList<SolrInputDocument>();
     recUnwrapp(unwrappedDocs, root);
-    Collections.reverse(unwrappedDocs);
     return unwrappedDocs;
   }
 
   private void recUnwrapp(List<SolrInputDocument> unwrappedDocs, SolrInputDocument currentDoc) {
-    unwrappedDocs.add(currentDoc);
-    for (SolrInputDocument child : currentDoc.getChildDocuments()) {
-      recUnwrapp(unwrappedDocs, child);
+    List<SolrInputDocument> children = currentDoc.getChildDocuments();
+    if (children != null) {
+      for (SolrInputDocument child : children) {
+        recUnwrapp(unwrappedDocs, child);
+      }
     }
+    unwrappedDocs.add(currentDoc);
   }
 
 

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java Mon Oct 21 18:58:24 2013
@@ -42,22 +42,18 @@ import org.apache.solr.util.IOUtils;
 /** @lucene.experimental */
 public class HdfsUpdateLog extends UpdateLog {
   
-  private FileSystem fs;
-  private Path tlogDir;
-  private String confDir;
+  private volatile FileSystem fs;
+  private volatile Path tlogDir;
+  private final String confDir;
 
   public HdfsUpdateLog() {
-    
+    this.confDir = null;
   }
   
   public HdfsUpdateLog(String confDir) {
     this.confDir = confDir;
   }
   
-  public FileSystem getFs() {
-    return fs;
-  }
-  
   // HACK
   // while waiting for HDFS-3107, instead of quickly
   // dropping, we slowly apply
@@ -118,6 +114,14 @@ public class HdfsUpdateLog extends Updat
     }
     
     try {
+      if (fs != null) {
+        fs.close();
+      }
+    } catch (IOException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    }
+    
+    try {
       fs = FileSystem.newInstance(new Path(dataDir).toUri(), getConf());
     } catch (IOException e) {
       throw new SolrException(ErrorCode.SERVER_ERROR, e);

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/PeerSync.java Mon Oct 21 18:58:24 2013
@@ -215,8 +215,7 @@ public class PeerSync  {
 
     if (startingVersions != null) {
       if (startingVersions.size() == 0) {
-        // no frame of reference to tell of we've missed updates
-        log.warn("no frame of reference to tell of we've missed updates");
+        log.warn("no frame of reference to tell if we've missed updates");
         return false;
       }
       Collections.sort(startingVersions, absComparator);

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Mon Oct 21 18:58:24 2013
@@ -19,30 +19,21 @@ package org.apache.solr.update;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ExecutorService;
 
+import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.UpdateRequestExt;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.Diagnostics;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.util.AdjustableSemaphore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,469 +41,207 @@ import org.slf4j.LoggerFactory;
 public class SolrCmdDistributor {
   private static final int MAX_RETRIES_ON_FORWARD = 15;
   public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
-
-  static AdjustableSemaphore semaphore = new AdjustableSemaphore(8);
-  
-  CompletionService<Request> completionService;
-  Set<Future<Request>> pending;
-  
-  int maxBufferedAddsPerServer = 10;
-  int maxBufferedDeletesPerServer = 10;
-
-  private Response response = new Response();
-  
-  private final Map<Node,List<AddRequest>> adds = new HashMap<Node,List<AddRequest>>();
-  private final Map<Node,List<DeleteRequest>> deletes = new HashMap<Node,List<DeleteRequest>>();
-  private UpdateShardHandler updateShardHandler;
   
-  class AddRequest {
-    AddUpdateCommand cmd;
-    ModifiableSolrParams params;
-  }
+  private StreamingSolrServers servers;
   
-  class DeleteRequest {
-    DeleteUpdateCommand cmd;
-    ModifiableSolrParams params;
-  }
+  private List<Error> allErrors = new ArrayList<Error>();
+  private List<Error> errors = new ArrayList<Error>();
   
   public static interface AbortCheck {
     public boolean abortCheck();
   }
   
-  public SolrCmdDistributor(int numHosts, UpdateShardHandler updateShardHandler) {
-    int maxPermits = Math.max(16, numHosts * 16);
-    // limits how many tasks can actually execute at once
-    if (maxPermits != semaphore.getMaxPermits()) {
-      semaphore.setMaxPermits(maxPermits);
-    }
-    
-    this.updateShardHandler = updateShardHandler;
-    completionService = new ExecutorCompletionService<Request>(updateShardHandler.getCmdDistribExecutor());
-    pending = new HashSet<Future<Request>>();
+  public SolrCmdDistributor(ExecutorService updateExecutor) {
+    servers = new StreamingSolrServers(updateExecutor);
   }
   
   public void finish() {
+    try {
+      servers.blockUntilFinished();
+      doRetriesIfNeeded();
+    } finally {
+      servers.shutdown();
+    }
+  }
 
-    flushAdds(1);
-    flushDeletes(1);
+  private void doRetriesIfNeeded() {
+    // NOTE: retries will be forwards to a single url
+    
+    List<Error> errors = new ArrayList<Error>(this.errors);
+    errors.addAll(servers.getErrors());
+    allErrors.addAll(errors);
+    boolean blockUntilFinishedAgain = false;
+    for (Error err : errors) {
+      String oldNodeUrl = err.req.node.getUrl();
+      
+      // if there is a retry url, we want to retry...
+      boolean isRetry = err.req.node.checkRetry();
+      boolean doRetry = false;
+      int rspCode = err.statusCode;
+      
+      if (testing_errorHook != null) Diagnostics.call(testing_errorHook, err.e);
+      
+      // this can happen in certain situations such as shutdown
+      if (isRetry) {
+        if (rspCode == 404 || rspCode == 403 || rspCode == 503
+            || rspCode == 500) {
+          doRetry = true;
+        }
+        
+        // if its an ioexception, lets try again
+        if (err.e instanceof IOException) {
+          doRetry = true;
+        } else if (err.e instanceof SolrServerException) {
+          if (((SolrServerException) err.e).getRootCause() instanceof IOException) {
+            doRetry = true;
+          }
+        }
+      }
+      
+      if (isRetry && err.req.retries < MAX_RETRIES_ON_FORWARD && doRetry) {
+        err.req.retries++;
 
-    checkResponses(true);
-  }
-  
-  public void distribDelete(DeleteUpdateCommand cmd, List<Node> urls, ModifiableSolrParams params) throws IOException {
-    checkResponses(false);
+        SolrException.log(SolrCmdDistributor.log, "forwarding update to "
+            + oldNodeUrl + " failed - retrying ... ");
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.warn(null, e);
+        }
+        
+        submit(err.req);
+        blockUntilFinishedAgain = true;
+      }
+    }
+    
+    servers.clearErrors();
+    this.errors.clear();
     
-    if (cmd.isDeleteById()) {
-      doDelete(cmd, urls, params);
-    } else {
-      doDelete(cmd, urls, params);
+    if (blockUntilFinishedAgain) {
+      servers.blockUntilFinished();
+      doRetriesIfNeeded();
     }
   }
   
-  public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
-    checkResponses(false);
+  public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
+    distribDelete(cmd, nodes, params, false);
+  }
+  
+  public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean sync) throws IOException {
     
-    // make sure any pending deletes are flushed
-    flushDeletes(1);
-
-    // TODO: this is brittle
-    // need to make a clone since these commands may be reused
-    AddUpdateCommand clone = new AddUpdateCommand(null);
-
-    clone.solrDoc = cmd.solrDoc;
-    clone.commitWithin = cmd.commitWithin;
-    clone.overwrite = cmd.overwrite;
-    clone.setVersion(cmd.getVersion());
-    AddRequest addRequest = new AddRequest();
-    addRequest.cmd = clone;
-    addRequest.params = params;
-
     for (Node node : nodes) {
-      List<AddRequest> alist = adds.get(node);
-      if (alist == null) {
-        alist = new ArrayList<AddRequest>(2);
-        adds.put(node, alist);
+      UpdateRequest uReq = new UpdateRequest();
+      uReq.setParams(params);
+      if (cmd.isDeleteById()) {
+        uReq.deleteById(cmd.getId(), cmd.getVersion());
+      } else {
+        uReq.deleteByQuery(cmd.query);
       }
-      alist.add(addRequest);
+      
+      submit(new Req(node, uReq, sync));
     }
-
-    flushAdds(maxBufferedAddsPerServer);
   }
-
-  /**
-   * Synchronous (blocking) add to specified node. Any error returned from node is propagated.
-   */
-  public void syncAdd(AddUpdateCommand cmd, Node node, ModifiableSolrParams params) throws IOException {
-    log.info("SYNCADD on {} : {}", node, cmd.getPrintableId());
-    checkResponses(false);
-    // flush all pending deletes
-    flushDeletes(1);
-    // flush all pending adds
-    flushAdds(1);
-    // finish with the pending requests
-    checkResponses(false);
-
-    UpdateRequestExt ureq = new UpdateRequestExt();
-    ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
-    ureq.setParams(params);
-    syncRequest(node, ureq);
-  }
-
-  public void syncDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
-    log.info("SYNCDELETE on {} : ", nodes, cmd);
-    checkResponses(false);
-    // flush all pending adds
-    flushAdds(1);
-    // flush all pending deletes
-    flushDeletes(1);
-    // finish pending requests
-    checkResponses(false);
-
-    DeleteUpdateCommand clonedCmd = clone(cmd);
-    DeleteRequest deleteRequest = new DeleteRequest();
-    deleteRequest.cmd = clonedCmd;
-    deleteRequest.params = params;
-
-    UpdateRequestExt ureq = new UpdateRequestExt();
-    if (cmd.isDeleteById()) {
-      ureq.deleteById(cmd.getId(), cmd.getVersion());
-    } else {
-      ureq.deleteByQuery(cmd.query);
-    }
-    ureq.setParams(params);
-    for (Node node : nodes) {
-      syncRequest(node, ureq);
-    }
+  
+  public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
+    distribAdd(cmd, nodes, params, false);
   }
+  
+  public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous) throws IOException {
 
-  private void syncRequest(Node node, UpdateRequestExt ureq) {
-    Request sreq = new Request();
-    sreq.node = node;
-    sreq.ureq = ureq;
-
-    String url = node.getUrl();
-    String fullUrl;
-    if (!url.startsWith("http://") && !url.startsWith("https://")) {
-      fullUrl = "http://" + url;
-    } else {
-      fullUrl = url;
-    }
-
-    HttpSolrServer server = new HttpSolrServer(fullUrl,
-        updateShardHandler.getHttpClient());
-
-    try {
-      sreq.ursp = server.request(ureq);
-    } catch (Exception e) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + sreq.node + " update: " + ureq , e);
+    for (Node node : nodes) {
+      UpdateRequest uReq = new UpdateRequest();
+      uReq.setParams(params);
+      uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+      submit(new Req(node, uReq, synchronous));
     }
+    
   }
 
   public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
       ModifiableSolrParams params) throws IOException {
     
-    // make sure we are ordered
-    flushAdds(1);
-    flushDeletes(1);
-
-    
-    // Wait for all outstanding responses to make sure that a commit
-    // can't sneak in ahead of adds or deletes we already sent.
-    // We could do this on a per-server basis, but it's more complex
-    // and this solution will lead to commits happening closer together.
-    checkResponses(true);
-    
-    // currently, we dont try to piggy back on outstanding adds or deletes
-    
-    UpdateRequestExt ureq = new UpdateRequestExt();
-    ureq.setParams(params);
-    
-    addCommit(ureq, cmd);
-    
-    log.info("Distrib commit to:" + nodes + " params:" + params);
+    // we need to do any retries before commit...
+    servers.blockUntilFinished();
+    doRetriesIfNeeded();
     
-    for (Node node : nodes) {
-      submit(ureq, node);
-    }
-    
-    // if the command wanted to block until everything was committed,
-    // then do that here.
+    UpdateRequest uReq = new UpdateRequest();
+    uReq.setParams(params);
     
-    if (cmd.waitSearcher) {
-      checkResponses(true);
-    }
-  }
-  
-  private void doDelete(DeleteUpdateCommand cmd, List<Node> nodes,
-      ModifiableSolrParams params) {
+    addCommit(uReq, cmd);
     
-    flushAdds(1);
+    log.debug("Distrib commit to:" + nodes + " params:" + params);
     
-    DeleteUpdateCommand clonedCmd = clone(cmd);
-    DeleteRequest deleteRequest = new DeleteRequest();
-    deleteRequest.cmd = clonedCmd;
-    deleteRequest.params = params;
     for (Node node : nodes) {
-      List<DeleteRequest> dlist = deletes.get(node);
-      
-      if (dlist == null) {
-        dlist = new ArrayList<DeleteRequest>(2);
-        deletes.put(node, dlist);
-      }
-      dlist.add(deleteRequest);
+      submit(new Req(node, uReq, false));
     }
     
-    flushDeletes(maxBufferedDeletesPerServer);
   }
   
-  void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
+  void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
     if (cmd == null) return;
     ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
         : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes);
   }
-  
-  boolean flushAdds(int limit) {
-    // check for pending deletes
-  
-    Set<Node> removeNodes = new HashSet<Node>();
-    Set<Node> nodes = adds.keySet();
- 
-    for (Node node : nodes) {
-      List<AddRequest> alist = adds.get(node);
-      if (alist == null || alist.size() < limit) continue;
-  
-      UpdateRequestExt ureq = new UpdateRequestExt();
-      
-      ModifiableSolrParams combinedParams = new ModifiableSolrParams();
-      
-      for (AddRequest aReq : alist) {
-        AddUpdateCommand cmd = aReq.cmd;
-        combinedParams.add(aReq.params);
-       
-        ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
-      }
-      
-      if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
-      ureq.getParams().add(combinedParams);
 
-      removeNodes.add(node);
-      
-      submit(ureq, node);
-    }
-    
-    for (Node node : removeNodes) {
-      adds.remove(node);
-    }
-    
-    return true;
-  }
-  
-  boolean flushDeletes(int limit) {
-    // check for pending deletes
- 
-    Set<Node> removeNodes = new HashSet<Node>();
-    Set<Node> nodes = deletes.keySet();
-    for (Node node : nodes) {
-      List<DeleteRequest> dlist = deletes.get(node);
-      if (dlist == null || dlist.size() < limit) continue;
-      UpdateRequestExt ureq = new UpdateRequestExt();
+  private void submit(Req req) {
+    if (req.synchronous) {
+      servers.blockUntilFinished();
+      doRetriesIfNeeded();
       
-      ModifiableSolrParams combinedParams = new ModifiableSolrParams();
-      
-      for (DeleteRequest dReq : dlist) {
-        DeleteUpdateCommand cmd = dReq.cmd;
-        combinedParams.add(dReq.params);
-        if (cmd.isDeleteById()) {
-          ureq.deleteById(cmd.getId(), cmd.getVersion());
-        } else {
-          ureq.deleteByQuery(cmd.query);
-        }
-        
-        if (ureq.getParams() == null) ureq
-            .setParams(new ModifiableSolrParams());
-        ureq.getParams().add(combinedParams);
+      HttpSolrServer server = new HttpSolrServer(req.node.getUrl(),
+          servers.getHttpClient());
+      try {
+        server.request(req.uReq);
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + req.node + " update: " + req.uReq , e);
+      } finally {
+        server.shutdown();
       }
       
-      removeNodes.add(node);
-      submit(ureq, node);
+      return;
     }
     
-    for (Node node : removeNodes) {
-      deletes.remove(node);
+    SolrServer solrServer = servers.getSolrServer(req);
+    try {
+      NamedList<Object> rsp = solrServer.request(req.uReq);
+    } catch (Exception e) {
+      SolrException.log(log, e);
+      Error error = new Error();
+      error.e = e;
+      error.req = req;
+      if (e instanceof SolrException) {
+        error.statusCode = ((SolrException) e).code();
+      }
+      errors.add(error);
     }
-    
-    return true;
-  }
-  
-  private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
-    DeleteUpdateCommand c = (DeleteUpdateCommand)cmd.clone();
-    // TODO: shouldnt the clone do this?
-    c.setFlags(cmd.getFlags());
-    c.setVersion(cmd.getVersion());
-    return c;
   }
   
-  public static class Request {
+  public static class Req {
     public Node node;
-    UpdateRequestExt ureq;
-    NamedList<Object> ursp;
-    int rspCode;
-    public Exception exception;
-    int retries;
-  }
-  
-  void submit(UpdateRequestExt ureq, Node node) {
-    Request sreq = new Request();
-    sreq.node = node;
-    sreq.ureq = ureq;
-    submit(sreq);
-  }
-  
-  public void submit(final Request sreq) {
-
-    final String url = sreq.node.getUrl();
-
-    Callable<Request> task = new Callable<Request>() {
-      @Override
-      public Request call() throws Exception {
-        Request clonedRequest = null;
-        try {
-          clonedRequest = new Request();
-          clonedRequest.node = sreq.node;
-          clonedRequest.ureq = sreq.ureq;
-          clonedRequest.retries = sreq.retries;
-          
-          String fullUrl;
-          if (!url.startsWith("http://") && !url.startsWith("https://")) {
-            fullUrl = "http://" + url;
-          } else {
-            fullUrl = url;
-          }
-  
-          HttpSolrServer server = new HttpSolrServer(fullUrl,
-              updateShardHandler.getHttpClient());
-          
-          if (Thread.currentThread().isInterrupted()) {
-            clonedRequest.rspCode = 503;
-            clonedRequest.exception = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down.");
-            return clonedRequest;
-          }
-          
-          clonedRequest.ursp = server.request(clonedRequest.ureq);
-          
-          // currently no way to get the request body.
-        } catch (Exception e) {
-          clonedRequest.exception = e;
-          if (e instanceof SolrException) {
-            clonedRequest.rspCode = ((SolrException) e).code();
-          } else {
-            clonedRequest.rspCode = -1;
-          }
-        } finally {
-          semaphore.release();
-        }
-        return clonedRequest;
-      }
-    };
-    try {
-      semaphore.acquire();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Update thread interrupted", e);
+    public UpdateRequest uReq;
+    public int retries;
+    public boolean synchronous;
+    
+    public Req(Node node, UpdateRequest uReq, boolean synchronous) {
+      this.node = node;
+      this.uReq = uReq;
+      this.synchronous = synchronous;
     }
-    try {
-      pending.add(completionService.submit(task));
-    } catch (RejectedExecutionException e) {
-      semaphore.release();
-      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down", e);
-    }
-    
   }
+    
 
   public static Diagnostics.Callable testing_errorHook;  // called on error when forwarding request.  Currently data=[this, Request]
 
-  void checkResponses(boolean block) {
-
-    while (pending != null && pending.size() > 0) {
-      try {
-        Future<Request> future = block ? completionService.take()
-            : completionService.poll();
-        if (future == null) return;
-        pending.remove(future);
-        
-        try {
-          Request sreq = future.get();
-          if (sreq.rspCode != 0) {
-            // error during request
-
-            if (testing_errorHook != null) Diagnostics.call(testing_errorHook, this, sreq);
-
-            // if there is a retry url, we want to retry...
-            boolean isRetry = sreq.node.checkRetry();
-            boolean doRetry = false;
-            int rspCode = sreq.rspCode;
-            
-            // this can happen in certain situations such as shutdown
-            if (isRetry) {
-              if (rspCode == 404 || rspCode == 403 || rspCode == 503
-                  || rspCode == 500) {
-                doRetry = true;
-              }
-              
-              // if its an ioexception, lets try again
-              if (sreq.exception instanceof IOException) {
-                doRetry = true;
-              } else if (sreq.exception instanceof SolrServerException) {
-                if (((SolrServerException) sreq.exception).getRootCause() instanceof IOException) {
-                  doRetry = true;
-                }
-              }
-            }
-            
-            if (isRetry && sreq.retries < MAX_RETRIES_ON_FORWARD && doRetry) {
-              sreq.retries++;
-              sreq.rspCode = 0;
-              sreq.exception = null;
-              SolrException.log(SolrCmdDistributor.log, "forwarding update to " + sreq.node.getUrl() + " failed - retrying ... ");
-              Thread.sleep(500);
-              submit(sreq);
-            } else {
-              Exception e = sreq.exception;
-              Error error = new Error();
-              error.e = e;
-              error.node = sreq.node;
-              response.errors.add(error);
-              response.sreq = sreq;
-              SolrException.log(SolrCmdDistributor.log, "shard update error "
-                  + sreq.node, sreq.exception);
-            }
-          }
-          
-        } catch (ExecutionException e) {
-          // shouldn't happen since we catch exceptions ourselves
-          SolrException.log(SolrCore.log,
-              "error sending update request to shard", e);
-        }
-        
-      } catch (InterruptedException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
-            "interrupted waiting for shard update response", e);
-      }
-    }
-  }
   
   public static class Response {
-    public Request sreq;
     public List<Error> errors = new ArrayList<Error>();
   }
   
   public static class Error {
-    public Node node;
     public Exception e;
-  }
-
-  public Response getResponse() {
-    return response;
+    public int statusCode;
+    public Req req;
   }
   
   public static abstract class Node {
@@ -595,6 +324,64 @@ public class SolrCmdDistributor {
     }
   }
   
+  // RetryNodes are used in the case of 'forward to leader' where we want
+  // to try the latest leader on a fail in the case the leader just went down.
+  public static class RetryNode extends StdNode {
+    
+    private ZkStateReader zkStateReader;
+    private String collection;
+    private String shardId;
+    
+    public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
+      super(nodeProps);
+      this.zkStateReader = zkStateReader;
+      this.collection = collection;
+      this.shardId = shardId;
+    }
+
+    @Override
+    public boolean checkRetry() {
+      ZkCoreNodeProps leaderProps;
+      try {
+        leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
+            collection, shardId));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      
+      this.nodeProps = leaderProps;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result
+          + ((collection == null) ? 0 : collection.hashCode());
+      result = prime * result + ((shardId == null) ? 0 : shardId.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (!super.equals(obj)) return false;
+      if (getClass() != obj.getClass()) return false;
+      RetryNode other = (RetryNode) obj;
+      if (nodeProps.getCoreUrl() == null) {
+        if (other.nodeProps.getCoreUrl() != null) return false;
+      } else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;
+
+      return true;
+    }
+  }
+
+  public List<Error> getErrors() {
+    return allErrors;
+  }
 }
 
 

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java Mon Oct 21 18:58:24 2013
@@ -17,7 +17,6 @@
 
 package org.apache.solr.update;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.lucene.index.*;
 import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
 import org.apache.lucene.util.InfoStream;
@@ -34,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.util.List;
@@ -166,8 +164,8 @@ public class SolrIndexConfig {
       }
     }
     mergedSegmentWarmerInfo = getPluginInfo(prefix + "/mergedSegmentWarmer", solrConfig, def.mergedSegmentWarmerInfo);
-    if (mergedSegmentWarmerInfo != null && solrConfig.reopenReaders == false) {
-      throw new IllegalArgumentException("Supplying a mergedSegmentWarmer will do nothing since reopenReaders is false");
+    if (mergedSegmentWarmerInfo != null && solrConfig.nrtMode == false) {
+      throw new IllegalArgumentException("Supplying a mergedSegmentWarmer will do nothing since nrtMode is false");
     }
   }
 

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java Mon Oct 21 18:58:24 2013
@@ -31,12 +31,11 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.OpenBitSet;
+import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.HashBasedRouter;
-import org.apache.solr.common.util.Hash;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.schema.SchemaField;
-import org.apache.solr.schema.StrField;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
@@ -59,10 +58,11 @@ public class SolrIndexSplitter {
   HashBasedRouter hashRouter;
   int numPieces;
   int currPartition = 0;
+  String routeFieldName;
+  String splitKey;
 
   public SolrIndexSplitter(SplitIndexCommand cmd) {
     searcher = cmd.getReq().getSearcher();
-    field = searcher.getSchema().getUniqueKeyField();
     ranges = cmd.ranges;
     paths = cmd.paths;
     cores = cmd.cores;
@@ -75,6 +75,15 @@ public class SolrIndexSplitter {
       numPieces = ranges.size();
       rangesArr = ranges.toArray(new DocRouter.Range[ranges.size()]);
     }
+    routeFieldName = cmd.routeFieldName;
+    if (routeFieldName == null) {
+      field = searcher.getSchema().getUniqueKeyField();
+    } else  {
+      field = searcher.getSchema().getField(routeFieldName);
+    }
+    if (cmd.splitKey != null) {
+      splitKey = getRouteKey(cmd.splitKey);
+    }
   }
 
   public void split() throws IOException {
@@ -170,11 +179,20 @@ public class SolrIndexSplitter {
       idRef = field.getType().indexedToReadable(term, idRef);
       String idString = idRef.toString();
 
+      if (splitKey != null) {
+        // todo have composite routers support these kind of things instead
+        String part1 = getRouteKey(idString);
+        if (part1 == null)
+          continue;
+        if (!splitKey.equals(part1))  {
+          continue;
+        }
+      }
+
       int hash = 0;
       if (hashRouter != null) {
-        hash = hashRouter.sliceHash(idString, null, null);
+        hash = hashRouter.sliceHash(idString, null, null, null);
       }
-      // int hash = Hash.murmurhash3_x86_32(ref, ref.offset, ref.length, 0);
 
       docsEnum = termsEnum.docs(liveDocs, docsEnum, DocsEnum.FLAG_NONE);
       for (;;) {
@@ -196,6 +214,22 @@ public class SolrIndexSplitter {
     return docSets;
   }
 
+  private String getRouteKey(String idString) {
+    int idx = idString.indexOf(CompositeIdRouter.separator);
+    if (idx <= 0) return null;
+    String part1 = idString.substring(0, idx);
+    int commaIdx = part1.indexOf(CompositeIdRouter.bitsSeparator);
+    if (commaIdx > 0) {
+      if (commaIdx + 1 < part1.length())  {
+        char ch = part1.charAt(commaIdx + 1);
+        if (ch >= '0' && ch <= '9') {
+          part1 = part1.substring(0, commaIdx);
+        }
+      }
+    }
+    return part1;
+  }
+
 
   // change livedocs on the reader to delete those docs we don't want
   static class LiveDocsReader extends FilterAtomicReader {

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java Mon Oct 21 18:58:24 2013
@@ -24,7 +24,7 @@ import org.apache.solr.request.SolrQuery
 import java.util.List;
 
 /**
- * A merge indexes command encapsulated in an object.
+ * A split index command encapsulated in an object.
  *
  * @since solr 1.4
  *
@@ -35,13 +35,17 @@ public class SplitIndexCommand extends U
   public List<SolrCore> cores;  // either paths or cores should be specified
   public List<DocRouter.Range> ranges;
   public DocRouter router;
+  public String routeFieldName;
+  public String splitKey;
 
-  public SplitIndexCommand(SolrQueryRequest req, List<String> paths,  List<SolrCore> cores, List<DocRouter.Range> ranges, DocRouter router) {
+  public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges, DocRouter router, String routeFieldName, String splitKey) {
     super(req);
     this.paths = paths;
     this.cores = cores;
     this.ranges = ranges;
     this.router = router;
+    this.routeFieldName = routeFieldName;
+    this.splitKey = splitKey;
   }
 
   @Override
@@ -56,6 +60,12 @@ public class SplitIndexCommand extends U
     sb.append(",cores=" + cores);
     sb.append(",ranges=" + ranges);
     sb.append(",router=" + router);
+    if (routeFieldName != null) {
+      sb.append(",routeFieldName=" + routeFieldName);
+    }
+    if (splitKey != null) {
+      sb.append(",split.key=" + splitKey);
+    }
     sb.append('}');
     return sb.toString();
   }

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Mon Oct 21 18:58:24 2013
@@ -17,7 +17,6 @@
 
 package org.apache.solr.update;
 
-import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.request.SolrQueryRequest;
 
 

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/VersionInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/VersionInfo.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/VersionInfo.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/VersionInfo.java Mon Oct 21 18:58:24 2013
@@ -49,7 +49,7 @@ public class VersionInfo {
    */
   public static SchemaField getAndCheckVersionField(IndexSchema schema) 
     throws SolrException {
-    final String errPrefix = VERSION_FIELD + "field must exist in schema, using indexed=\"true\" stored=\"true\" and multiValued=\"false\"";
+    final String errPrefix = VERSION_FIELD + " field must exist in schema, using indexed=\"true\" stored=\"true\" and multiValued=\"false\"";
     SchemaField sf = schema.getFieldOrNull(VERSION_FIELD);
 
     if (null == sf) {

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -217,7 +217,7 @@ public class AddSchemaFieldsUpdateProces
       String fieldType = fieldTypeObj.toString();
 
       Collection<String> valueClasses
-          = FieldMutatingUpdateProcessorFactory.oneOrMany(typeMappingNamedList, VALUE_CLASS_PARAM);
+          = typeMappingNamedList.removeConfigArgs(VALUE_CLASS_PARAM);
       if (valueClasses.isEmpty()) {
         throw new SolrException(SERVER_ERROR, 
             "Each '" + TYPE_MAPPING_PARAM + "' <lst/> must contain at least one '" + VALUE_CLASS_PARAM + "' <str>");

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/CloneFieldUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/CloneFieldUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/CloneFieldUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/CloneFieldUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -176,7 +176,7 @@ public class CloneFieldUpdateProcessorFa
     } else {
       // source better be one or more strings
       srcInclusions.fieldName = new HashSet<String>
-        (FieldMutatingUpdateProcessorFactory.oneOrMany(args, "source"));
+        (args.removeConfigArgs("source"));
     }
 
     

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Oct 21 18:58:24 2013
@@ -17,8 +17,10 @@ package org.apache.solr.update.processor
  * limitations under the License.
  */
 
+import org.apache.http.client.HttpClient;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRef;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
 import org.apache.solr.cloud.CloudDescriptor;
@@ -43,7 +45,9 @@ import org.apache.solr.common.params.Upd
 import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
 import org.apache.solr.handler.component.RealTimeGetComponent;
+import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
 import org.apache.solr.response.SolrQueryResponse;
@@ -53,8 +57,9 @@ import org.apache.solr.update.AddUpdateC
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
 import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.SolrCmdDistributor.Error;
 import org.apache.solr.update.SolrCmdDistributor.Node;
-import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.RetryNode;
 import org.apache.solr.update.SolrCmdDistributor.StdNode;
 import org.apache.solr.update.UpdateCommand;
 import org.apache.solr.update.UpdateHandler;
@@ -68,11 +73,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
@@ -104,6 +111,17 @@ public class DistributedUpdateProcessor 
       }
     }
   }
+  
+  private final HttpClient client;
+  {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
+    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
+    params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 15000);
+    params.set(HttpClientUtil.PROP_SO_TIMEOUT, 60000);
+    params.set(HttpClientUtil.PROP_USE_RETRY, false);
+    client = HttpClientUtil.createClient(params);
+  }
 
   public static final String COMMIT_END_POINT = "commit_end_point";
   public static final String LOG_REPLAY = "log_replay";
@@ -142,10 +160,7 @@ public class DistributedUpdateProcessor 
   private boolean isSubShardLeader = false;
   private List<Node> nodes;
 
-  private int numNodes;
-
   private UpdateCommand updateCommand;  // the current command this processor is working on.
-
   
   public DistributedUpdateProcessor(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
@@ -171,8 +186,7 @@ public class DistributedUpdateProcessor 
     this.zkEnabled  = coreDesc.getCoreContainer().isZooKeeperAware();
     zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
     if (zkEnabled) {
-      numNodes =  zkController.getZkStateReader().getClusterState().getLiveNodes().size();
-      cmdDistrib = new SolrCmdDistributor(numNodes, coreDesc.getCoreContainer().getZkController().getUpdateShardHandler());
+      cmdDistrib = new SolrCmdDistributor(coreDesc.getCoreContainer().getUpdateExecutor());
     }
     //this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
 
@@ -181,6 +195,7 @@ public class DistributedUpdateProcessor 
     if (cloudDesc != null) {
       collection = cloudDesc.getCollectionName();
     }
+
   }
 
 
@@ -199,7 +214,6 @@ public class DistributedUpdateProcessor 
       String coreName = req.getCore().getName();
 
       ClusterState cstate = zkController.getClusterState();
-      numNodes = cstate.getLiveNodes().size();
       DocCollection coll = cstate.getCollection(collection);
       Slice slice = coll.getRouter().getTargetSlice(id, doc, req.getParams(), coll);
 
@@ -301,7 +315,7 @@ public class DistributedUpdateProcessor 
     // Am I the leader of a shard in "construction" state?
     String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
     Slice mySlice = coll.getSlice(myShardId);
-    if (Slice.CONSTRUCTION.equals(mySlice.getState())) {
+    if (Slice.CONSTRUCTION.equals(mySlice.getState()) || Slice.RECOVERY.equals(mySlice.getState())) {
       Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
       boolean amILeader = myLeader.getName().equals(
           req.getCore().getCoreDescriptor().getCloudDescriptor()
@@ -326,7 +340,7 @@ public class DistributedUpdateProcessor 
     Collection<Slice> allSlices = coll.getSlices();
     List<Node> nodes = null;
     for (Slice aslice : allSlices) {
-      if (Slice.CONSTRUCTION.equals(aslice.getState()))  {
+      if (Slice.CONSTRUCTION.equals(aslice.getState()) || Slice.RECOVERY.equals(aslice.getState()))  {
         DocRouter.Range myRange = coll.getSlice(shardId).getRange();
         if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
         boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange);
@@ -358,9 +372,9 @@ public class DistributedUpdateProcessor 
     if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
       String fromShard = req.getParams().get("distrib.from.parent");
       if (fromShard != null) {
-        if (!Slice.CONSTRUCTION.equals(mySlice.getState()))  {
+        if (Slice.ACTIVE.equals(mySlice.getState()))  {
           throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-              "Request says it is coming from parent shard leader but we are not in construction state");
+              "Request says it is coming from parent shard leader but we are in active state");
         }
         // shard splitting case -- check ranges to see if we are a sub-shard
         Slice fromSlice = zkController.getClusterState().getCollection(collection).getSlice(fromShard);
@@ -451,7 +465,7 @@ public class DistributedUpdateProcessor 
             zkController.getBaseUrl(), req.getCore().getName()));
         params.set("distrib.from.parent", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
         for (Node subShardLeader : subShardLeaders) {
-          cmdDistrib.syncAdd(cmd, subShardLeader, params);
+          cmdDistrib.distribAdd(cmd, Collections.singletonList(subShardLeader), params, true);
         }
       }
     }
@@ -497,16 +511,16 @@ public class DistributedUpdateProcessor 
     // send in a background thread
 
     cmdDistrib.finish();
-    Response response = cmdDistrib.getResponse();
+    List<Error> errors = cmdDistrib.getErrors();
     // TODO - we may need to tell about more than one error...
     
     // if its a forward, any fail is a problem - 
     // otherwise we assume things are fine if we got it locally
     // until we start allowing min replication param
-    if (response.errors.size() > 0) {
+    if (errors.size() > 0) {
       // if one node is a RetryNode, this was a forward request
-      if (response.errors.get(0).node instanceof RetryNode) {
-        rsp.setException(response.errors.get(0).e);
+      if (errors.get(0).req.node instanceof RetryNode) {
+        rsp.setException(errors.get(0).e);
       }
       // else
       // for now we don't error - we assume if it was added locally, we
@@ -519,33 +533,46 @@ public class DistributedUpdateProcessor 
     // legit
 
     // TODO: we should do this in the background it would seem
-    for (SolrCmdDistributor.Error error : response.errors) {
-      if (error.node instanceof RetryNode) {
+    for (final SolrCmdDistributor.Error error : errors) {
+      if (error.req.node instanceof RetryNode) {
         // we don't try to force a leader to recover
         // when we cannot forward to it
         continue;
       }
       // TODO: we should force their state to recovering ??
-      // TODO: could be sent in parallel
       // TODO: do retries??
       // TODO: what if its is already recovering? Right now recoveries queue up -
       // should they?
-      String recoveryUrl = error.node.getBaseUrl();
-      HttpSolrServer server;
-      log.info("try and ask " + recoveryUrl + " to recover");
-      try {
-        server = new HttpSolrServer(recoveryUrl);
-        server.setSoTimeout(15000);
-        server.setConnectionTimeout(15000);
-        
-        RequestRecovery recoverRequestCmd = new RequestRecovery();
-        recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
-        recoverRequestCmd.setCoreName(error.node.getCoreName());
-        
-        server.request(recoverRequestCmd);
-      } catch (Exception e) {
-        log.info("Could not tell a replica to recover", e);
-      }
+      final String recoveryUrl = error.req.node.getBaseUrl();
+      
+      Thread thread = new Thread() {
+        {
+          setDaemon(true);
+        }
+        @Override
+        public void run() {
+          log.info("try and ask " + recoveryUrl + " to recover");
+          HttpSolrServer server = new HttpSolrServer(recoveryUrl);
+          try {
+            server.setSoTimeout(60000);
+            server.setConnectionTimeout(15000);
+            
+            RequestRecovery recoverRequestCmd = new RequestRecovery();
+            recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
+            recoverRequestCmd.setCoreName(error.req.node.getCoreName());
+            try {
+              server.request(recoverRequestCmd);
+            } catch (Throwable t) {
+              SolrException.log(log, recoveryUrl
+                  + ": Could not tell a replica to recover", t);
+            }
+          } finally {
+            server.shutdown();
+          }
+        }
+      };
+      ExecutorService executor = req.getCore().getCoreDescriptor().getCoreContainer().getUpdateExecutor();
+      executor.execute(thread);
       
     }
   }
@@ -838,7 +865,7 @@ public class DistributedUpdateProcessor 
         params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
             zkController.getBaseUrl(), req.getCore().getName()));
         params.set("distrib.from.parent", cloudDesc.getShardId());
-        cmdDistrib.syncDelete(cmd, subShardLeaders, params);
+        cmdDistrib.distribDelete(cmd, subShardLeaders, params, true);
       }
     }
 
@@ -1061,7 +1088,7 @@ public class DistributedUpdateProcessor 
       if (leaderLogic) {
         List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null);
         if (subShardLeaders != null)  {
-          cmdDistrib.syncDelete(cmd, subShardLeaders, params);
+          cmdDistrib.distribDelete(cmd, subShardLeaders, params, true);
         }
         if (replicas != null) {
           cmdDistrib.distribDelete(cmd, replicas, params);
@@ -1286,61 +1313,6 @@ public class DistributedUpdateProcessor 
     }
     return urls;
   }
-  
-  // RetryNodes are used in the case of 'forward to leader' where we want
-  // to try the latest leader on a fail in the case the leader just went down.
-  public static class RetryNode extends StdNode {
-    
-    private ZkStateReader zkStateReader;
-    private String collection;
-    private String shardId;
-    
-    public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
-      super(nodeProps);
-      this.zkStateReader = zkStateReader;
-      this.collection = collection;
-      this.shardId = shardId;
-    }
-
-    @Override
-    public boolean checkRetry() {
-      ZkCoreNodeProps leaderProps;
-      try {
-        leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
-            collection, shardId));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return false;
-      }
-      
-      this.nodeProps = leaderProps;
-
-      return true;
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = super.hashCode();
-      result = prime * result
-          + ((collection == null) ? 0 : collection.hashCode());
-      result = prime * result + ((shardId == null) ? 0 : shardId.hashCode());
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) return true;
-      if (!super.equals(obj)) return false;
-      if (getClass() != obj.getClass()) return false;
-      RetryNode other = (RetryNode) obj;
-      if (nodeProps.getCoreUrl() == null) {
-        if (other.nodeProps.getCoreUrl() != null) return false;
-      } else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;
-
-      return true;
-    }
-  }
 
   /**
    * Returns a boolean indicating whether or not the caller should behave as

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -17,8 +17,8 @@
 
 package org.apache.solr.update.processor;
 
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -27,10 +27,9 @@ import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
-import org.apache.solr.core.SolrCore;
 import org.apache.solr.common.SolrException;
-import static org.apache.solr.common.SolrException.ErrorCode.*;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
 import org.apache.solr.util.plugin.SolrCoreAware;
 
 
@@ -133,18 +132,18 @@ public abstract class FieldMutatingUpdat
   protected final FieldMutatingUpdateProcessor.FieldNameSelector getSelector() {
     if (null != selector) return selector;
 
-    throw new SolrException(SERVER_ERROR, "selector was never initialized, "+
-                            " inform(SolrCore) never called???");
+    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+        "selector was never initialized, inform(SolrCore) never called???");
   }
 
   public static SelectorParams parseSelectorParams(NamedList args) {
     SelectorParams params = new SelectorParams();
     
-    params.fieldName = new HashSet<String>(oneOrMany(args, "fieldName"));
-    params.typeName = new HashSet<String>(oneOrMany(args, "typeName"));
+    params.fieldName = new HashSet<String>(args.removeConfigArgs("fieldName"));
+    params.typeName = new HashSet<String>(args.removeConfigArgs("typeName"));
 
     // we can compile the patterns now
-    Collection<String> patterns = oneOrMany(args, "fieldRegex");
+    Collection<String> patterns = args.removeConfigArgs("fieldRegex");
     if (! patterns.isEmpty()) {
       params.fieldRegex = new ArrayList<Pattern>(patterns.size());
       for (String s : patterns) {
@@ -152,16 +151,17 @@ public abstract class FieldMutatingUpdat
           params.fieldRegex.add(Pattern.compile(s));
         } catch (PatternSyntaxException e) {
           throw new SolrException
-            (SERVER_ERROR, "Invalid 'fieldRegex' pattern: " + s, e);
+            (SolrException.ErrorCode.SERVER_ERROR,
+                "Invalid 'fieldRegex' pattern: " + s, e);
         }
       }
     }
     
     // resolve this into actual Class objects later
-    params.typeClass = oneOrMany(args, "typeClass");
+    params.typeClass = args.removeConfigArgs("typeClass");
 
-    // getBooleanArg() returns null if the arg is not specified
-    params.fieldNameMatchesSchemaField = getBooleanArg(args, "fieldNameMatchesSchemaField");
+    // Returns null if the arg is not specified
+    params.fieldNameMatchesSchemaField = args.removeBooleanArg("fieldNameMatchesSchemaField");
     
     return params;
   }
@@ -171,17 +171,17 @@ public abstract class FieldMutatingUpdat
     List<Object> excList = args.getAll("exclude");
     for (Object excObj : excList) {
       if (null == excObj) {
-        throw new SolrException
-            (SERVER_ERROR, "'exclude' init param can not be null");
+        throw new SolrException (SolrException.ErrorCode.SERVER_ERROR,
+            "'exclude' init param can not be null");
       }
       if (! (excObj instanceof NamedList) ) {
-        throw new SolrException
-            (SERVER_ERROR, "'exclude' init param must be <lst/>");
+        throw new SolrException (SolrException.ErrorCode.SERVER_ERROR,
+            "'exclude' init param must be <lst/>");
       }
       NamedList exc = (NamedList) excObj;
       exclusions.add(parseSelectorParams(exc));
       if (0 < exc.size()) {
-        throw new SolrException(SERVER_ERROR,
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
             "Unexpected 'exclude' init sub-param(s): '" +
                 args.getName(0) + "'");
       }
@@ -207,9 +207,8 @@ public abstract class FieldMutatingUpdat
     exclusions = parseSelectorExclusionParams(args);
 
     if (0 < args.size()) {
-      throw new SolrException(SERVER_ERROR, 
-                              "Unexpected init param(s): '" + 
-                              args.getName(0) + "'");
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Unexpected init param(s): '" + args.getName(0) + "'");
     }
 
   }
@@ -243,67 +242,4 @@ public abstract class FieldMutatingUpdat
     return FieldMutatingUpdateProcessor.SELECT_ALL_FIELDS;
 
   }
-
-  /**
-   * Removes all instance of the key from NamedList, returning the Set of 
-   * Strings that key referred to.  Throws an error if the key didn't refer
-   * to one or more strings (or arrays of strings)
-   * @exception SolrException invalid arr/str structure.
-   */
-  public static Collection<String> oneOrMany(final NamedList args, final String key) {
-    List<String> result = new ArrayList<String>(args.size() / 2);
-    final String err = "init arg '" + key + "' must be a string "
-      + "(ie: 'str'), or an array (ie: 'arr') containing strings; found: ";
-    
-    for (Object o = args.remove(key); null != o; o = args.remove(key)) {
-      if (o instanceof String) {
-        result.add((String)o);
-        continue;
-      }
-      
-      if (o instanceof Object[]) {
-        o = Arrays.asList((Object[]) o);
-      }
-      
-      if (o instanceof Collection) {
-        for (Object item : (Collection)o) {
-          if (! (item instanceof String)) {
-            throw new SolrException(SERVER_ERROR, err + item.getClass());
-          }
-          result.add((String)item);
-        }
-        continue;
-      }
-      
-      // who knows what the hell we have
-      throw new SolrException(SERVER_ERROR, err + o.getClass());
-    }
-    
-    return result;
-  }
-
-  /**
-   * Removes the first instance of the key from NamedList, returning the Boolean
-   * that key referred to, or null if the key is not specified.
-   * @exception SolrException invalid type or structure
-   */
-  public static Boolean getBooleanArg(final NamedList args, final String key) {
-    Boolean bool;
-    List values = args.getAll(key);
-    if (0 == values.size()) {
-      return null;
-    }
-    if (values.size() > 1) {
-      throw new SolrException(SERVER_ERROR, "Only one '" + key + "' is allowed");
-    }
-    Object o = args.remove(key);
-    if (o instanceof Boolean) {
-      bool = (Boolean)o;
-    } else if (o instanceof CharSequence) {
-      bool = Boolean.parseBoolean(o.toString());
-    } else {
-      throw new SolrException(SERVER_ERROR, "'" + key + "' must have type 'bool' or 'str'; found " + o.getClass());
-    }
-    return bool;
-  }
 }

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseBooleanFieldUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseBooleanFieldUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseBooleanFieldUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseBooleanFieldUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -112,7 +112,7 @@ public class ParseBooleanFieldUpdateProc
       }
     }
 
-    Collection<String> trueValuesParam = oneOrMany(args, TRUE_VALUES_PARAM);
+    Collection<String> trueValuesParam = args.removeConfigArgs(TRUE_VALUES_PARAM);
     if ( ! trueValuesParam.isEmpty()) {
       trueValues.clear();
       for (String trueVal : trueValuesParam) {
@@ -120,7 +120,7 @@ public class ParseBooleanFieldUpdateProc
       }
     }
 
-    Collection<String> falseValuesParam = oneOrMany(args, FALSE_VALUES_PARAM);
+    Collection<String> falseValuesParam = args.removeConfigArgs(FALSE_VALUES_PARAM);
     if ( ! falseValuesParam.isEmpty()) {
       falseValues.clear();
       for (String val : falseValuesParam) {

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseDateFieldUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseDateFieldUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseDateFieldUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseDateFieldUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -150,7 +150,7 @@ public class ParseDateFieldUpdateProcess
       defaultTimeZone = DateTimeZone.forID(defaultTimeZoneParam.toString());
     }
 
-    Collection<String> formatsParam = oneOrMany(args, FORMATS_PARAM);
+    Collection<String> formatsParam = args.removeConfigArgs(FORMATS_PARAM);
     if (null != formatsParam) {
       for (String value : formatsParam) {
         formats.put(value, DateTimeFormat.forPattern(value).withZone(defaultTimeZone).withLocale(locale));

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/StatelessScriptUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/StatelessScriptUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/StatelessScriptUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/StatelessScriptUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -173,7 +173,7 @@ public class StatelessScriptUpdateProces
   @Override
   public void init(NamedList args) {
     Collection<String> scripts = 
-      FieldMutatingUpdateProcessorFactory.oneOrMany(args, SCRIPT_ARG);
+      args.removeConfigArgs(SCRIPT_ARG);
     if (scripts.isEmpty()) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
                               "StatelessScriptUpdateProcessorFactory must be " +

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UniqFieldsUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UniqFieldsUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UniqFieldsUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UniqFieldsUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -23,6 +23,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
+
+import org.apache.solr.core.SolrCore;
 
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.NamedList;
@@ -30,77 +33,52 @@ import org.apache.solr.request.SolrQuery
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.update.AddUpdateCommand;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * A non-duplicate processor. Removes duplicates in the specified fields.
- * 
- * <pre class="prettyprint" >
- * &lt;updateRequestProcessorChain name="uniq-fields"&gt;
- *   &lt;processor class="org.apache.solr.update.processor.UniqFieldsUpdateProcessorFactory"&gt;
- *     &lt;lst name="fields"&gt;
- *       &lt;str&gt;uniq&lt;/str&gt;
- *       &lt;str&gt;uniq2&lt;/str&gt;
- *       &lt;str&gt;uniq3&lt;/str&gt;
- *     &lt;/lst&gt;      
- *   &lt;/processor&gt;
- *   &lt;processor class="solr.RunUpdateProcessorFactory" /&gt;
- * &lt;/updateRequestProcessorChain&gt;</pre>
+ * Removes duplicate values found in fields matching the specified conditions.  
+ * The existing field values are iterated in order, and values are removed when 
+ * they are equal to a value that has already been seen for this field.
+ * <p>
+ * By default this processor matches no fields.
+ * </p>
  * 
+ * <p>
+ * In the example configuration below, if a document initially contains the values 
+ * <code>"Steve","Lucy","Jim",Steve","Alice","Bob","Alice"</code> in a field named 
+ * <code>foo_uniq</code> then using this processor will result in the final list of 
+ * field values being <code>"Steve","Lucy","Jim","Alice","Bob"</code>
+ * </p>
+ * <pre class="prettyprint">
+ *  &lt;processor class="solr.UniqFieldsUpdateProcessorFactory"&gt;
+ *    &lt;str name="fieldRegex"&gt;.*_uniq&lt;/str&gt;
+ *  &lt;/processor&gt;
+ * </pre> 
  */
-public class UniqFieldsUpdateProcessorFactory extends UpdateRequestProcessorFactory {
+public class UniqFieldsUpdateProcessorFactory extends FieldValueSubsetUpdateProcessorFactory {
 
-  private Set<String> fields;
+  public final static Logger log = LoggerFactory.getLogger(UniqFieldsUpdateProcessorFactory.class);
 
-  @SuppressWarnings("unchecked")
-  @Override
-  public void init(@SuppressWarnings("rawtypes") NamedList args) {
-    NamedList<String> flst = (NamedList<String>)args.get("fields");
-    if(flst != null){
-      fields = new HashSet<String>();
-      for(int i = 0; i < flst.size(); i++){
-        fields.add(flst.getVal(i));
-      }
-    }
-  }
-  
   @Override
-  public UpdateRequestProcessor getInstance(SolrQueryRequest req,
-                                            SolrQueryResponse rsp,
-                                            UpdateRequestProcessor next) {
-    return new UniqFieldsUpdateProcessor(next, fields);
-  }
-  
-  public class UniqFieldsUpdateProcessor extends UpdateRequestProcessor {
+  public FieldMutatingUpdateProcessor.FieldNameSelector 
+    getDefaultSelector(final SolrCore core) {
     
-    private final Set<String> fields;
+    return FieldMutatingUpdateProcessor.SELECT_NO_FIELDS;
+  }
 
-    public UniqFieldsUpdateProcessor(UpdateRequestProcessor next, 
-                                              Set<String> fields) {
-      super(next);
-      this.fields = fields;
-    }
-    
-    @Override
-    public void processAdd(AddUpdateCommand cmd) throws IOException {
-      if(fields != null){
-        SolrInputDocument solrInputDocument = cmd.getSolrInputDocument();
-        List<Object> uniqList = new ArrayList<Object>();
-        for (String field : fields) {
-          uniqList.clear();
-          Collection<Object> col = solrInputDocument.getFieldValues(field);
-          if (col != null) {
-            for (Object o : col) {
-              if(!uniqList.contains(o))
-                uniqList.add(o);
-            }
-            solrInputDocument.remove(field);
-            for (Object o : uniqList) {
-              solrInputDocument.addField(field, o);
-            }
-          }    
-        }
+  @Override
+  @SuppressWarnings("unchecked")
+  public Collection pickSubset(Collection values) {
+    Set<Object> uniqs = new HashSet<Object>();
+    List<Object> result = new ArrayList<Object>(values.size());
+    for (Object o : values) {
+      if (!uniqs.contains(o)) {
+        uniqs.add(o);
+        result.add(o);
       }
-      super.processAdd(cmd);
     }
+    return result;
   }
 }
 

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java Mon Oct 21 18:58:24 2013
@@ -112,6 +112,8 @@ public final class UpdateRequestProcesso
       (null != info.name ? info.name : "") + "\"" + 
       (info.isDefault() ? " (default)" : "");
 
+    log.info("creating " + infomsg);
+
     // wrap in an ArrayList so we know we know we can do fast index lookups 
     // and that add(int,Object) is supported
     List<UpdateRequestProcessorFactory> list = new ArrayList

Modified: lucene/dev/branches/lucene4956/solr/core/src/test-files/solr/collection1/conf/schema-docValuesFaceting.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/test-files/solr/collection1/conf/schema-docValuesFaceting.xml?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/test-files/solr/collection1/conf/schema-docValuesFaceting.xml (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/test-files/solr/collection1/conf/schema-docValuesFaceting.xml Mon Oct 21 18:58:24 2013
@@ -26,17 +26,16 @@
   <fields>
     <field name="id"    type="string" indexed="true"  stored="true"  docValues="false" multiValued="false" required="true"/>
     <field name="id_dv" type="string" indexed="false" stored="false" docValues="true"  multiValued="false" required="true"/>
-    <!-- TODO: improve this test so we don't have to make all these DV types multivalued (for missing values) -->
     <dynamicField name="*_i"     type="int"    indexed="true"  stored="false" docValues="false"/>
-    <dynamicField name="*_i_dv"  type="int"    indexed="false" stored="false" docValues="true"  multiValued="true"/>  
+    <dynamicField name="*_i_dv"  type="int"    indexed="false" stored="false" docValues="true"/>  
     <dynamicField name="*_is"    type="int"    indexed="true"  stored="false" docValues="false" multiValued="true"/>
     <dynamicField name="*_is_dv" type="int"    indexed="false" stored="false" docValues="true"  multiValued="true"/>
-    <dynamicField name="*_s"     type="string" indexed="true"  stored="false" docValues="false" multiValued="true"/>
-    <dynamicField name="*_s_dv"  type="string" indexed="false" stored="false" docValues="true"  multiValued="true"/>
+    <dynamicField name="*_s"     type="string" indexed="true"  stored="false" docValues="false"/>
+    <dynamicField name="*_s_dv"  type="string" indexed="false" stored="false" docValues="true"/>
     <dynamicField name="*_ss"    type="string" indexed="true"  stored="false" docValues="false" multiValued="true"/>
     <dynamicField name="*_ss_dv" type="string" indexed="false" stored="false" docValues="true"  multiValued="true"/>
     <dynamicField name="*_f"     type="float"  indexed="true"  stored="false" docValues="false"/>
-    <dynamicField name="*_f_dv"  type="float"  indexed="false" stored="false" docValues="true"  multiValued="true"/>
+    <dynamicField name="*_f_dv"  type="float"  indexed="false" stored="false" docValues="true"/>
   </fields>
 
   <defaultSearchField>id</defaultSearchField>