You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/25 20:49:30 UTC

svn commit: r1235888 [4/12] - in /lucene/dev/trunk: dev-tools/eclipse/ dev-tools/maven/ solr/ solr/cloud-dev/ solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/da...

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java Wed Jan 25 19:49:26 2012
@@ -38,8 +38,11 @@ import org.apache.solr.schema.SchemaFiel
 import org.apache.solr.search.ReturnFields;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.DocumentBuilder;
+import org.apache.solr.update.PeerSync;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.util.RefCounted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.xml.transform.Transformer;
 import java.io.IOException;
@@ -47,14 +50,10 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
-/**
- * TODO!
- * 
- *
- * @since solr 1.3
- */
+
 public class RealTimeGetComponent extends SearchComponent
 {
+  public static Logger log = LoggerFactory.getLogger(UpdateLog.class);
   public static final String COMPONENT_NAME = "get";
 
   @Override
@@ -76,6 +75,18 @@ public class RealTimeGetComponent extend
       return;
     }
 
+    String val = params.get("getVersions");
+    if (val != null) {
+      processGetVersions(rb);
+      return;
+    }
+
+    val = params.get("getUpdates");
+    if (val != null) {
+      processGetUpdates(rb);
+      return;
+    }
+
     String id[] = params.getParams("id");
     String ids[] = params.getParams("ids");
 
@@ -142,7 +153,7 @@ public class RealTimeGetComponent extend
 
        // didn't find it in the update log, so it should be in the newest searcher opened
        if (searcher == null) {
-         searcherHolder =  req.getCore().getNewestSearcher(false);
+         searcherHolder = req.getCore().getRealtimeSearcher();
          searcher = searcherHolder.get();
        }
 
@@ -247,4 +258,112 @@ public class RealTimeGetComponent extend
   public URL[] getDocs() {
     return null;
   }
+
+
+
+  ///////////////////////////////////////////////////////////////////////////////////
+  // Returns last versions added to index
+  ///////////////////////////////////////////////////////////////////////////////////
+
+
+  public void processGetVersions(ResponseBuilder rb) throws IOException
+  {
+    SolrQueryRequest req = rb.req;
+    SolrQueryResponse rsp = rb.rsp;
+    SolrParams params = req.getParams();
+
+    if (!params.getBool(COMPONENT_NAME, true)) {
+      return;
+    }
+
+    int nVersions = params.getInt("getVersions", -1);
+    if (nVersions == -1) return;
+
+    String sync = params.get("sync");
+    if (sync != null) {
+      processSync(rb, nVersions, sync);
+      return;
+    }
+
+    UpdateLog ulog = req.getCore().getUpdateHandler().getUpdateLog();
+    if (ulog == null) return;
+
+    UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
+    try {
+      rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
+    } finally {
+      recentUpdates.close();  // cache this somehow?
+    }
+  }
+
+  
+  public void processSync(ResponseBuilder rb, int nVersions, String sync) {
+    List<String> replicas = StrUtils.splitSmart(sync, ",", true);
+    
+    
+    PeerSync peerSync = new PeerSync(rb.req.getCore(), replicas, nVersions);
+    boolean success = peerSync.sync();
+    
+    // TODO: more complex response?
+    rb.rsp.add("sync", success);
+  }
+  
+
+  public void processGetUpdates(ResponseBuilder rb) throws IOException
+  {
+    SolrQueryRequest req = rb.req;
+    SolrQueryResponse rsp = rb.rsp;
+    SolrParams params = req.getParams();
+
+    if (!params.getBool(COMPONENT_NAME, true)) {
+      return;
+    }
+
+    String versionsStr = params.get("getUpdates");
+    if (versionsStr == null) return;
+
+    UpdateLog ulog = req.getCore().getUpdateHandler().getUpdateLog();
+    if (ulog == null) return;
+
+    List<String> versions = StrUtils.splitSmart(versionsStr, ",", true);
+
+    // TODO: get this from cache instead of rebuilding?
+    UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
+
+    List<Object> updates = new ArrayList<Object>(versions.size());
+
+    long minVersion = Long.MAX_VALUE;
+    
+    try {
+      for (String versionStr : versions) {
+        long version = Long.parseLong(versionStr);
+        try {
+          Object o = recentUpdates.lookup(version);
+          if (o == null) continue;
+
+          if (version > 0) {
+            minVersion = Math.min(minVersion, version);
+          }
+          
+          // TODO: do any kind of validation here?
+          updates.add(o);
+
+        } catch (SolrException e) {
+          log.warn("Exception reading log for updates", e);
+        } catch (ClassCastException e) {
+          log.warn("Exception reading log for updates", e);
+        }
+      }
+
+      // Must return all delete-by-query commands that occur after the first add requested
+      // since they may apply.
+      updates.addAll( recentUpdates.getDeleteByQuery(minVersion));
+
+      rb.rsp.add("updates", updates);
+
+    } finally {
+      recentUpdates.close();  // cache this somehow?
+    }
+  }
+
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java Wed Jan 25 19:49:26 2012
@@ -23,6 +23,7 @@ import org.apache.solr.common.params.Com
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.util.RTimer;
+import org.apache.solr.core.CloseHook;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.RequestHandlerBase;
@@ -133,11 +134,19 @@ public class SearchHandler extends Reque
       log.info("Adding  debug component:" + dbgCmp);
     }
     if(shfInfo ==null) {
-      Map m = new HashMap();
-      m.put("class",HttpShardHandlerFactory.class.getName());
-      shfInfo = new PluginInfo("shardHandlerFactory", m,null,Collections.<PluginInfo>emptyList());
+      shardHandlerFactory = core.getCoreDescriptor().getCoreContainer().getShardHandlerFactory();
+    } else {
+      shardHandlerFactory = core.createInitInstance(shfInfo, ShardHandlerFactory.class, null, null);
     }
-    shardHandlerFactory = core.createInitInstance(shfInfo, ShardHandlerFactory.class, null, null);
+    core.addCloseHook(new CloseHook() {
+      @Override
+      public void preClose(SolrCore core) {
+        shardHandlerFactory.close();
+      }
+      @Override
+      public void postClose(SolrCore core) {
+      }
+    });
   }
 
   public List<SearchComponent> getComponents() {
@@ -247,7 +256,7 @@ public class SearchHandler extends Reque
             for (String shard : sreq.actualShards) {
               ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
               params.remove(ShardParams.SHARDS);      // not a top-level request
-              params.remove("distrib");               // not a top-level request
+              params.set("distrib", "false");               // not a top-level request
               params.remove("indent");
               params.remove(CommonParams.HEADER_ECHO_PARAMS);
               params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java Wed Jan 25 19:49:26 2012
@@ -20,4 +20,6 @@ package org.apache.solr.handler.componen
 public abstract class ShardHandlerFactory {
 
   public abstract ShardHandler getShardHandler();
+
+  public abstract void close();
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/request/SimpleFacets.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/request/SimpleFacets.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/request/SimpleFacets.java Wed Jan 25 19:49:26 2012
@@ -39,6 +39,7 @@ import org.apache.solr.schema.*;
 import org.apache.solr.search.*;
 import org.apache.solr.util.BoundedTreeSet;
 import org.apache.solr.util.DateMathParser;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.handler.component.ResponseBuilder;
 import org.apache.solr.util.LongPriorityQueue;
 
@@ -327,6 +328,7 @@ public class SimpleFacets {
           Integer.MAX_VALUE,
           10, TimeUnit.SECONDS, // terminate idle threads after 10 sec
           new SynchronousQueue<Runnable>()  // directly hand off tasks
+          , new DefaultSolrThreadFactory("facetExectutor")
   );
   
   /**

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQParserPlugin.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQParserPlugin.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQParserPlugin.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQParserPlugin.java Wed Jan 25 19:49:26 2012
@@ -80,43 +80,3 @@ public class FunctionRangeQParserPlugin 
 
 }
 
-// This class works as either a normal constant score query, or as a PostFilter using a collector
-class FunctionRangeQuery extends SolrConstantScoreQuery implements PostFilter {
-  final ValueSourceRangeFilter rangeFilt;
-
-  public FunctionRangeQuery(ValueSourceRangeFilter filter) {
-    super(filter);
-    this.rangeFilt = filter;
-  }
-
-  @Override
-  public DelegatingCollector getFilterCollector(IndexSearcher searcher) {
-    Map fcontext = ValueSource.newContext(searcher);
-    return new FunctionRangeCollector(fcontext);
-  }
-
-  class FunctionRangeCollector extends DelegatingCollector {
-    final Map fcontext;
-    ValueSourceScorer scorer;
-    int maxdoc;
-
-    public FunctionRangeCollector(Map fcontext) {
-      this.fcontext = fcontext;
-    }
-
-    @Override
-    public void collect(int doc) throws IOException {
-      if (doc<maxdoc && scorer.matches(doc)) {
-        delegate.collect(doc);
-      }
-    }
-
-    @Override
-    public void setNextReader(IndexReader.AtomicReaderContext context) throws IOException {
-      maxdoc = context.reader.maxDoc();
-      FunctionValues dv = rangeFilt.getValueSource().getValues(fcontext, context);
-      scorer = dv.getRangeScorer(context.reader, rangeFilt.getLowerVal(), rangeFilt.getUpperVal(), rangeFilt.isIncludeLower(), rangeFilt.isIncludeUpper());
-      super.setNextReader(context);
-    }
-  }
-}

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQuery.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQuery.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQuery.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQuery.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.search;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.queries.function.FunctionValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.queries.function.ValueSourceScorer;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.solr.search.function.ValueSourceRangeFilter;
+
+import java.io.IOException;
+import java.util.Map;
+
+// This class works as either a normal constant score query, or as a PostFilter using a collector
+public class FunctionRangeQuery extends SolrConstantScoreQuery implements PostFilter {
+  final ValueSourceRangeFilter rangeFilt;
+
+  public FunctionRangeQuery(ValueSourceRangeFilter filter) {
+    super(filter);
+    this.rangeFilt = filter;
+  }
+
+  @Override
+  public DelegatingCollector getFilterCollector(IndexSearcher searcher) {
+    Map fcontext = ValueSource.newContext(searcher);
+    return new FunctionRangeCollector(fcontext);
+  }
+
+  class FunctionRangeCollector extends DelegatingCollector {
+    final Map fcontext;
+    ValueSourceScorer scorer;
+    int maxdoc;
+
+    public FunctionRangeCollector(Map fcontext) {
+      this.fcontext = fcontext;
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+      if (doc<maxdoc && scorer.matches(doc)) {
+        delegate.collect(doc);
+      }
+    }
+
+    @Override
+    public void setNextReader(IndexReader.AtomicReaderContext context) throws IOException {
+      maxdoc = context.reader.maxDoc();
+      FunctionValues dv = rangeFilt.getValueSource().getValues(fcontext, context);
+      scorer = dv.getRangeScorer(context.reader, rangeFilt.getLowerVal(), rangeFilt.getUpperVal(), rangeFilt.isIncludeLower(), rangeFilt.isIncludeUpper());
+      super.setNextReader(context);
+    }
+  }
+}

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java Wed Jan 25 19:49:26 2012
@@ -108,7 +108,7 @@ public class SolrIndexSearcher extends I
   private final Collection<String> fieldNames;
   private Collection<String> storedHighlightFieldNames;
   private DirectoryFactory directoryFactory;
-  
+
   public SolrIndexSearcher(SolrCore core, String path, IndexSchema schema, SolrIndexConfig config, String name, boolean enableCache, DirectoryFactory directoryFactory) throws IOException {
     // we don't need to reserve the directory because we get it from the factory
     this(core, schema,name, core.getIndexReaderFactory().newReader(directoryFactory.get(path, config.lockType)), true, enableCache, false, directoryFactory);
@@ -133,6 +133,8 @@ public class SolrIndexSearcher extends I
     if (dir instanceof FSDirectory) {
       FSDirectory fsDirectory = (FSDirectory) dir;
       indexDir = fsDirectory.getDirectory().getAbsolutePath();
+    } else {
+      log.warn("WARNING: Directory impl does not support setting indexDir: " + dir.getClass().getName());
     }
 
     this.closeReader = closeReader;
@@ -569,6 +571,37 @@ public class SolrIndexSearcher extends I
     return id == DocIdSetIterator.NO_MORE_DOCS ? -1 : id;
   }
 
+  /** lookup the docid by the unique key field, and return the id *within* the leaf reader in the low 32 bits, and the index of the leaf reader in the high 32 bits.
+   * -1 is returned if not found.
+   * @lucene.internal
+   */
+  public long lookupId(BytesRef idBytes) throws IOException {
+    String field = schema.getUniqueKeyField().getName();
+    final AtomicReaderContext[] leaves = leafContexts;
+
+
+    for (int i=0; i<leaves.length; i++) {
+      final AtomicReaderContext leaf = leaves[i];
+      final IndexReader reader = leaf.reader;
+
+      final Fields fields = reader.fields();
+      if (fields == null) continue;
+
+      final Bits liveDocs = reader.getLiveDocs();
+      
+      final DocsEnum docs = reader.termDocsEnum(liveDocs, field, idBytes, false);
+
+      if (docs == null) continue;
+      int id = docs.nextDoc();
+      if (id == DocIdSetIterator.NO_MORE_DOCS) continue;
+      assert docs.nextDoc() == DocIdSetIterator.NO_MORE_DOCS;
+
+      return (((long)i) << 32) | id;
+    }
+
+    return -1;
+  }
+
 
   /**
    * Compute and cache the DocSet that matches a query.

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Wed Jan 25 19:49:26 2012
@@ -62,10 +62,10 @@ public class SolrDispatchFilter implemen
 {
   final Logger log = LoggerFactory.getLogger(SolrDispatchFilter.class);
 
-  protected CoreContainer cores;
+  protected volatile CoreContainer cores;
+
   protected String pathPrefix = null; // strip this from the beginning of a path
   protected String abortErrorMessage = null;
-  protected String solrConfigFilename = null;
   protected final Map<SolrConfig, SolrRequestParsers> parsers = new WeakHashMap<SolrConfig, SolrRequestParsers>();
   protected final SolrRequestParsers adminRequestParser;
   
@@ -100,6 +100,10 @@ public class SolrDispatchFilter implemen
 
     log.info("SolrDispatchFilter.init() done");
   }
+  
+  public CoreContainer getCores() {
+    return cores;
+  }
 
   /** Method to override to change how CoreContainer initialization is performed. */
   protected CoreContainer.Initializer createInitializer() {
@@ -118,7 +122,13 @@ public class SolrDispatchFilter implemen
       ((HttpServletResponse)response).sendError( 500, abortErrorMessage );
       return;
     }
-
+    
+    if (this.cores == null) {
+      ((HttpServletResponse)response).sendError( 403, "Server is shutting down" );
+      return;
+    }
+    CoreContainer cores = this.cores;
+    
     if( request instanceof HttpServletRequest) {
       HttpServletRequest req = (HttpServletRequest)request;
       HttpServletResponse resp = (HttpServletResponse)response;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Wed Jan 25 19:49:26 2012
@@ -45,13 +45,20 @@ public class AddUpdateCommand extends Up
    public int commitWithin = -1;
    
    public AddUpdateCommand(SolrQueryRequest req) {
-     super("add", req);
+     super(req);
    }
 
+  @Override
+  public String name() {
+    return "add";
+  }
+
    /** Reset state to reuse this object with a different document in the same request */
    public void clear() {
      solrDoc = null;
      indexedId = null;
+     updateTerm = null;
+     version = 0;
    }
 
    public SolrInputDocument getSolrInputDocument() {
@@ -91,6 +98,10 @@ public class AddUpdateCommand extends Up
      return indexedId;
    }
 
+   public void setIndexedId(BytesRef indexedId) {
+     this.indexedId = indexedId;
+   }
+
    public String getPrintableId() {
      IndexSchema schema = req.getSchema();
      SchemaField sf = schema.getUniqueKeyField();
@@ -105,10 +116,11 @@ public class AddUpdateCommand extends Up
 
    @Override
   public String toString() {
-     StringBuilder sb = new StringBuilder(commandName);
-     sb.append(':');
-     if (indexedId !=null) sb.append("id=").append(indexedId);
+     StringBuilder sb = new StringBuilder(super.toString());
+     if (indexedId != null) sb.append(",id=").append(indexedId);
      if (!overwrite) sb.append(",overwrite=").append(overwrite);
+     if (commitWithin != -1) sb.append(",commitWithin=").append(commitWithin);
+     sb.append('}');
      return sb.toString();
    }
  }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java Wed Jan 25 19:49:26 2012
@@ -37,16 +37,21 @@ public class CommitUpdateCommand extends
   public int maxOptimizeSegments = 1;
 
   public CommitUpdateCommand(SolrQueryRequest req, boolean optimize) {
-    super("commit", req);
+    super(req);
     this.optimize=optimize;
   }
+
+  @Override
+  public String name() {
+    return "commit";
+  }
+
   @Override
   public String toString() {
-    return prepareCommit ? "prepareCommit" :
-        ("commit(optimize="+optimize
+    return super.toString() + ",optimize="+optimize
             +",waitSearcher="+waitSearcher
             +",expungeDeletes="+expungeDeletes
             +",softCommit="+softCommit
-            +')');
+            +'}';
   }
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Wed Jan 25 19:49:26 2012
@@ -20,14 +20,21 @@ package org.apache.solr.update;
 import java.io.IOException;
 
 import org.apache.lucene.index.IndexWriter;
+import org.apache.solr.cloud.RecoveryStrategy;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
 
 public final class DefaultSolrCoreState extends SolrCoreState {
+ 
+  private final Object recoveryLock = new Object();
   private int refCnt = 1;
   private SolrIndexWriter indexWriter = null;
   private DirectoryFactory directoryFactory;
 
+  private boolean recoveryRunning;
+  private RecoveryStrategy recoveryStrat;
+  private boolean closed = false;
+  
   public DefaultSolrCoreState(DirectoryFactory directoryFactory) {
     this.directoryFactory = directoryFactory;
   }
@@ -50,14 +57,23 @@ public final class DefaultSolrCoreState 
   }
 
   @Override
-  public synchronized void decref() throws IOException {
-    refCnt--;
-    if (refCnt == 0) {
-      if (indexWriter != null) {
-        indexWriter.close();
+  public  void decref(IndexWriterCloser closer) throws IOException {
+    boolean cancelRecovery = false;
+    synchronized (this) {
+      refCnt--;
+      if (refCnt == 0) {
+        if (closer != null) {
+          closer.closeWriter(indexWriter);
+        } else if (indexWriter != null) {
+          indexWriter.close();
+        }
+        directoryFactory.close();
+        closed = true;
+        cancelRecovery = true;
       }
-      directoryFactory.close();
     }
+    // don't wait for this in the sync block
+    if (cancelRecovery) cancelRecovery();
   }
 
   @Override
@@ -85,5 +101,43 @@ public final class DefaultSolrCoreState 
   public DirectoryFactory getDirectoryFactory() {
     return directoryFactory;
   }
+
+  @Override
+  public void doRecovery(SolrCore core) {
+    cancelRecovery();
+    synchronized (recoveryLock) {
+      while (recoveryRunning) {
+        try {
+          recoveryLock.wait(1000);
+        } catch (InterruptedException e) {
+
+        }
+        if (closed) return;
+      }
+      
+      recoveryStrat = new RecoveryStrategy(core);
+      recoveryStrat.start();
+      recoveryRunning = true;
+    }
+    
+  }
+  
+  @Override
+  public void cancelRecovery() {
+    synchronized (recoveryLock) {
+      if (recoveryStrat != null) {
+        recoveryStrat.close();
+        
+        try {
+          recoveryStrat.join();
+        } catch (InterruptedException e) {
+          
+        }
+        
+        recoveryRunning = false;
+        recoveryLock.notifyAll();
+      }
+    }
+  }
   
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java Wed Jan 25 19:49:26 2012
@@ -18,6 +18,8 @@
 package org.apache.solr.update;
 
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.CharsRef;
+import org.apache.solr.common.SolrInputField;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.schema.SchemaField;
@@ -28,18 +30,28 @@ import org.apache.solr.schema.SchemaFiel
 public class DeleteUpdateCommand extends UpdateCommand {
   public String id;    // external (printable) id, for delete-by-id
   public String query; // query string for delete-by-query
-  private BytesRef indexedId;
+  public BytesRef indexedId;
   public int commitWithin = -1;
 
 
   public DeleteUpdateCommand(SolrQueryRequest req) {
-    super("delete", req);
+    super(req);
+  }
+
+  @Override
+  public String name() {
+    return "delete";
+  }
+
+  public boolean isDeleteById() {
+    return query == null;
   }
 
   public void clear() {
     id = null;
     query = null;
     indexedId = null;
+    version = 0;
   }
 
   /** Returns the indexed ID for this delete.  The returned BytesRef is retained across multiple calls, and should not be modified. */
@@ -55,14 +67,46 @@ public class DeleteUpdateCommand extends
     return indexedId;
   }
 
+  public String getId() {
+    if (id == null && indexedId != null) {
+      IndexSchema schema = req.getSchema();
+      SchemaField sf = schema.getUniqueKeyField();
+      if (sf != null) {
+        CharsRef ref = new CharsRef();
+        sf.getType().indexedToReadable(indexedId, ref);
+        id = ref.toString();
+      }
+    }
+    return id;
+  }
+
+  public String getQuery() {
+    return query;
+  }
+
+  public void setQuery(String query) {
+    this.query = query;
+  }
+
+  public void setIndexedId(BytesRef indexedId) {
+    this.indexedId = indexedId;
+    this.id = null;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+    this.indexedId = null;
+  }
 
   @Override
   public String toString() {
-    StringBuilder sb = new StringBuilder(commandName);
-    sb.append(':');
-    if (id!=null) sb.append("id=").append(id);
-    else sb.append("query=`").append(query).append('`');
+    StringBuilder sb = new StringBuilder(super.toString());
+    if (id!=null) sb.append(",id=").append(getId());
+    if (indexedId!=null) sb.append(",indexedId=").append(getId());
+    if (query != null) sb.append(",query=`").append(query).append('`');
     sb.append(",commitWithin=").append(commitWithin);
-    return sb.toString();
+     sb.append('}');
+     return sb.toString();
   }
+
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java Wed Jan 25 19:49:26 2012
@@ -32,6 +32,7 @@ import org.apache.lucene.document.Docume
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.queries.function.ValueSource;
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanClause.Occur;
@@ -45,8 +46,11 @@ import org.apache.solr.common.util.Named
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.FunctionRangeQuery;
 import org.apache.solr.search.QParser;
-import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.QueryUtils;
+import org.apache.solr.search.function.ValueSourceRangeFilter;
 
 /**
  *  TODO: add soft commitWithin support
@@ -54,8 +58,8 @@ import org.apache.solr.search.SolrIndexS
  * <code>DirectUpdateHandler2</code> implements an UpdateHandler where documents are added
  * directly to the main Lucene index as opposed to adding to a separate smaller index.
  */
-public class DirectUpdateHandler2 extends UpdateHandler {
-  protected SolrCoreState solrCoreState;
+public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState.IndexWriterCloser {
+  protected final SolrCoreState solrCoreState;
   protected final Lock commitLock = new ReentrantLock();
 
   // stats
@@ -97,7 +101,7 @@ public class DirectUpdateHandler2 extend
   public DirectUpdateHandler2(SolrCore core, UpdateHandler updateHandler) throws IOException {
     super(core);
     if (updateHandler instanceof DirectUpdateHandler2) {
-      this.solrCoreState = ((DirectUpdateHandler2)updateHandler).solrCoreState;
+      this.solrCoreState = ((DirectUpdateHandler2) updateHandler).solrCoreState;
     } else {
       // the impl has changed, so we cannot use the old state - decref it
       updateHandler.decref();
@@ -115,7 +119,9 @@ public class DirectUpdateHandler2 extend
     softCommitTracker = new CommitTracker("Soft", core, softCommitDocsUpperBound, softCommitTimeUpperBound, true, true);
     
     this.ulog = updateHandler.getUpdateLog();
-    this.ulog.init(this, core);
+    if (this.ulog != null) {
+      this.ulog.init(this, core);
+    }
   }
 
   private void deleteAll() throws IOException {
@@ -170,14 +176,17 @@ public class DirectUpdateHandler2 extend
         // allow duplicates
         writer.addDocument(cmd.getLuceneDocument());
       }
+
       // Add to the transaction log *after* successfully adding to the index, if there was no error.
       // This ordering ensures that if we log it, it's definitely been added to the the index.
       // This also ensures that if a commit sneaks in-between, that we know everything in a particular
       // log version was definitely committed.
-      ulog.add(cmd);
+      if (ulog != null) ulog.add(cmd);
 
-      softCommitTracker.addedDocument( -1 ); // TODO: support commitWithin with soft update
-      commitTracker.addedDocument( cmd.commitWithin );
+      if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
+        commitTracker.addedDocument( cmd.commitWithin );
+        softCommitTracker.addedDocument( -1 ); // TODO: support commitWithin with soft update
+      }
 
       rc = 1;
     } finally {
@@ -207,14 +216,16 @@ public class DirectUpdateHandler2 extend
     writer.deleteDocuments(deleteTerm);
     // SolrCore.verbose("deleteDocuments",deleteTerm,"DONE");
 
-    ulog.delete(cmd);
- 
-    if (commitTracker.getTimeUpperBound() > 0) {
-      commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
-    } 
-    
-    if (softCommitTracker.getTimeUpperBound() > 0) {
-      softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
+    if (ulog != null) ulog.delete(cmd);
+
+    if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
+      if (commitTracker.getTimeUpperBound() > 0) {
+        commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
+      }
+
+      if (softCommitTracker.getTimeUpperBound() > 0) {
+        softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
+      }
     }
   }
 
@@ -227,24 +238,51 @@ public class DirectUpdateHandler2 extend
     try {
       Query q;
       try {
+        // TODO: move this higher in the stack?
         QParser parser = QParser.getParser(cmd.query, "lucene", cmd.req);
         q = parser.getQuery();
+        q = QueryUtils.makeQueryable(q);
+
+        // peer-sync can cause older deleteByQueries to be executed and could
+        // delete newer documents.  We prevent this by adding a clause restricting
+        // version.
+        if ((cmd.getFlags() & UpdateCommand.PEER_SYNC) != 0) {
+          BooleanQuery bq = new BooleanQuery();
+          bq.add(q, Occur.MUST);
+          SchemaField sf = core.getSchema().getField(VersionInfo.VERSION_FIELD);
+          ValueSource vs = sf.getType().getValueSource(sf, null);
+          ValueSourceRangeFilter filt = new ValueSourceRangeFilter(vs, null, Long.toString(Math.abs(cmd.version)), true, true);
+          FunctionRangeQuery range = new FunctionRangeQuery(filt);
+          bq.add(range, Occur.MUST);
+          q = bq;
+        }
+
+
+
       } catch (ParseException e) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
       }
       
       boolean delAll = MatchAllDocsQuery.class == q.getClass();
-      
+
       commitTracker.deletedDocument(cmd.commitWithin);
+      
+      //
+      // synchronized to prevent deleteByQuery from running during the "open new searcher"
+      // part of a commit.  DBQ needs to signal that a fresh reader will be needed for
+      // a realtime view of the index.  When a new searcher is opened after a DBQ, that
+      // flag can be cleared.  If those thing happen concurrently, it's not thread safe.
+      //
+      synchronized (this) {
+        if (delAll) {
+          deleteAll();
+        } else {
+          solrCoreState.getIndexWriter(core).deleteDocuments(q);
+        }
 
-      if (delAll) {
-        deleteAll();
-      } else {
-        solrCoreState.getIndexWriter(core).deleteDocuments(q);
+        if (ulog != null) ulog.deleteByQuery(cmd);
       }
 
-      ulog.deleteByQuery(cmd);
-
       madeIt = true;
       
       if (commitTracker.getTimeUpperBound() > 0) {
@@ -345,7 +383,7 @@ public class DirectUpdateHandler2 extend
 
       if (!cmd.softCommit) {
         synchronized (this) { // sync is currently needed to prevent preCommit from being called between preSoft and postSoft... see postSoft comments.
-          ulog.preCommit(cmd);
+          if (ulog != null) ulog.preCommit(cmd);
         }
 
         // SolrCore.verbose("writer.commit() start writer=",writer);
@@ -363,23 +401,23 @@ public class DirectUpdateHandler2 extend
       }
 
 
-        if (cmd.softCommit) {
-          // ulog.preSoftCommit();
-          synchronized (this) {
-            ulog.preSoftCommit(cmd);
-            core.getSearcher(true,false,waitSearcher, true);
-            ulog.postSoftCommit(cmd);
-          }
-          // ulog.postSoftCommit();
-        } else {
-          synchronized (this) {
-            ulog.preSoftCommit(cmd);
-            core.getSearcher(true,false,waitSearcher);
-            ulog.postSoftCommit(cmd);
-          }
-          ulog.postCommit(cmd); // postCommit currently means new searcher has also been opened
+      if (cmd.softCommit) {
+        // ulog.preSoftCommit();
+        synchronized (this) {
+          if (ulog != null) ulog.preSoftCommit(cmd);
+          core.getSearcher(true, false, waitSearcher, true);
+          if (ulog != null) ulog.postSoftCommit(cmd);
         }
-
+        // ulog.postSoftCommit();
+      } else {
+        synchronized (this) {
+          if (ulog != null) ulog.preSoftCommit(cmd);
+          core.getSearcher(true, false, waitSearcher);
+          if (ulog != null) ulog.postSoftCommit(cmd);
+        }
+        if (ulog != null) ulog.postCommit(cmd); // postCommit currently means new searcher has
+                              // also been opened
+      }
 
       // reset commit tracking
 
@@ -418,25 +456,6 @@ public class DirectUpdateHandler2 extend
   }
 
   @Override
-  public SolrIndexSearcher reopenSearcher(SolrIndexSearcher previousSearcher) throws IOException {
-    
-    IndexReader currentReader = previousSearcher.getIndexReader();
-    IndexReader newReader;
-
-    IndexWriter writer = solrCoreState.getIndexWriter(core);
-    // SolrCore.verbose("start reopen from",previousSearcher,"writer=",writer);
-    newReader = IndexReader.openIfChanged(currentReader, writer, true);
-    // SolrCore.verbose("reopen result", newReader);
-    
-    if (newReader == null) {
-      currentReader.incRef();
-      newReader = currentReader;
-    }
-
-    return new SolrIndexSearcher(core, schema, "main", newReader, true, true, true, core.getDirectoryFactory());
-  }
-  
-  @Override
   public void newIndexWriter() throws IOException {
     solrCoreState.newIndexWriter(core);
   }
@@ -490,12 +509,44 @@ public class DirectUpdateHandler2 extend
 
     numDocsPending.set(0);
 
-    solrCoreState.decref();
-    
-    log.info("closed " + this);
+    solrCoreState.decref(this);
   }
 
 
+  public static boolean commitOnClose = true;  // TODO: make this a real config option?
+
+  // IndexWriterCloser interface method - called from solrCoreState.decref(this)
+  @Override
+  public void closeWriter(IndexWriter writer) throws IOException {
+    commitLock.lock();
+    try {
+      if (!commitOnClose) {
+        if (writer != null) {
+          writer.rollback();
+        }
+
+        // we shouldn't close the transaction logs either, but leaving them open
+        // means we can't delete them on windows.
+        if (ulog != null) ulog.close();
+
+        return;
+      }
+
+      if (writer != null) {
+        writer.close();
+      }
+
+      // if the writer hits an exception, it's OK (and perhaps desirable)
+      // to not close the ulog?
+
+      // Closing the log currently deletes the log file.
+      // If this changes, we should record this as a "commit".
+      if (ulog != null) ulog.close();
+    } finally {
+      commitLock.unlock();
+    }
+  }
+
   /////////////////////////////////////////////////////////////////////
   // SolrInfoMBean stuff: Statistics and Module Info
   /////////////////////////////////////////////////////////////////////
@@ -567,14 +618,15 @@ public class DirectUpdateHandler2 extend
     return "DirectUpdateHandler2" + getStatistics();
   }
   
-  public SolrCoreState getIndexWriterProvider() {
+  @Override
+  public SolrCoreState getSolrCoreState() {
     return solrCoreState;
   }
 
   @Override
   public void decref() {
     try {
-      solrCoreState.decref();
+      solrCoreState.decref(this);
     } catch (IOException e) {
       throw new SolrException(ErrorCode.SERVER_ERROR, "", e);
     }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java Wed Jan 25 19:49:26 2012
@@ -31,20 +31,25 @@ public class MergeIndexesCommand extends
   public IndexReader[] readers;
 
   public MergeIndexesCommand(IndexReader[] readers, SolrQueryRequest req) {
-    super("mergeIndexes", req);
+    super(req);
     this.readers = readers;
   }
 
   @Override
+  public String name() {
+    return "mergeIndexes";
+  }
+
+  @Override
   public String toString() {
-    StringBuilder sb = new StringBuilder(commandName);
-    sb.append(':');
+    StringBuilder sb = new StringBuilder(super.toString());
     if (readers != null && readers.length > 0) {
       sb.append(readers[0].directory());
       for (int i = 1; i < readers.length; i++) {
         sb.append(",").append(readers[i].directory());
       }
     }
+    sb.append('}');
     return sb.toString();
   }
 }

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.httpclient.NoHttpResponseException;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
+import org.apache.solr.update.processor.RunUpdateProcessorFactory;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** @lucene.experimental */
+public class PeerSync  {
+  public static Logger log = LoggerFactory.getLogger(PeerSync.class);
+  public boolean debug = log.isDebugEnabled();
+
+  private List<String> replicas;
+  private int nUpdates;
+
+  private UpdateHandler uhandler;
+  private UpdateLog ulog;
+  private ShardHandlerFactory shardHandlerFactory;
+  private ShardHandler shardHandler;
+
+  private UpdateLog.RecentUpdates recentUpdates;
+
+  private List<Long> ourUpdates;
+  private Set<Long> ourUpdateSet;
+  private Set<Long> requestedUpdateSet;
+  private long ourLowThreshold;  // 20th percentile
+  private long ourHighThreshold; // 80th percentile
+
+  // comparator that sorts by absolute value, putting highest first
+  private static Comparator<Long> absComparator = new Comparator<Long>() {
+    @Override
+    public int compare(Long o1, Long o2) {
+      long l1 = Math.abs(o1);
+      long l2 = Math.abs(o2);
+      if (l1 >l2) return -1;
+      if (l1 < l2) return 1;
+      return 0;
+    }
+  };
+
+  // comparator that sorts update records by absolute value of version, putting lowest first
+  private static Comparator<Object> updateRecordComparator = new Comparator<Object>() {
+    @Override
+    public int compare(Object o1, Object o2) {
+      if (!(o1 instanceof List)) return 1;
+      if (!(o2 instanceof List)) return -1;
+
+      List lst1 = (List)o1;
+      List lst2 = (List)o2;
+
+      long l1 = Math.abs((Long)lst1.get(1));
+      long l2 = Math.abs((Long)lst2.get(1));
+
+      if (l1 >l2) return 1;
+      if (l1 < l2) return -1;
+      return 0;
+    }
+  };
+
+
+  private static class SyncShardRequest extends ShardRequest {
+    List<Long> reportedVersions;
+    List<Long> requestedUpdates;
+    Exception updateException;
+  }
+
+
+  public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
+    this.replicas = replicas;
+    this.nUpdates = nUpdates;
+
+    uhandler = core.getUpdateHandler();
+    ulog = uhandler.getUpdateLog();
+    shardHandlerFactory = core.getCoreDescriptor().getCoreContainer().getShardHandlerFactory();
+    shardHandler = shardHandlerFactory.getShardHandler();
+  }
+
+  public long percentile(List<Long> arr, float frac) {
+    int elem = (int) (arr.size() * frac);
+    return Math.abs(arr.get(elem));
+  }
+
+  /** Returns true if peer sync was successful, meaning that this core may not be considered to have the latest updates.
+   *  A commit is not performed.
+   */
+  public boolean sync() {
+    if (ulog == null) {
+      return false;
+    }
+
+    // fire off the requests before getting our own recent updates (for better concurrency)
+    for (String replica : replicas) {
+      requestVersions(replica);
+    }
+
+    recentUpdates = ulog.getRecentUpdates();
+    try {
+      ourUpdates = recentUpdates.getVersions(nUpdates);
+    } finally {
+      recentUpdates.close();
+    }
+    
+    
+    Collections.sort(ourUpdates, absComparator);
+
+    if (ourUpdates.size() > 0) {
+      ourLowThreshold = percentile(ourUpdates, 0.8f);
+      ourHighThreshold = percentile(ourUpdates, 0.2f);
+    }  else {
+      // we have no versions and hence no frame of reference to tell if we can use a peers
+      // updates to bring us into sync
+      return false;
+    }
+
+    ourUpdateSet = new HashSet<Long>(ourUpdates);
+    requestedUpdateSet = new HashSet<Long>(ourUpdates);
+
+    for(;;) {
+      ShardResponse srsp = shardHandler.takeCompletedOrError();
+      if (srsp == null) break;
+      boolean success = handleResponse(srsp);
+      if (!success) {
+        shardHandler.cancelAll();
+        return false;
+      }
+    }
+
+    return true;
+  }
+  
+  private void requestVersions(String replica) {
+    SyncShardRequest sreq = new SyncShardRequest();
+    sreq.purpose = 1;
+    // TODO: this sucks
+    if (replica.startsWith("http://"))
+      replica = replica.substring(7);
+    sreq.shards = new String[]{replica};
+    sreq.actualShards = sreq.shards;
+    sreq.params = new ModifiableSolrParams();
+    sreq.params.set("qt","/get");
+    sreq.params.set("distrib",false);
+    sreq.params.set("getVersions",nUpdates);
+    shardHandler.submit(sreq, replica, sreq.params);
+  }
+
+  private boolean handleResponse(ShardResponse srsp) {
+    if (srsp.getException() != null) {
+
+      // TODO: look at this more thoroughly - we don't want
+      // to fail on connection exceptions, but it may make sense
+      // to determine this based on the number of fails
+      if (srsp.getException() instanceof SolrServerException) {
+        Throwable solrException = ((SolrServerException) srsp.getException())
+            .getRootCause();
+        if (solrException instanceof ConnectException
+            || solrException instanceof NoHttpResponseException) {
+          return true;
+        }
+      }
+      // TODO: at least log???
+      // srsp.getException().printStackTrace(System.out);
+      
+      return false;
+    }
+
+    ShardRequest sreq = srsp.getShardRequest();
+    if (sreq.purpose == 1) {
+      return handleVersions(srsp);
+    } else {
+      return handleUpdates(srsp);
+    }
+  }
+  
+  private boolean handleVersions(ShardResponse srsp) {
+    // we retrieved the last N updates from the replica
+    List<Long> otherVersions = (List<Long>)srsp.getSolrResponse().getResponse().get("versions");
+    // TODO: how to handle short lists?
+
+    SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
+    sreq.reportedVersions =  otherVersions;
+
+    if (otherVersions.size() == 0) {
+      return true;
+    }
+
+    Collections.sort(otherVersions, absComparator);
+
+    long otherHigh = percentile(otherVersions, .2f);
+    long otherLow = percentile(otherVersions, .8f);
+
+    if (ourHighThreshold < otherLow) {
+      // Small overlap between version windows and ours is older
+      // This means that we might miss updates if we attempted to use this method.
+      // Since there exists just one replica that is so much newer, we must
+      // fail the sync.
+      return false;
+    }
+
+    if (ourLowThreshold > otherHigh) {
+      // Small overlap between windows and ours is newer.
+      // Using this list to sync would result in requesting/replaying results we don't need
+      // and possibly bringing deleted docs back to life.
+      return true;
+    }
+    
+    List<Long> toRequest = new ArrayList<Long>();
+    for (Long otherVersion : otherVersions) {
+      // stop when the entries get old enough that reorders may lead us to see updates we don't need
+      if (Math.abs(otherVersion) < ourLowThreshold) break;
+
+      if (ourUpdateSet.contains(otherVersion) || requestedUpdateSet.contains(otherVersion)) {
+        // we either have this update, or already requested it
+        continue;
+      }
+
+      toRequest.add(otherVersion);
+      requestedUpdateSet.add(otherVersion);
+    }
+
+    sreq.requestedUpdates = toRequest;
+
+    if (toRequest.isEmpty()) {
+      // we had (or already requested) all the updates referenced by the replica
+      return true;
+    }
+
+    return requestUpdates(srsp, toRequest);
+  }
+
+  private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
+    String replica = srsp.getShardRequest().shards[0];
+
+    log.info("Requesting updates from " + replica + " versions=" + toRequest);
+
+
+
+    // reuse our original request object
+    ShardRequest sreq = srsp.getShardRequest();
+
+    sreq.purpose = 0;
+    sreq.params = new ModifiableSolrParams();
+    sreq.params.set("qt","/get");
+    sreq.params.set("distrib",false);
+    sreq.params.set("getUpdates", StrUtils.join(toRequest, ','));
+    sreq.responses.clear();  // needs to be zeroed for correct correlation to occur
+
+    shardHandler.submit(sreq, sreq.shards[0], sreq.params);
+
+    return true;
+  }
+
+
+  private boolean handleUpdates(ShardResponse srsp) {
+    // we retrieved the last N updates from the replica
+    List<Object> updates = (List<Object>)srsp.getSolrResponse().getResponse().get("updates");
+
+    SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
+    if (updates.size() < sreq.requestedUpdates.size()) {
+      log.error("PeerSync: Requested " + sreq.requestedUpdates.size() + " updates from " + sreq.shards[0] + " but retrieved " + updates.size());
+      return false;
+    }
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(DistributedUpdateProcessor.SEEN_LEADER, true);
+    SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+    SolrQueryResponse rsp = new SolrQueryResponse();
+
+    RunUpdateProcessorFactory runFac = new RunUpdateProcessorFactory();
+    DistributedUpdateProcessorFactory magicFac = new DistributedUpdateProcessorFactory();
+    runFac.init(new NamedList());
+    magicFac.init(new NamedList());
+
+    UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req, rsp, null));
+
+    Collections.sort(updates, updateRecordComparator);
+
+    Object o = null;
+    long lastVersion = 0;
+    try {
+      // Apply oldest updates first
+      for (Object obj : updates) {
+        // should currently be a List<Oper,Ver,Doc/Id>
+        o = obj;
+        List<Object> entry = (List<Object>)o;
+
+        int oper = (Integer)entry.get(0);
+        long version = (Long) entry.get(1);
+        if (version == lastVersion && version != 0) continue;
+        lastVersion = version;
+
+        switch (oper) {
+          case UpdateLog.ADD:
+          {
+            // byte[] idBytes = (byte[]) entry.get(2);
+            SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+            AddUpdateCommand cmd = new AddUpdateCommand(req);
+            // cmd.setIndexedId(new BytesRef(idBytes));
+            cmd.solrDoc = sdoc;
+            cmd.setVersion(version);
+            cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+            proc.processAdd(cmd);
+            break;
+          }
+          case UpdateLog.DELETE:
+          {
+            byte[] idBytes = (byte[]) entry.get(2);
+            DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+            cmd.setIndexedId(new BytesRef(idBytes));
+            cmd.setVersion(version);
+            cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+            proc.processDelete(cmd);
+            break;
+          }
+
+          case UpdateLog.DELETE_BY_QUERY:
+          {
+            String query = (String)entry.get(2);
+            DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+            cmd.query = query;
+            cmd.setVersion(version);
+            cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+            proc.processDelete(cmd);
+            break;
+          }
+
+          default:
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,  "Unknown Operation! " + oper);
+        }
+
+      }
+
+    }
+    catch (IOException e) {
+      // TODO: should this be handled separately as a problem with us?
+      // I guess it probably already will by causing replication to be kicked off.
+      sreq.updateException = e;
+      log.error("Error applying updates from " + sreq.shards + " ,update=" + o, e);
+      return false;
+    }
+    catch (Exception e) {
+      sreq.updateException = e;
+      log.error("Error applying updates from " + sreq.shards + " ,update=" + o, e);
+      return false;
+    }
+    finally {
+      try {
+        proc.finish();
+      } catch (Exception e) {
+        sreq.updateException = e;
+        log.error("Error applying updates from " + sreq.shards + " ,finish()", e);
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+
+
+  /** Requests and applies recent updates from peers */
+  public static void sync(SolrCore core, List<String> replicas, int nUpdates) {
+    UpdateHandler uhandler = core.getUpdateHandler();
+
+    ShardHandlerFactory shardHandlerFactory = core.getCoreDescriptor().getCoreContainer().getShardHandlerFactory();
+
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+   
+    for (String replica : replicas) {
+      ShardRequest sreq = new ShardRequest();
+      sreq.shards = new String[]{replica};
+      sreq.params = new ModifiableSolrParams();
+      sreq.params.set("qt","/get");
+      sreq.params.set("distrib", false);
+      sreq.params.set("getVersions",nUpdates);
+      shardHandler.submit(sreq, replica, sreq.params);
+    }
+    
+    for (String replica : replicas) {
+      ShardResponse srsp = shardHandler.takeCompletedOrError();
+    }
+
+
+  }
+  
+}
\ No newline at end of file

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java Wed Jan 25 19:49:26 2012
@@ -26,7 +26,16 @@ import org.apache.solr.request.SolrQuery
 public class RollbackUpdateCommand extends UpdateCommand {
 
   public RollbackUpdateCommand(SolrQueryRequest req) {
-    super("rollback", req);
+    super(req);
   }
 
+  @Override
+  public String name() {
+    return "rollback";
+  }
+
+  @Override
+  public String toString() {
+    return super.toString() + '}';
+  }
 }

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,484 @@
+package org.apache.solr.update;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequestExt;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+
+
+
+
+public class SolrCmdDistributor {
+  // TODO: shut this thing down
+  // TODO: this cannot be per instance...
+  static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0,
+      Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+      new DefaultSolrThreadFactory("cmdDistribExecutor"));
+
+  static HttpClient client;
+  
+  static {
+    MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+    mgr.getParams().setDefaultMaxConnectionsPerHost(8);
+    mgr.getParams().setMaxTotalConnections(200);
+    client = new HttpClient(mgr);
+  }
+  
+  CompletionService<Request> completionService;
+  Set<Future<Request>> pending;
+  
+  int maxBufferedAddsPerServer = 10;
+  int maxBufferedDeletesPerServer = 10;
+
+  private Response response = new Response();
+  
+  private final Map<Node,List<AddRequest>> adds = new HashMap<Node,List<AddRequest>>();
+  private final Map<Node,List<DeleteRequest>> deletes = new HashMap<Node,List<DeleteRequest>>();
+  
+  class AddRequest {
+    AddUpdateCommand cmd;
+    ModifiableSolrParams params;
+  }
+  
+  class DeleteRequest {
+    DeleteUpdateCommand cmd;
+    ModifiableSolrParams params;
+  }
+  
+  public SolrCmdDistributor() {
+   
+  }
+  
+  public void finish() {
+
+    // piggyback on any outstanding adds or deletes if possible.
+    flushAdds(1, null, null);
+    flushDeletes(1, null, null);
+
+    checkResponses(true);
+  }
+  
+  public void distribDelete(DeleteUpdateCommand cmd, List<Node> urls, ModifiableSolrParams params) throws IOException {
+    checkResponses(false);
+    
+    if (cmd.isDeleteById()) {
+      doDelete(cmd, urls, params);
+    } else {
+      doDelete(cmd, urls, params);
+    }
+  }
+  
+  public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams commitParams) throws IOException {
+    checkResponses(false);
+    
+    // make sure any pending deletes are flushed
+    flushDeletes(1, null, null);
+    
+    // TODO: this is brittle
+    // need to make a clone since these commands may be reused
+    AddUpdateCommand clone = new AddUpdateCommand(null);
+    
+    clone.solrDoc = cmd.solrDoc;
+    clone.commitWithin = cmd.commitWithin;
+    clone.overwrite = cmd.overwrite;
+    clone.setVersion(cmd.getVersion());
+    AddRequest addRequest = new AddRequest();
+    addRequest.cmd = clone;
+    addRequest.params = commitParams;
+
+    for (Node node : nodes) {
+      List<AddRequest> alist = adds.get(node);
+      if (alist == null) {
+        alist = new ArrayList<AddRequest>(2);
+        adds.put(node, alist);
+      }
+      alist.add(addRequest);
+    }
+    
+    flushAdds(maxBufferedAddsPerServer, null, null);
+  }
+  
+  public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
+      ModifiableSolrParams params) throws IOException {
+    // Wait for all outstanding responses to make sure that a commit
+    // can't sneak in ahead of adds or deletes we already sent.
+    // We could do this on a per-server basis, but it's more complex
+    // and this solution will lead to commits happening closer together.
+    checkResponses(true);
+    
+    // currently, we dont try to piggy back on outstanding adds or deletes
+    
+    UpdateRequestExt ureq = new UpdateRequestExt();
+    ureq.setParams(params);
+    
+    addCommit(ureq, cmd);
+    
+    for (Node node : nodes) {
+      submit(ureq, node);
+    }
+    
+    // if the command wanted to block until everything was committed,
+    // then do that here.
+    
+    if (cmd.waitSearcher) {
+      checkResponses(true);
+    }
+  }
+  
+  private void doDelete(DeleteUpdateCommand cmd, List<Node> nodes,
+      ModifiableSolrParams params) throws IOException {
+    
+    flushAdds(1, null, null);
+    
+    DeleteUpdateCommand clonedCmd = clone(cmd);
+    DeleteRequest deleteRequest = new DeleteRequest();
+    deleteRequest.cmd = clonedCmd;
+    deleteRequest.params = params;
+    for (Node node : nodes) {
+      List<DeleteRequest> dlist = deletes.get(node);
+      
+      if (dlist == null) {
+        dlist = new ArrayList<DeleteRequest>(2);
+        deletes.put(node, dlist);
+      }
+      dlist.add(deleteRequest);
+    }
+    
+    flushDeletes(maxBufferedDeletesPerServer, null, null);
+  }
+  
+  void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
+    if (cmd == null) return;
+    ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
+        : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
+  }
+  
+  boolean flushAdds(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams commitParams) {
+    // check for pending deletes
+  
+    Set<Node> removeNodes = new HashSet<Node>();
+    Set<Node> nodes = adds.keySet();
+ 
+    for (Node node : nodes) {
+      List<AddRequest> alist = adds.get(node);
+      if (alist == null || alist.size() < limit) return false;
+  
+      UpdateRequestExt ureq = new UpdateRequestExt();
+      
+      addCommit(ureq, ccmd);
+      
+      ModifiableSolrParams combinedParams = new ModifiableSolrParams();
+      
+      for (AddRequest aReq : alist) {
+        AddUpdateCommand cmd = aReq.cmd;
+        combinedParams.add(aReq.params);
+       
+        ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+      }
+      
+      if (commitParams != null) combinedParams.add(commitParams);
+      if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
+      ureq.getParams().add(combinedParams);
+
+      removeNodes.add(node);
+      
+      submit(ureq, node);
+    }
+    
+    for (Node node : removeNodes) {
+      adds.remove(node);
+    }
+    
+    return true;
+  }
+  
+  boolean flushDeletes(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams commitParams) {
+    // check for pending deletes
+ 
+    Set<Node> removeNodes = new HashSet<Node>();
+    Set<Node> nodes = deletes.keySet();
+    for (Node node : nodes) {
+      List<DeleteRequest> dlist = deletes.get(node);
+      if (dlist == null || dlist.size() < limit) return false;
+      UpdateRequestExt ureq = new UpdateRequestExt();
+      
+      addCommit(ureq, ccmd);
+      
+      ModifiableSolrParams combinedParams = new ModifiableSolrParams();
+      
+      for (DeleteRequest dReq : dlist) {
+        DeleteUpdateCommand cmd = dReq.cmd;
+        combinedParams.add(dReq.params);
+        if (cmd.isDeleteById()) {
+          ureq.deleteById(cmd.getId(), cmd.getVersion());
+        } else {
+          ureq.deleteByQuery(cmd.query);
+        }
+        
+        if (commitParams != null) combinedParams.add(commitParams);
+        if (ureq.getParams() == null) ureq
+            .setParams(new ModifiableSolrParams());
+        ureq.getParams().add(combinedParams);
+      }
+      
+      removeNodes.add(node);
+      submit(ureq, node);
+    }
+    
+    for (Node node : removeNodes) {
+      deletes.remove(node);
+    }
+    
+    return true;
+  }
+  
+  private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
+    DeleteUpdateCommand c = (DeleteUpdateCommand)cmd.clone();
+    // TODO: shouldnt the clone do this?
+    c.setFlags(cmd.getFlags());
+    c.setVersion(cmd.getVersion());
+    return c;
+  }
+  
+  public static class Request {
+    public Node node;
+    UpdateRequestExt ureq;
+    NamedList<Object> ursp;
+    int rspCode;
+    public Exception exception;
+    int retries;
+  }
+  
+  void submit(UpdateRequestExt ureq, Node node) {
+    Request sreq = new Request();
+    sreq.node = node;
+    sreq.ureq = ureq;
+    submit(sreq);
+  }
+  
+  public void submit(final Request sreq) {
+    if (completionService == null) {
+      completionService = new ExecutorCompletionService<Request>(commExecutor);
+      pending = new HashSet<Future<Request>>();
+    }
+    final String url = sreq.node.getUrl();
+
+    Callable<Request> task = new Callable<Request>() {
+      @Override
+      public Request call() throws Exception {
+        Request clonedRequest = new Request();
+        clonedRequest.node = sreq.node;
+        clonedRequest.ureq = sreq.ureq;
+        clonedRequest.retries = sreq.retries;
+        
+        try {
+          String fullUrl;
+          if (!url.startsWith("http://") && !url.startsWith("https://")) {
+            fullUrl = "http://" + url;
+          } else {
+            fullUrl = url;
+          }
+  
+          CommonsHttpSolrServer server = new CommonsHttpSolrServer(fullUrl,
+              client);
+          
+          clonedRequest.ursp = server.request(clonedRequest.ureq);
+          
+          // currently no way to get the request body.
+        } catch (Exception e) {
+          clonedRequest.exception = e;
+          if (e instanceof SolrException) {
+            clonedRequest.rspCode = ((SolrException) e).code();
+          } else {
+            clonedRequest.rspCode = -1;
+          }
+        }
+        return clonedRequest;
+      }
+    };
+    
+    pending.add(completionService.submit(task));
+    
+  }
+
+  void checkResponses(boolean block) {
+
+    while (pending != null && pending.size() > 0) {
+      try {
+        Future<Request> future = block ? completionService.take()
+            : completionService.poll();
+        if (future == null) return;
+        pending.remove(future);
+        
+        try {
+          Request sreq = future.get();
+          if (sreq.rspCode != 0) {
+            // error during request
+            
+            // if there is a retry url, we want to retry...
+            // TODO: but we really should only retry on connection errors...
+            if (sreq.retries < 5 && sreq.node.checkRetry()) {
+              sreq.retries++;
+              sreq.rspCode = 0;
+              sreq.exception = null;
+              Thread.sleep(500);
+              submit(sreq);
+              checkResponses(block);
+            } else {
+              Exception e = sreq.exception;
+              Error error = new Error();
+              error.e = e;
+              error.node = sreq.node;
+              response.errors.add(error);
+              response.sreq = sreq;
+              SolrException.log(SolrCore.log, "shard update error "
+                  + sreq.node, sreq.exception);
+            }
+          }
+          
+        } catch (ExecutionException e) {
+          // shouldn't happen since we catch exceptions ourselves
+          SolrException.log(SolrCore.log,
+              "error sending update request to shard", e);
+        }
+        
+      } catch (InterruptedException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+            "interrupted waiting for shard update response", e);
+      }
+    }
+  }
+  
+  public static class Response {
+    public Request sreq;
+    public List<Error> errors = new ArrayList<Error>();
+  }
+  
+  public static class Error {
+    public Node node;
+    public Exception e;
+  }
+
+  public Response getResponse() {
+    return response;
+  }
+  
+  public static abstract class Node {
+    public abstract String getUrl();
+    public abstract boolean checkRetry();
+    public abstract String getCoreName();
+    public abstract String getBaseUrl();
+    public abstract ZkCoreNodeProps getNodeProps();
+  }
+
+  public static class StdNode extends Node {
+    protected String url;
+    protected String baseUrl;
+    protected String coreName;
+    private ZkCoreNodeProps nodeProps;
+
+    public StdNode(ZkCoreNodeProps nodeProps) {
+      this.url = nodeProps.getCoreUrl();
+      this.baseUrl = nodeProps.getBaseUrl();
+      this.coreName = nodeProps.getCoreName();
+      this.nodeProps = nodeProps;
+    }
+    
+    @Override
+    public String getUrl() {
+      return url;
+    }
+    
+    @Override
+    public String toString() {
+      return this.getClass().getSimpleName() + ": " + url;
+    }
+
+    @Override
+    public boolean checkRetry() {
+      return false;
+    }
+
+    @Override
+    public String getBaseUrl() {
+      return baseUrl;
+    }
+
+    @Override
+    public String getCoreName() {
+      return coreName;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((baseUrl == null) ? 0 : baseUrl.hashCode());
+      result = prime * result + ((coreName == null) ? 0 : coreName.hashCode());
+      result = prime * result + ((url == null) ? 0 : url.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      StdNode other = (StdNode) obj;
+      if (baseUrl == null) {
+        if (other.baseUrl != null) return false;
+      } else if (!baseUrl.equals(other.baseUrl)) return false;
+      if (coreName == null) {
+        if (other.coreName != null) return false;
+      } else if (!coreName.equals(other.coreName)) return false;
+      if (url == null) {
+        if (other.url != null) return false;
+      } else if (!url.equals(other.url)) return false;
+      return true;
+    }
+
+    public ZkCoreNodeProps getNodeProps() {
+      return nodeProps;
+    }
+  }
+}

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java Wed Jan 25 19:49:26 2012
@@ -49,11 +49,12 @@ public abstract class SolrCoreState {
   
   /**
    * Decrement the number of references to this state. When then number of
-   * references hits 0, the state will close.
+   * references hits 0, the state will close.  If an optional closer is
+   * passed, that will be used to close the writer.
    * 
    * @throws IOException
    */
-  public abstract void decref() throws IOException;
+  public abstract void decref(IndexWriterCloser closer) throws IOException;
   
   /**
    * Increment the number of references to this state.
@@ -73,5 +74,14 @@ public abstract class SolrCoreState {
    * @return the {@link DirectoryFactory} that should be used.
    */
   public abstract DirectoryFactory getDirectoryFactory();
+
+
+  public interface IndexWriterCloser {
+    public void closeWriter(IndexWriter writer) throws IOException;
+  }
+
+  public abstract void doRecovery(SolrCore core);
   
+  public abstract void cancelRecovery();
+
 }