You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/25 20:49:30 UTC
svn commit: r1235888 [4/12] - in /lucene/dev/trunk: dev-tools/eclipse/
dev-tools/maven/ solr/ solr/cloud-dev/
solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/
solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/da...
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java Wed Jan 25 19:49:26 2012
@@ -38,8 +38,11 @@ import org.apache.solr.schema.SchemaFiel
import org.apache.solr.search.ReturnFields;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.DocumentBuilder;
+import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RefCounted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.xml.transform.Transformer;
import java.io.IOException;
@@ -47,14 +50,10 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.List;
-/**
- * TODO!
- *
- *
- * @since solr 1.3
- */
+
public class RealTimeGetComponent extends SearchComponent
{
+  // Log under this component's own category; the logger was previously created for UpdateLog,
+  // which attributed this component's messages to the wrong class.
+  public static Logger log = LoggerFactory.getLogger(RealTimeGetComponent.class);
public static final String COMPONENT_NAME = "get";
@Override
@@ -76,6 +75,18 @@ public class RealTimeGetComponent extend
return;
}
+ String val = params.get("getVersions");
+ if (val != null) {
+ processGetVersions(rb);
+ return;
+ }
+
+ val = params.get("getUpdates");
+ if (val != null) {
+ processGetUpdates(rb);
+ return;
+ }
+
String id[] = params.getParams("id");
String ids[] = params.getParams("ids");
@@ -142,7 +153,7 @@ public class RealTimeGetComponent extend
// didn't find it in the update log, so it should be in the newest searcher opened
if (searcher == null) {
- searcherHolder = req.getCore().getNewestSearcher(false);
+ searcherHolder = req.getCore().getRealtimeSearcher();
searcher = searcherHolder.get();
}
@@ -247,4 +258,112 @@ public class RealTimeGetComponent extend
public URL[] getDocs() {
return null;
}
+
+
+
+ ///////////////////////////////////////////////////////////////////////////////////
+ // Returns last versions added to index
+ ///////////////////////////////////////////////////////////////////////////////////
+
+
+  /**
+   * Handles a <code>getVersions</code> request: adds the result of
+   * <code>RecentUpdates.getVersions(nVersions)</code> to the response under the key "versions".
+   * When a <code>sync</code> parameter is present the request is handed to
+   * {@link #processSync} instead.
+   *
+   * Does nothing when this component is disabled through its own parameter, when
+   * <code>getVersions</code> is absent, or when the core has no update log.
+   *
+   * @param rb the builder carrying the current request and response
+   */
+  public void processGetVersions(ResponseBuilder rb) throws IOException
+  {
+    SolrQueryRequest req = rb.req;
+    SolrParams params = req.getParams();
+
+    if (!params.getBool(COMPONENT_NAME, true)) {
+      return;
+    }
+
+    // -1 doubles as the "parameter absent" marker, so an explicit getVersions=-1 is ignored as well
+    int nVersions = params.getInt("getVersions", -1);
+    if (nVersions == -1) return;
+
+    String sync = params.get("sync");
+    if (sync != null) {
+      processSync(rb, nVersions, sync);
+      return;
+    }
+
+    UpdateLog ulog = req.getCore().getUpdateHandler().getUpdateLog();
+    if (ulog == null) return;
+
+    UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
+    try {
+      rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
+    } finally {
+      recentUpdates.close();  // cache this somehow?
+    }
+  }
+
+
+  /**
+   * Runs a {@link PeerSync} against the given replicas and records the outcome in the
+   * response under the key "sync".
+   *
+   * @param rb        the builder carrying the current request and response
+   * @param nVersions the version count passed through to PeerSync
+   * @param sync      comma separated list of replicas to sync with
+   */
+  public void processSync(ResponseBuilder rb, int nVersions, String sync) {
+    final List<String> peers = StrUtils.splitSmart(sync, ",", true);
+    final boolean synced = new PeerSync(rb.req.getCore(), peers, nVersions).sync();
+
+    // TODO: more complex response?
+    rb.rsp.add("sync", synced);
+  }
+
+
+  /**
+   * Handles a <code>getUpdates</code> request. The parameter value is a comma separated list
+   * of versions; each one is looked up in the update log's recent updates and every entry
+   * found is collected. The delete-by-query commands returned for the smallest positive
+   * version found are appended, and the combined list is added to the response under the
+   * key "updates".
+   *
+   * Does nothing when this component is disabled through its own parameter, when
+   * <code>getUpdates</code> is absent, or when the core has no update log.
+   *
+   * @param rb the builder carrying the current request and response
+   */
+  public void processGetUpdates(ResponseBuilder rb) throws IOException
+  {
+    SolrQueryRequest req = rb.req;
+    SolrParams params = req.getParams();
+
+    if (!params.getBool(COMPONENT_NAME, true)) {
+      return;
+    }
+
+    String versionsStr = params.get("getUpdates");
+    if (versionsStr == null) return;
+
+    UpdateLog ulog = req.getCore().getUpdateHandler().getUpdateLog();
+    if (ulog == null) return;
+
+    List<String> versions = StrUtils.splitSmart(versionsStr, ",", true);
+    List<Object> updates = new ArrayList<Object>(versions.size());
+
+    // Stays at Long.MAX_VALUE when none of the requested versions is both found and positive.
+    long minVersion = Long.MAX_VALUE;
+
+    // Acquire the log view last so nothing can fail between obtaining it and the
+    // try/finally that releases it.
+    // TODO: get this from cache instead of rebuilding?
+    UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
+    try {
+      for (String versionStr : versions) {
+        // NOTE(review): a non-numeric entry throws NumberFormatException here; it is not
+        // covered by the catch clauses below, so it propagates out of this method.
+        long version = Long.parseLong(versionStr);
+        try {
+          Object o = recentUpdates.lookup(version);
+          if (o == null) continue;
+
+          if (version > 0) {
+            minVersion = Math.min(minVersion, version);
+          }
+
+          // TODO: do any kind of validation here?
+          updates.add(o);
+
+        } catch (SolrException e) {
+          log.warn("Exception reading log for updates", e);
+        } catch (ClassCastException e) {
+          log.warn("Exception reading log for updates", e);
+        }
+      }
+
+      // Must return all delete-by-query commands that occur after the first add requested
+      // since they may apply.
+      updates.addAll( recentUpdates.getDeleteByQuery(minVersion));
+
+      rb.rsp.add("updates", updates);
+
+    } finally {
+      recentUpdates.close();  // cache this somehow?
+    }
+  }
+
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java Wed Jan 25 19:49:26 2012
@@ -23,6 +23,7 @@ import org.apache.solr.common.params.Com
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.RTimer;
+import org.apache.solr.core.CloseHook;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.RequestHandlerBase;
@@ -133,11 +134,19 @@ public class SearchHandler extends Reque
log.info("Adding debug component:" + dbgCmp);
}
if(shfInfo ==null) {
- Map m = new HashMap();
- m.put("class",HttpShardHandlerFactory.class.getName());
- shfInfo = new PluginInfo("shardHandlerFactory", m,null,Collections.<PluginInfo>emptyList());
+ shardHandlerFactory = core.getCoreDescriptor().getCoreContainer().getShardHandlerFactory();
+ } else {
+ shardHandlerFactory = core.createInitInstance(shfInfo, ShardHandlerFactory.class, null, null);
}
- shardHandlerFactory = core.createInitInstance(shfInfo, ShardHandlerFactory.class, null, null);
+ core.addCloseHook(new CloseHook() {
+ @Override
+ public void preClose(SolrCore core) {
+ shardHandlerFactory.close();
+ }
+ @Override
+ public void postClose(SolrCore core) {
+ }
+ });
}
public List<SearchComponent> getComponents() {
@@ -247,7 +256,7 @@ public class SearchHandler extends Reque
for (String shard : sreq.actualShards) {
ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
params.remove(ShardParams.SHARDS); // not a top-level request
- params.remove("distrib"); // not a top-level request
+ params.set("distrib", "false"); // not a top-level request
params.remove("indent");
params.remove(CommonParams.HEADER_ECHO_PARAMS);
params.set(ShardParams.IS_SHARD, true); // a sub (shard) request
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java Wed Jan 25 19:49:26 2012
@@ -20,4 +20,6 @@ package org.apache.solr.handler.componen
public abstract class ShardHandlerFactory {
public abstract ShardHandler getShardHandler();
+
+ public abstract void close();
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/request/SimpleFacets.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/request/SimpleFacets.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/request/SimpleFacets.java Wed Jan 25 19:49:26 2012
@@ -39,6 +39,7 @@ import org.apache.solr.schema.*;
import org.apache.solr.search.*;
import org.apache.solr.util.BoundedTreeSet;
import org.apache.solr.util.DateMathParser;
+import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.util.LongPriorityQueue;
@@ -327,6 +328,7 @@ public class SimpleFacets {
Integer.MAX_VALUE,
10, TimeUnit.SECONDS, // terminate idle threads after 10 sec
new SynchronousQueue<Runnable>() // directly hand off tasks
+ , new DefaultSolrThreadFactory("facetExectutor")
);
/**
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQParserPlugin.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQParserPlugin.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQParserPlugin.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQParserPlugin.java Wed Jan 25 19:49:26 2012
@@ -80,43 +80,3 @@ public class FunctionRangeQParserPlugin
}
-// This class works as either a normal constant score query, or as a PostFilter using a collector
-class FunctionRangeQuery extends SolrConstantScoreQuery implements PostFilter {
- final ValueSourceRangeFilter rangeFilt;
-
- public FunctionRangeQuery(ValueSourceRangeFilter filter) {
- super(filter);
- this.rangeFilt = filter;
- }
-
- @Override
- public DelegatingCollector getFilterCollector(IndexSearcher searcher) {
- Map fcontext = ValueSource.newContext(searcher);
- return new FunctionRangeCollector(fcontext);
- }
-
- class FunctionRangeCollector extends DelegatingCollector {
- final Map fcontext;
- ValueSourceScorer scorer;
- int maxdoc;
-
- public FunctionRangeCollector(Map fcontext) {
- this.fcontext = fcontext;
- }
-
- @Override
- public void collect(int doc) throws IOException {
- if (doc<maxdoc && scorer.matches(doc)) {
- delegate.collect(doc);
- }
- }
-
- @Override
- public void setNextReader(IndexReader.AtomicReaderContext context) throws IOException {
- maxdoc = context.reader.maxDoc();
- FunctionValues dv = rangeFilt.getValueSource().getValues(fcontext, context);
- scorer = dv.getRangeScorer(context.reader, rangeFilt.getLowerVal(), rangeFilt.getUpperVal(), rangeFilt.isIncludeLower(), rangeFilt.isIncludeUpper());
- super.setNextReader(context);
- }
- }
-}
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQuery.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQuery.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQuery.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/FunctionRangeQuery.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.search;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.queries.function.FunctionValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.queries.function.ValueSourceScorer;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.solr.search.function.ValueSourceRangeFilter;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Query over a {@link ValueSourceRangeFilter} that works either as a normal constant score
+ * query or, through {@link PostFilter}, as a filter applied by a collector.
+ */
+public class FunctionRangeQuery extends SolrConstantScoreQuery implements PostFilter {
+  final ValueSourceRangeFilter rangeFilt;
+
+  public FunctionRangeQuery(ValueSourceRangeFilter filter) {
+    super(filter);
+    this.rangeFilt = filter;
+  }
+
+  /** Creates a collector bound to a fresh value-source context for the given searcher. */
+  @Override
+  public DelegatingCollector getFilterCollector(IndexSearcher searcher) {
+    return new FunctionRangeCollector(ValueSource.newContext(searcher));
+  }
+
+  /** Forwards to the delegate only those documents accepted by the current range scorer. */
+  class FunctionRangeCollector extends DelegatingCollector {
+    final Map fcontext;
+    ValueSourceScorer scorer;
+    int maxdoc;
+
+    public FunctionRangeCollector(Map fcontext) {
+      this.fcontext = fcontext;
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+      if (doc >= maxdoc) {
+        return;
+      }
+      if (!scorer.matches(doc)) {
+        return;
+      }
+      delegate.collect(doc);
+    }
+
+    /** Rebuilds the doc limit and the range scorer for the reader that is about to be collected. */
+    @Override
+    public void setNextReader(IndexReader.AtomicReaderContext context) throws IOException {
+      maxdoc = context.reader.maxDoc();
+      final FunctionValues values = rangeFilt.getValueSource().getValues(fcontext, context);
+      scorer = values.getRangeScorer(
+          context.reader,
+          rangeFilt.getLowerVal(),
+          rangeFilt.getUpperVal(),
+          rangeFilt.isIncludeLower(),
+          rangeFilt.isIncludeUpper());
+      super.setNextReader(context);
+    }
+  }
+}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java Wed Jan 25 19:49:26 2012
@@ -108,7 +108,7 @@ public class SolrIndexSearcher extends I
private final Collection<String> fieldNames;
private Collection<String> storedHighlightFieldNames;
private DirectoryFactory directoryFactory;
-
+
public SolrIndexSearcher(SolrCore core, String path, IndexSchema schema, SolrIndexConfig config, String name, boolean enableCache, DirectoryFactory directoryFactory) throws IOException {
// we don't need to reserve the directory because we get it from the factory
this(core, schema,name, core.getIndexReaderFactory().newReader(directoryFactory.get(path, config.lockType)), true, enableCache, false, directoryFactory);
@@ -133,6 +133,8 @@ public class SolrIndexSearcher extends I
if (dir instanceof FSDirectory) {
FSDirectory fsDirectory = (FSDirectory) dir;
indexDir = fsDirectory.getDirectory().getAbsolutePath();
+ } else {
+ log.warn("WARNING: Directory impl does not support setting indexDir: " + dir.getClass().getName());
}
this.closeReader = closeReader;
@@ -569,6 +571,37 @@ public class SolrIndexSearcher extends I
return id == DocIdSetIterator.NO_MORE_DOCS ? -1 : id;
}
+  /**
+   * Looks up a document by the unique key field.
+   *
+   * @return the index of the leaf reader in the high 32 bits and the docid <em>within</em> that
+   *         leaf reader in the low 32 bits, or -1 if the id was not found
+   * @lucene.internal
+   */
+  public long lookupId(BytesRef idBytes) throws IOException {
+    final String keyField = schema.getUniqueKeyField().getName();
+    final AtomicReaderContext[] leaves = leafContexts;
+
+    for (int leafIdx = 0; leafIdx < leaves.length; leafIdx++) {
+      final IndexReader leafReader = leaves[leafIdx].reader;
+
+      // a leaf without any fields cannot contain the key
+      if (leafReader.fields() == null) continue;
+
+      final Bits live = leafReader.getLiveDocs();
+      final DocsEnum postings = leafReader.termDocsEnum(live, keyField, idBytes, false);
+      if (postings == null) continue;
+
+      final int docId = postings.nextDoc();
+      if (docId != DocIdSetIterator.NO_MORE_DOCS) {
+        // at most one live document is expected per unique key within a leaf
+        assert postings.nextDoc() == DocIdSetIterator.NO_MORE_DOCS;
+        return (((long)leafIdx) << 32) | docId;
+      }
+    }
+
+    return -1;
+  }
+
/**
* Compute and cache the DocSet that matches a query.
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Wed Jan 25 19:49:26 2012
@@ -62,10 +62,10 @@ public class SolrDispatchFilter implemen
{
final Logger log = LoggerFactory.getLogger(SolrDispatchFilter.class);
- protected CoreContainer cores;
+ protected volatile CoreContainer cores;
+
protected String pathPrefix = null; // strip this from the beginning of a path
protected String abortErrorMessage = null;
- protected String solrConfigFilename = null;
protected final Map<SolrConfig, SolrRequestParsers> parsers = new WeakHashMap<SolrConfig, SolrRequestParsers>();
protected final SolrRequestParsers adminRequestParser;
@@ -100,6 +100,10 @@ public class SolrDispatchFilter implemen
log.info("SolrDispatchFilter.init() done");
}
+
+  /** Returns the CoreContainer currently held by this filter; doFilter treats a null value as "shutting down". */
+  public CoreContainer getCores() {
+    return this.cores;
+  }
/** Method to override to change how CoreContainer initialization is performed. */
protected CoreContainer.Initializer createInitializer() {
@@ -118,7 +122,13 @@ public class SolrDispatchFilter implemen
((HttpServletResponse)response).sendError( 500, abortErrorMessage );
return;
}
-
+
+ if (this.cores == null) {
+ ((HttpServletResponse)response).sendError( 403, "Server is shutting down" );
+ return;
+ }
+ CoreContainer cores = this.cores;
+
if( request instanceof HttpServletRequest) {
HttpServletRequest req = (HttpServletRequest)request;
HttpServletResponse resp = (HttpServletResponse)response;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Wed Jan 25 19:49:26 2012
@@ -45,13 +45,20 @@ public class AddUpdateCommand extends Up
public int commitWithin = -1;
public AddUpdateCommand(SolrQueryRequest req) {
- super("add", req);
+ super(req);
}
+ /** Returns the fixed name of this command, "add". */
+ @Override
+ public String name() {
+ return "add";
+ }
+
/** Reset state to reuse this object with a different document in the same request */
public void clear() {
solrDoc = null;
indexedId = null;
+ updateTerm = null;
+ version = 0;
}
public SolrInputDocument getSolrInputDocument() {
@@ -91,6 +98,10 @@ public class AddUpdateCommand extends Up
return indexedId;
}
+ /** Sets the indexed form of the id directly; no other field of the command is touched. */
+ public void setIndexedId(BytesRef indexedId) {
+ this.indexedId = indexedId;
+ }
+
public String getPrintableId() {
IndexSchema schema = req.getSchema();
SchemaField sf = schema.getUniqueKeyField();
@@ -105,10 +116,11 @@ public class AddUpdateCommand extends Up
@Override
public String toString() {
- StringBuilder sb = new StringBuilder(commandName);
- sb.append(':');
- if (indexedId !=null) sb.append("id=").append(indexedId);
+ StringBuilder sb = new StringBuilder(super.toString());
+ if (indexedId != null) sb.append(",id=").append(indexedId);
if (!overwrite) sb.append(",overwrite=").append(overwrite);
+ if (commitWithin != -1) sb.append(",commitWithin=").append(commitWithin);
+ sb.append('}');
return sb.toString();
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java Wed Jan 25 19:49:26 2012
@@ -37,16 +37,21 @@ public class CommitUpdateCommand extends
public int maxOptimizeSegments = 1;
public CommitUpdateCommand(SolrQueryRequest req, boolean optimize) {
- super("commit", req);
+ super(req);
this.optimize=optimize;
}
+
+ /** Returns the fixed name of this command, "commit". */
+ @Override
+ public String name() {
+ return "commit";
+ }
+
@Override
public String toString() {
- return prepareCommit ? "prepareCommit" :
- ("commit(optimize="+optimize
+ return super.toString() + ",optimize="+optimize
+",waitSearcher="+waitSearcher
+",expungeDeletes="+expungeDeletes
+",softCommit="+softCommit
- +')');
+ +'}';
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Wed Jan 25 19:49:26 2012
@@ -20,14 +20,21 @@ package org.apache.solr.update;
import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
+import org.apache.solr.cloud.RecoveryStrategy;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
public final class DefaultSolrCoreState extends SolrCoreState {
+
+ private final Object recoveryLock = new Object();
private int refCnt = 1;
private SolrIndexWriter indexWriter = null;
private DirectoryFactory directoryFactory;
+ private boolean recoveryRunning;
+ private RecoveryStrategy recoveryStrat;
+ private boolean closed = false;
+
public DefaultSolrCoreState(DirectoryFactory directoryFactory) {
this.directoryFactory = directoryFactory;
}
@@ -50,14 +57,23 @@ public final class DefaultSolrCoreState
}
@Override
- public synchronized void decref() throws IOException {
- refCnt--;
- if (refCnt == 0) {
- if (indexWriter != null) {
- indexWriter.close();
+ public void decref(IndexWriterCloser closer) throws IOException {
+ boolean cancelRecovery = false;
+ synchronized (this) {
+ refCnt--;
+ if (refCnt == 0) {
+ if (closer != null) {
+ closer.closeWriter(indexWriter);
+ } else if (indexWriter != null) {
+ indexWriter.close();
+ }
+ directoryFactory.close();
+ closed = true;
+ cancelRecovery = true;
}
- directoryFactory.close();
}
+ // don't wait for this in the sync block
+ if (cancelRecovery) cancelRecovery();
}
@Override
@@ -85,5 +101,43 @@ public final class DefaultSolrCoreState
public DirectoryFactory getDirectoryFactory() {
return directoryFactory;
}
+
+  /**
+   * Cancels any recovery in progress, then starts a new {@link RecoveryStrategy} for the
+   * given core. While a recovery is still flagged as running this method waits on the
+   * recovery lock, re-checking once a second, and gives up without starting anything if
+   * this state has been closed in the meantime.
+   *
+   * An interrupt received while waiting does not abort the wait, but the calling thread's
+   * interrupt status is restored before this method returns.
+   */
+  @Override
+  public void doRecovery(SolrCore core) {
+    cancelRecovery();
+    boolean interrupted = false;
+    try {
+      synchronized (recoveryLock) {
+        while (recoveryRunning) {
+          try {
+            recoveryLock.wait(1000);
+          } catch (InterruptedException e) {
+            // Only remember the interrupt: re-setting the flag here would make the next
+            // wait() throw immediately and turn this loop into a busy spin.
+            interrupted = true;
+          }
+          if (closed) return;
+        }
+
+        recoveryStrat = new RecoveryStrategy(core);
+        recoveryStrat.start();
+        recoveryRunning = true;
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Closes the current recovery strategy, if there is one, waits for it to finish, then
+   * clears the running flag and wakes every thread waiting on the recovery lock.
+   *
+   * If the wait is interrupted the flag is still cleared and waiters are still notified,
+   * as before, but the calling thread's interrupt status is now preserved.
+   */
+  @Override
+  public void cancelRecovery() {
+    synchronized (recoveryLock) {
+      if (recoveryStrat != null) {
+        recoveryStrat.close();
+
+        try {
+          recoveryStrat.join();
+        } catch (InterruptedException e) {
+          // keep the interrupt visible to the caller instead of silently dropping it
+          Thread.currentThread().interrupt();
+        }
+
+        recoveryRunning = false;
+        recoveryLock.notifyAll();
+      }
+    }
+  }
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java Wed Jan 25 19:49:26 2012
@@ -18,6 +18,8 @@
package org.apache.solr.update;
import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.CharsRef;
+import org.apache.solr.common.SolrInputField;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
@@ -28,18 +30,28 @@ import org.apache.solr.schema.SchemaFiel
public class DeleteUpdateCommand extends UpdateCommand {
public String id; // external (printable) id, for delete-by-id
public String query; // query string for delete-by-query
- private BytesRef indexedId;
+ public BytesRef indexedId;
public int commitWithin = -1;
public DeleteUpdateCommand(SolrQueryRequest req) {
- super("delete", req);
+ super(req);
+ }
+
+ /** Returns the fixed name of this command, "delete". */
+ @Override
+ public String name() {
+ return "delete";
+ }
+
+ /** Returns true when no query string is set, i.e. this is not a delete-by-query. */
+ public boolean isDeleteById() {
+ return query == null;
}
public void clear() {
id = null;
query = null;
indexedId = null;
+ version = 0;
}
/** Returns the indexed ID for this delete. The returned BytesRef is retained across multiple calls, and should not be modified. */
@@ -55,14 +67,46 @@ public class DeleteUpdateCommand extends
return indexedId;
}
+  /**
+   * Returns the external (printable) id. When only the indexed id is known and the schema
+   * has a unique key field, the readable form is derived from the indexed id and remembered.
+   */
+  public String getId() {
+    if (id != null || indexedId == null) {
+      return id;
+    }
+
+    final SchemaField keyField = req.getSchema().getUniqueKeyField();
+    if (keyField != null) {
+      final CharsRef readable = new CharsRef();
+      keyField.getType().indexedToReadable(indexedId, readable);
+      id = readable.toString();
+    }
+    return id;
+  }
+
+ /** Returns the query string for a delete-by-query, as currently set on this command. */
+ public String getQuery() {
+ return query;
+ }
+
+ /** Sets the query string for a delete-by-query; the id fields are left unchanged. */
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+  /** Sets the indexed id and discards the external id so the two cannot disagree. */
+  public void setIndexedId(BytesRef indexedId) {
+    this.id = null;
+    this.indexedId = indexedId;
+  }
+
+  /** Sets the external id and discards the indexed id so the two cannot disagree. */
+  public void setId(String id) {
+    this.indexedId = null;
+    this.id = id;
+  }
@Override
public String toString() {
- StringBuilder sb = new StringBuilder(commandName);
- sb.append(':');
- if (id!=null) sb.append("id=").append(id);
- else sb.append("query=`").append(query).append('`');
+ StringBuilder sb = new StringBuilder(super.toString());
+ if (id!=null) sb.append(",id=").append(getId());
+ if (indexedId!=null) sb.append(",indexedId=").append(getId());
+ if (query != null) sb.append(",query=`").append(query).append('`');
sb.append(",commitWithin=").append(commitWithin);
- return sb.toString();
+ sb.append('}');
+ return sb.toString();
}
+
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java Wed Jan 25 19:49:26 2012
@@ -32,6 +32,7 @@ import org.apache.lucene.document.Docume
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
+import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanClause.Occur;
@@ -45,8 +46,11 @@ import org.apache.solr.common.util.Named
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.FunctionRangeQuery;
import org.apache.solr.search.QParser;
-import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.QueryUtils;
+import org.apache.solr.search.function.ValueSourceRangeFilter;
/**
* TODO: add soft commitWithin support
@@ -54,8 +58,8 @@ import org.apache.solr.search.SolrIndexS
* <code>DirectUpdateHandler2</code> implements an UpdateHandler where documents are added
* directly to the main Lucene index as opposed to adding to a separate smaller index.
*/
-public class DirectUpdateHandler2 extends UpdateHandler {
- protected SolrCoreState solrCoreState;
+public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState.IndexWriterCloser {
+ protected final SolrCoreState solrCoreState;
protected final Lock commitLock = new ReentrantLock();
// stats
@@ -97,7 +101,7 @@ public class DirectUpdateHandler2 extend
public DirectUpdateHandler2(SolrCore core, UpdateHandler updateHandler) throws IOException {
super(core);
if (updateHandler instanceof DirectUpdateHandler2) {
- this.solrCoreState = ((DirectUpdateHandler2)updateHandler).solrCoreState;
+ this.solrCoreState = ((DirectUpdateHandler2) updateHandler).solrCoreState;
} else {
// the impl has changed, so we cannot use the old state - decref it
updateHandler.decref();
@@ -115,7 +119,9 @@ public class DirectUpdateHandler2 extend
softCommitTracker = new CommitTracker("Soft", core, softCommitDocsUpperBound, softCommitTimeUpperBound, true, true);
this.ulog = updateHandler.getUpdateLog();
- this.ulog.init(this, core);
+ if (this.ulog != null) {
+ this.ulog.init(this, core);
+ }
}
private void deleteAll() throws IOException {
@@ -170,14 +176,17 @@ public class DirectUpdateHandler2 extend
// allow duplicates
writer.addDocument(cmd.getLuceneDocument());
}
+
// Add to the transaction log *after* successfully adding to the index, if there was no error.
// This ordering ensures that if we log it, it's definitely been added to the index.
// This also ensures that if a commit sneaks in-between, that we know everything in a particular
// log version was definitely committed.
- ulog.add(cmd);
+ if (ulog != null) ulog.add(cmd);
- softCommitTracker.addedDocument( -1 ); // TODO: support commitWithin with soft update
- commitTracker.addedDocument( cmd.commitWithin );
+ if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
+ commitTracker.addedDocument( cmd.commitWithin );
+ softCommitTracker.addedDocument( -1 ); // TODO: support commitWithin with soft update
+ }
rc = 1;
} finally {
@@ -207,14 +216,16 @@ public class DirectUpdateHandler2 extend
writer.deleteDocuments(deleteTerm);
// SolrCore.verbose("deleteDocuments",deleteTerm,"DONE");
- ulog.delete(cmd);
-
- if (commitTracker.getTimeUpperBound() > 0) {
- commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
- }
-
- if (softCommitTracker.getTimeUpperBound() > 0) {
- softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
+ if (ulog != null) ulog.delete(cmd);
+
+ if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
+ if (commitTracker.getTimeUpperBound() > 0) {
+ commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
+ }
+
+ if (softCommitTracker.getTimeUpperBound() > 0) {
+ softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
+ }
}
}
@@ -227,24 +238,51 @@ public class DirectUpdateHandler2 extend
try {
Query q;
try {
+ // TODO: move this higher in the stack?
QParser parser = QParser.getParser(cmd.query, "lucene", cmd.req);
q = parser.getQuery();
+ q = QueryUtils.makeQueryable(q);
+
+ // peer-sync can cause older deleteByQueries to be executed and could
+ // delete newer documents. We prevent this by adding a clause restricting
+ // version.
+ if ((cmd.getFlags() & UpdateCommand.PEER_SYNC) != 0) {
+ BooleanQuery bq = new BooleanQuery();
+ bq.add(q, Occur.MUST);
+ SchemaField sf = core.getSchema().getField(VersionInfo.VERSION_FIELD);
+ ValueSource vs = sf.getType().getValueSource(sf, null);
+ ValueSourceRangeFilter filt = new ValueSourceRangeFilter(vs, null, Long.toString(Math.abs(cmd.version)), true, true);
+ FunctionRangeQuery range = new FunctionRangeQuery(filt);
+ bq.add(range, Occur.MUST);
+ q = bq;
+ }
+
+
+
} catch (ParseException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
boolean delAll = MatchAllDocsQuery.class == q.getClass();
-
+
commitTracker.deletedDocument(cmd.commitWithin);
+
+ //
+ // synchronized to prevent deleteByQuery from running during the "open new searcher"
+ // part of a commit. DBQ needs to signal that a fresh reader will be needed for
+ // a realtime view of the index. When a new searcher is opened after a DBQ, that
+ // flag can be cleared. If those things happen concurrently, it's not thread safe.
+ //
+ synchronized (this) {
+ if (delAll) {
+ deleteAll();
+ } else {
+ solrCoreState.getIndexWriter(core).deleteDocuments(q);
+ }
- if (delAll) {
- deleteAll();
- } else {
- solrCoreState.getIndexWriter(core).deleteDocuments(q);
+ if (ulog != null) ulog.deleteByQuery(cmd);
}
- ulog.deleteByQuery(cmd);
-
madeIt = true;
if (commitTracker.getTimeUpperBound() > 0) {
@@ -345,7 +383,7 @@ public class DirectUpdateHandler2 extend
if (!cmd.softCommit) {
synchronized (this) { // sync is currently needed to prevent preCommit from being called between preSoft and postSoft... see postSoft comments.
- ulog.preCommit(cmd);
+ if (ulog != null) ulog.preCommit(cmd);
}
// SolrCore.verbose("writer.commit() start writer=",writer);
@@ -363,23 +401,23 @@ public class DirectUpdateHandler2 extend
}
- if (cmd.softCommit) {
- // ulog.preSoftCommit();
- synchronized (this) {
- ulog.preSoftCommit(cmd);
- core.getSearcher(true,false,waitSearcher, true);
- ulog.postSoftCommit(cmd);
- }
- // ulog.postSoftCommit();
- } else {
- synchronized (this) {
- ulog.preSoftCommit(cmd);
- core.getSearcher(true,false,waitSearcher);
- ulog.postSoftCommit(cmd);
- }
- ulog.postCommit(cmd); // postCommit currently means new searcher has also been opened
+ if (cmd.softCommit) {
+ // ulog.preSoftCommit();
+ synchronized (this) {
+ if (ulog != null) ulog.preSoftCommit(cmd);
+ core.getSearcher(true, false, waitSearcher, true);
+ if (ulog != null) ulog.postSoftCommit(cmd);
}
-
+ // ulog.postSoftCommit();
+ } else {
+ synchronized (this) {
+ if (ulog != null) ulog.preSoftCommit(cmd);
+ core.getSearcher(true, false, waitSearcher);
+ if (ulog != null) ulog.postSoftCommit(cmd);
+ }
+ if (ulog != null) ulog.postCommit(cmd); // postCommit currently means new searcher has
+ // also been opened
+ }
// reset commit tracking
@@ -418,25 +456,6 @@ public class DirectUpdateHandler2 extend
}
@Override
- public SolrIndexSearcher reopenSearcher(SolrIndexSearcher previousSearcher) throws IOException {
-
- IndexReader currentReader = previousSearcher.getIndexReader();
- IndexReader newReader;
-
- IndexWriter writer = solrCoreState.getIndexWriter(core);
- // SolrCore.verbose("start reopen from",previousSearcher,"writer=",writer);
- newReader = IndexReader.openIfChanged(currentReader, writer, true);
- // SolrCore.verbose("reopen result", newReader);
-
- if (newReader == null) {
- currentReader.incRef();
- newReader = currentReader;
- }
-
- return new SolrIndexSearcher(core, schema, "main", newReader, true, true, true, core.getDirectoryFactory());
- }
-
- @Override
public void newIndexWriter() throws IOException {
solrCoreState.newIndexWriter(core);
}
@@ -490,12 +509,44 @@ public class DirectUpdateHandler2 extend
numDocsPending.set(0);
- solrCoreState.decref();
-
- log.info("closed " + this);
+ solrCoreState.decref(this);
}
+ public static boolean commitOnClose = true; // TODO: make this a real config option?
+
+ // IndexWriterCloser interface method - called from solrCoreState.decref(this)
+ @Override
+ public void closeWriter(IndexWriter writer) throws IOException {
+ commitLock.lock();
+ try {
+ if (!commitOnClose) {
+ if (writer != null) {
+ writer.rollback();
+ }
+
+ // we shouldn't close the transaction logs either, but leaving them open
+ // means we can't delete them on windows.
+ if (ulog != null) ulog.close();
+
+ return;
+ }
+
+ if (writer != null) {
+ writer.close();
+ }
+
+ // if the writer hits an exception, it's OK (and perhaps desirable)
+ // to not close the ulog?
+
+ // Closing the log currently deletes the log file.
+ // If this changes, we should record this as a "commit".
+ if (ulog != null) ulog.close();
+ } finally {
+ commitLock.unlock();
+ }
+ }
+
/////////////////////////////////////////////////////////////////////
// SolrInfoMBean stuff: Statistics and Module Info
/////////////////////////////////////////////////////////////////////
@@ -567,14 +618,15 @@ public class DirectUpdateHandler2 extend
return "DirectUpdateHandler2" + getStatistics();
}
- public SolrCoreState getIndexWriterProvider() {
+ @Override
+ public SolrCoreState getSolrCoreState() {
return solrCoreState;
}
@Override
public void decref() {
try {
- solrCoreState.decref();
+ solrCoreState.decref(this);
} catch (IOException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "", e);
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java Wed Jan 25 19:49:26 2012
@@ -31,20 +31,25 @@ public class MergeIndexesCommand extends
public IndexReader[] readers;
public MergeIndexesCommand(IndexReader[] readers, SolrQueryRequest req) {
- super("mergeIndexes", req);
+ super(req);
this.readers = readers;
}
@Override
+ public String name() {
+ return "mergeIndexes";
+ }
+
+ @Override
public String toString() {
- StringBuilder sb = new StringBuilder(commandName);
- sb.append(':');
+ StringBuilder sb = new StringBuilder(super.toString());
if (readers != null && readers.length > 0) {
sb.append(readers[0].directory());
for (int i = 1; i < readers.length; i++) {
sb.append(",").append(readers[i].directory());
}
}
+ sb.append('}');
return sb.toString();
}
}
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.httpclient.NoHttpResponseException;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
+import org.apache.solr.update.processor.RunUpdateProcessorFactory;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** @lucene.experimental */
+public class PeerSync {
+ public static Logger log = LoggerFactory.getLogger(PeerSync.class);
+ public boolean debug = log.isDebugEnabled();
+
+ private List<String> replicas;
+ private int nUpdates;
+
+ private UpdateHandler uhandler;
+ private UpdateLog ulog;
+ private ShardHandlerFactory shardHandlerFactory;
+ private ShardHandler shardHandler;
+
+ private UpdateLog.RecentUpdates recentUpdates;
+
+ private List<Long> ourUpdates;
+ private Set<Long> ourUpdateSet;
+ private Set<Long> requestedUpdateSet;
+ private long ourLowThreshold; // 20th percentile
+ private long ourHighThreshold; // 80th percentile
+
+ // orders versions by descending absolute value (largest magnitude first)
+ private static Comparator<Long> absComparator = new Comparator<Long>() {
+   @Override
+   public int compare(Long o1, Long o2) {
+     long first = Math.abs(o1);
+     long second = Math.abs(o2);
+     return first > second ? -1 : (first < second ? 1 : 0);
+   }
+ };
+
+ // orders update records by ascending absolute version (element 1 of each record);
+ // anything that is not a List is pushed towards the end
+ private static Comparator<Object> updateRecordComparator = new Comparator<Object>() {
+   @Override
+   public int compare(Object o1, Object o2) {
+     if (!(o1 instanceof List)) return 1;
+     if (!(o2 instanceof List)) return -1;
+
+     long firstVersion = Math.abs((Long)((List)o1).get(1));
+     long secondVersion = Math.abs((Long)((List)o2).get(1));
+
+     return firstVersion > secondVersion ? 1 : (firstVersion < secondVersion ? -1 : 0);
+   }
+ };
+
+
+ private static class SyncShardRequest extends ShardRequest {
+ List<Long> reportedVersions;
+ List<Long> requestedUpdates;
+ Exception updateException;
+ }
+
+
+ public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
+ this.replicas = replicas;
+ this.nUpdates = nUpdates;
+
+ uhandler = core.getUpdateHandler();
+ ulog = uhandler.getUpdateLog();
+ shardHandlerFactory = core.getCoreDescriptor().getCoreContainer().getShardHandlerFactory();
+ shardHandler = shardHandlerFactory.getShardHandler();
+ }
+
+ /**
+  * Returns the absolute value of the element found at the fractional position {@code frac} of
+  * {@code arr}; the index used is {@code (int)(arr.size() * frac)}.
+  * The callers in this class pass lists already sorted with absComparator (largest absolute
+  * version first), so a small frac selects a high version and a large frac a low one.
+  * NOTE(review): the list must be non-empty and frac must be below 1.0, otherwise the computed
+  * index is out of range for arr.get().
+  */
+ public long percentile(List<Long> arr, float frac) {
+ int elem = (int) (arr.size() * frac);
+ return Math.abs(arr.get(elem));
+ }
+
+ /** Returns true if peer sync was successful, meaning that this core may now be considered to have the latest updates.
+ * Returns false when there is no update log, when this core has no recent versions to compare
+ * against, or when handling any replica response fails.
+ * A commit is not performed.
+ */
+ public boolean sync() {
+ if (ulog == null) {
+ return false;
+ }
+
+ // fire off the requests before getting our own recent updates (for better concurrency)
+ for (String replica : replicas) {
+ requestVersions(replica);
+ }
+
+ recentUpdates = ulog.getRecentUpdates();
+ try {
+ ourUpdates = recentUpdates.getVersions(nUpdates);
+ } finally {
+ recentUpdates.close();
+ }
+
+
+ // highest absolute version first, so position 0.2 is a "high" version and 0.8 a "low" one
+ Collections.sort(ourUpdates, absComparator);
+
+ if (ourUpdates.size() > 0) {
+ ourLowThreshold = percentile(ourUpdates, 0.8f);
+ ourHighThreshold = percentile(ourUpdates, 0.2f);
+ } else {
+ // we have no versions and hence no frame of reference to tell if we can use a peer's
+ // updates to bring us into sync
+ // NOTE(review): the version requests submitted above are still outstanding on this path;
+ // unlike the failure path below, cancelAll() is not called here - confirm this is intended.
+ return false;
+ }
+
+ ourUpdateSet = new HashSet<Long>(ourUpdates);
+ // seeded with our own versions so that we never request an update we already have
+ requestedUpdateSet = new HashSet<Long>(ourUpdates);
+
+ // drain responses until the shard handler reports nothing left; handleResponse may submit
+ // follow-up getUpdates requests whose responses are picked up by this same loop
+ for(;;) {
+ ShardResponse srsp = shardHandler.takeCompletedOrError();
+ if (srsp == null) break;
+ boolean success = handleResponse(srsp);
+ if (!success) {
+ shardHandler.cancelAll();
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private void requestVersions(String replica) {
+ SyncShardRequest sreq = new SyncShardRequest();
+ sreq.purpose = 1;
+ // TODO: this sucks
+ if (replica.startsWith("http://"))
+ replica = replica.substring(7);
+ sreq.shards = new String[]{replica};
+ sreq.actualShards = sreq.shards;
+ sreq.params = new ModifiableSolrParams();
+ sreq.params.set("qt","/get");
+ sreq.params.set("distrib",false);
+ sreq.params.set("getVersions",nUpdates);
+ shardHandler.submit(sreq, replica, sreq.params);
+ }
+
+ /**
+  * Dispatches a completed shard response to the version or update handler.
+  * A response carrying an exception fails the sync unless the root cause is a connection
+  * failure (ConnectException / NoHttpResponseException), in which case the replica is skipped.
+  *
+  * @return false if the sync must be aborted, true otherwise
+  */
+ private boolean handleResponse(ShardResponse srsp) {
+   Throwable failure = srsp.getException();
+   if (failure != null) {
+
+     // TODO: look at this more thoroughly - we don't want
+     // to fail on connection exceptions, but it may make sense
+     // to determine this based on the number of fails
+     if (failure instanceof SolrServerException) {
+       Throwable rootCause = ((SolrServerException) failure).getRootCause();
+       if (rootCause instanceof ConnectException
+           || rootCause instanceof NoHttpResponseException) {
+         return true;
+       }
+     }
+
+     // the failure aborts the whole sync, so leave a trace of why
+     log.warn("PeerSync: exception in shard response, failing sync", failure);
+     return false;
+   }
+
+   ShardRequest sreq = srsp.getShardRequest();
+   if (sreq.purpose == 1) {
+     return handleVersions(srsp);
+   } else {
+     return handleUpdates(srsp);
+   }
+ }
+
+ /**
+  * Handles a replica's reply to a getVersions request: compares the replica's version window
+  * with ours and, when they overlap enough, requests the versions we are missing.
+  *
+  * @return false if this replica is so far ahead of us that the sync must fail; true if nothing
+  *         needs to be done or the follow-up update request was submitted
+  */
+ private boolean handleVersions(ShardResponse srsp) {
+ // we retrieved the last N updates from the replica
+ // NOTE(review): assumes the response always contains a "versions" list - a missing entry
+ // would surface as a NullPointerException below; confirm against the /get handler.
+ List<Long> otherVersions = (List<Long>)srsp.getSolrResponse().getResponse().get("versions");
+ // TODO: how to handle short lists?
+
+ SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
+ sreq.reportedVersions = otherVersions;
+
+ if (otherVersions.size() == 0) {
+ return true;
+ }
+
+ // highest absolute version first, matching the ordering of ourUpdates
+ Collections.sort(otherVersions, absComparator);
+
+ long otherHigh = percentile(otherVersions, .2f);
+ long otherLow = percentile(otherVersions, .8f);
+
+ if (ourHighThreshold < otherLow) {
+ // Small overlap between version windows and ours is older
+ // This means that we might miss updates if we attempted to use this method.
+ // Since there exists just one replica that is so much newer, we must
+ // fail the sync.
+ return false;
+ }
+
+ if (ourLowThreshold > otherHigh) {
+ // Small overlap between windows and ours is newer.
+ // Using this list to sync would result in requesting/replaying results we don't need
+ // and possibly bringing deleted docs back to life.
+ return true;
+ }
+
+ List<Long> toRequest = new ArrayList<Long>();
+ for (Long otherVersion : otherVersions) {
+ // stop when the entries get old enough that reorders may lead us to see updates we don't need
+ if (Math.abs(otherVersion) < ourLowThreshold) break;
+
+ if (ourUpdateSet.contains(otherVersion) || requestedUpdateSet.contains(otherVersion)) {
+ // we either have this update, or already requested it
+ continue;
+ }
+
+ toRequest.add(otherVersion);
+ requestedUpdateSet.add(otherVersion);
+ }
+
+ // remembered so handleUpdates can check that the replica returned enough records
+ sreq.requestedUpdates = toRequest;
+
+ if (toRequest.isEmpty()) {
+ // we had (or already requested) all the updates referenced by the replica
+ return true;
+ }
+
+ return requestUpdates(srsp, toRequest);
+ }
+
+ /**
+  * Asks the replica behind the given response for the listed versions by re-submitting the
+  * original request object as a getUpdates request. Always returns true.
+  */
+ private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
+   // reuse our original request object
+   ShardRequest updateRequest = srsp.getShardRequest();
+   String replica = updateRequest.shards[0];
+
+   log.info("Requesting updates from " + replica + " versions=" + toRequest);
+
+   updateRequest.purpose = 0;  // anything other than 1 is routed to handleUpdates
+   updateRequest.params = new ModifiableSolrParams();
+   updateRequest.params.set("qt","/get");
+   updateRequest.params.set("distrib",false);
+   updateRequest.params.set("getUpdates", StrUtils.join(toRequest, ','));
+   updateRequest.responses.clear();  // needs to be zeroed for correct correlation to occur
+
+   shardHandler.submit(updateRequest, replica, updateRequest.params);
+
+   return true;
+ }
+
+
+ private boolean handleUpdates(ShardResponse srsp) {
+ // we retrieved the last N updates from the replica
+ List<Object> updates = (List<Object>)srsp.getSolrResponse().getResponse().get("updates");
+
+ SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
+ if (updates.size() < sreq.requestedUpdates.size()) {
+ log.error("PeerSync: Requested " + sreq.requestedUpdates.size() + " updates from " + sreq.shards[0] + " but retrieved " + updates.size());
+ return false;
+ }
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(DistributedUpdateProcessor.SEEN_LEADER, true);
+ SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+ SolrQueryResponse rsp = new SolrQueryResponse();
+
+ RunUpdateProcessorFactory runFac = new RunUpdateProcessorFactory();
+ DistributedUpdateProcessorFactory magicFac = new DistributedUpdateProcessorFactory();
+ runFac.init(new NamedList());
+ magicFac.init(new NamedList());
+
+ UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req, rsp, null));
+
+ Collections.sort(updates, updateRecordComparator);
+
+ Object o = null;
+ long lastVersion = 0;
+ try {
+ // Apply oldest updates first
+ for (Object obj : updates) {
+ // should currently be a List<Oper,Ver,Doc/Id>
+ o = obj;
+ List<Object> entry = (List<Object>)o;
+
+ int oper = (Integer)entry.get(0);
+ long version = (Long) entry.get(1);
+ if (version == lastVersion && version != 0) continue;
+ lastVersion = version;
+
+ switch (oper) {
+ case UpdateLog.ADD:
+ {
+ // byte[] idBytes = (byte[]) entry.get(2);
+ SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ // cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.solrDoc = sdoc;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processAdd(cmd);
+ break;
+ }
+ case UpdateLog.DELETE:
+ {
+ byte[] idBytes = (byte[]) entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processDelete(cmd);
+ break;
+ }
+
+ case UpdateLog.DELETE_BY_QUERY:
+ {
+ String query = (String)entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.query = query;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processDelete(cmd);
+ break;
+ }
+
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+ }
+
+ }
+
+ }
+ catch (IOException e) {
+ // TODO: should this be handled separately as a problem with us?
+ // I guess it probably already will by causing replication to be kicked off.
+ sreq.updateException = e;
+ log.error("Error applying updates from " + sreq.shards + " ,update=" + o, e);
+ return false;
+ }
+ catch (Exception e) {
+ sreq.updateException = e;
+ log.error("Error applying updates from " + sreq.shards + " ,update=" + o, e);
+ return false;
+ }
+ finally {
+ try {
+ proc.finish();
+ } catch (Exception e) {
+ sreq.updateException = e;
+ log.error("Error applying updates from " + sreq.shards + " ,finish()", e);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+
+
+ /**
+  * Requests the last nUpdates versions from each replica and waits for the responses.
+  * NOTE(review): despite the original description ("Requests and applies recent updates from
+  * peers"), the responses are taken and discarded here - nothing is compared or applied.
+  * This looks like an early version of the instance-level sync(); confirm before relying on it.
+  */
+ public static void sync(SolrCore core, List<String> replicas, int nUpdates) {
+ UpdateHandler uhandler = core.getUpdateHandler();
+
+ ShardHandlerFactory shardHandlerFactory = core.getCoreDescriptor().getCoreContainer().getShardHandlerFactory();
+
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+
+ // one non-distributed "/get" getVersions request per replica
+ for (String replica : replicas) {
+ ShardRequest sreq = new ShardRequest();
+ sreq.shards = new String[]{replica};
+ sreq.params = new ModifiableSolrParams();
+ sreq.params.set("qt","/get");
+ sreq.params.set("distrib", false);
+ sreq.params.set("getVersions",nUpdates);
+ shardHandler.submit(sreq, replica, sreq.params);
+ }
+
+ // take one response per submitted request; the result is currently unused
+ for (String replica : replicas) {
+ ShardResponse srsp = shardHandler.takeCompletedOrError();
+ }
+
+
+ }
+
+}
\ No newline at end of file
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java Wed Jan 25 19:49:26 2012
@@ -26,7 +26,16 @@ import org.apache.solr.request.SolrQuery
public class RollbackUpdateCommand extends UpdateCommand {
public RollbackUpdateCommand(SolrQueryRequest req) {
- super("rollback", req);
+ super(req);
}
+ @Override
+ public String name() {
+ return "rollback";
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + '}';
+ }
}
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,484 @@
+package org.apache.solr.update;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequestExt;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+
+
+
+
+public class SolrCmdDistributor {
+ // TODO: shut this thing down
+ // TODO: this cannot be per instance...
+ static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0,
+ Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new DefaultSolrThreadFactory("cmdDistribExecutor"));
+
+ static HttpClient client;
+
+ static {
+ MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+ mgr.getParams().setDefaultMaxConnectionsPerHost(8);
+ mgr.getParams().setMaxTotalConnections(200);
+ client = new HttpClient(mgr);
+ }
+
+ CompletionService<Request> completionService;
+ Set<Future<Request>> pending;
+
+ int maxBufferedAddsPerServer = 10;
+ int maxBufferedDeletesPerServer = 10;
+
+ private Response response = new Response();
+
+ private final Map<Node,List<AddRequest>> adds = new HashMap<Node,List<AddRequest>>();
+ private final Map<Node,List<DeleteRequest>> deletes = new HashMap<Node,List<DeleteRequest>>();
+
+ class AddRequest {
+ AddUpdateCommand cmd;
+ ModifiableSolrParams params;
+ }
+
+ class DeleteRequest {
+ DeleteUpdateCommand cmd;
+ ModifiableSolrParams params;
+ }
+
+ public SolrCmdDistributor() {
+
+ }
+
+ public void finish() {
+
+ // piggyback on any outstanding adds or deletes if possible.
+ flushAdds(1, null, null);
+ flushDeletes(1, null, null);
+
+ checkResponses(true);
+ }
+
+ /**
+  * Buffers a delete (by id or by query) for the given nodes after polling for any
+  * responses that have already completed.
+  *
+  * @param cmd    the delete command to distribute
+  * @param urls   the nodes the delete should be sent to
+  * @param params request parameters to send along with the delete
+  */
+ public void distribDelete(DeleteUpdateCommand cmd, List<Node> urls, ModifiableSolrParams params) throws IOException {
+   checkResponses(false);
+
+   // delete-by-id and delete-by-query are buffered the same way; flushDeletes
+   // tells them apart when it builds the outgoing request
+   doDelete(cmd, urls, params);
+ }
+
+ /**
+  * Buffers an add for each of the given nodes and flushes the add buffers once a node has
+  * accumulated maxBufferedAddsPerServer requests. Pending deletes are flushed first so that
+  * they are sent ahead of this add.
+  *
+  * @param cmd          the add command; its document, commitWithin, overwrite and version are copied
+  * @param nodes        the nodes the add should be sent to
+  * @param commitParams request parameters stored with the buffered add and sent along with it
+  */
+ public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams commitParams) throws IOException {
+ checkResponses(false);
+
+ // make sure any pending deletes are flushed
+ flushDeletes(1, null, null);
+
+ // TODO: this is brittle
+ // need to make a clone since these commands may be reused
+ AddUpdateCommand clone = new AddUpdateCommand(null);
+
+ clone.solrDoc = cmd.solrDoc;
+ clone.commitWithin = cmd.commitWithin;
+ clone.overwrite = cmd.overwrite;
+ clone.setVersion(cmd.getVersion());
+ AddRequest addRequest = new AddRequest();
+ addRequest.cmd = clone;
+ addRequest.params = commitParams;
+
+ // the same AddRequest instance is shared by the buffers of all target nodes
+ for (Node node : nodes) {
+ List<AddRequest> alist = adds.get(node);
+ if (alist == null) {
+ alist = new ArrayList<AddRequest>(2);
+ adds.put(node, alist);
+ }
+ alist.add(addRequest);
+ }
+
+ flushAdds(maxBufferedAddsPerServer, null, null);
+ }
+
+ /**
+  * Sends a commit (or optimize, see addCommit) to each of the given nodes after waiting for
+  * all outstanding responses. If the command asks to wait for the searcher, this also
+  * blocks until the commit responses have come back.
+  */
+ public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
+ ModifiableSolrParams params) throws IOException {
+ // Wait for all outstanding responses to make sure that a commit
+ // can't sneak in ahead of adds or deletes we already sent.
+ // We could do this on a per-server basis, but it's more complex
+ // and this solution will lead to commits happening closer together.
+ checkResponses(true);
+
+ // currently, we don't try to piggyback on outstanding adds or deletes
+
+ UpdateRequestExt ureq = new UpdateRequestExt();
+ ureq.setParams(params);
+
+ addCommit(ureq, cmd);
+
+ // the same request object is submitted to every node
+ for (Node node : nodes) {
+ submit(ureq, node);
+ }
+
+ // if the command wanted to block until everything was committed,
+ // then do that here.
+
+ if (cmd.waitSearcher) {
+ checkResponses(true);
+ }
+ }
+
+ /**
+  * Flushes pending adds, then buffers a clone of the delete for every target node and
+  * flushes the delete buffers once maxBufferedDeletesPerServer is reached.
+  */
+ private void doDelete(DeleteUpdateCommand cmd, List<Node> nodes,
+     ModifiableSolrParams params) throws IOException {
+
+   flushAdds(1, null, null);
+
+   DeleteRequest buffered = new DeleteRequest();
+   buffered.cmd = clone(cmd);
+   buffered.params = params;
+
+   for (Node target : nodes) {
+     List<DeleteRequest> pendingForNode = deletes.get(target);
+     if (pendingForNode == null) {
+       pendingForNode = new ArrayList<DeleteRequest>(2);
+       deletes.put(target, pendingForNode);
+     }
+     pendingForNode.add(buffered);
+   }
+
+   flushDeletes(maxBufferedDeletesPerServer, null, null);
+ }
+
+ /** Marks the request as an OPTIMIZE or COMMIT action according to the command; a null command is ignored. */
+ void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
+   if (cmd != null) {
+     AbstractUpdateRequest.ACTION action;
+     if (cmd.optimize) {
+       action = AbstractUpdateRequest.ACTION.OPTIMIZE;
+     } else {
+       action = AbstractUpdateRequest.ACTION.COMMIT;
+     }
+     ureq.setAction(action, false, cmd.waitSearcher);
+   }
+ }
+
+ /**
+  * Sends the buffered adds of every node that has accumulated at least {@code limit} requests
+  * and removes those nodes from the buffer. Nodes below the limit are left buffered.
+  *
+  * @param limit        minimum number of buffered adds a node needs before it is flushed
+  * @param ccmd         optional commit to attach to each outgoing request (may be null)
+  * @param commitParams optional extra parameters for each outgoing request (may be null)
+  * @return true if every buffered node was flushed, false if at least one stayed below the limit
+  */
+ boolean flushAdds(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams commitParams) {
+   // check for pending adds
+
+   Set<Node> removeNodes = new HashSet<Node>();
+   boolean flushedAll = true;
+
+   for (Map.Entry<Node,List<AddRequest>> pendingEntry : adds.entrySet()) {
+     Node node = pendingEntry.getKey();
+     List<AddRequest> alist = pendingEntry.getValue();
+     if (alist == null || alist.size() < limit) {
+       // Skip only this node. Returning here would leave nodes that were already
+       // submitted in the buffer, and they would be sent again by the next flush.
+       flushedAll = false;
+       continue;
+     }
+
+     UpdateRequestExt ureq = new UpdateRequestExt();
+
+     addCommit(ureq, ccmd);
+
+     ModifiableSolrParams combinedParams = new ModifiableSolrParams();
+
+     for (AddRequest aReq : alist) {
+       AddUpdateCommand cmd = aReq.cmd;
+       combinedParams.add(aReq.params);
+
+       ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+     }
+
+     if (commitParams != null) combinedParams.add(commitParams);
+     if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
+     ureq.getParams().add(combinedParams);
+
+     removeNodes.add(node);
+
+     submit(ureq, node);
+   }
+
+   // removal is deferred so the map is not modified while it is being iterated
+   for (Node node : removeNodes) {
+     adds.remove(node);
+   }
+
+   return flushedAll;
+ }
+
+ /**
+  * Sends the buffered deletes of every node that has accumulated at least {@code limit}
+  * requests and removes those nodes from the buffer. Nodes below the limit are left buffered.
+  *
+  * @param limit        minimum number of buffered deletes a node needs before it is flushed
+  * @param ccmd         optional commit to attach to each outgoing request (may be null)
+  * @param commitParams optional extra parameters for each outgoing request (may be null)
+  * @return true if every buffered node was flushed, false if at least one stayed below the limit
+  */
+ boolean flushDeletes(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams commitParams) {
+   // check for pending deletes
+
+   Set<Node> removeNodes = new HashSet<Node>();
+   boolean flushedAll = true;
+
+   for (Map.Entry<Node,List<DeleteRequest>> pendingEntry : deletes.entrySet()) {
+     Node node = pendingEntry.getKey();
+     List<DeleteRequest> dlist = pendingEntry.getValue();
+     if (dlist == null || dlist.size() < limit) {
+       // Skip only this node. Returning here would leave nodes that were already
+       // submitted in the buffer, and they would be sent again by the next flush.
+       flushedAll = false;
+       continue;
+     }
+
+     UpdateRequestExt ureq = new UpdateRequestExt();
+
+     addCommit(ureq, ccmd);
+
+     ModifiableSolrParams combinedParams = new ModifiableSolrParams();
+
+     for (DeleteRequest dReq : dlist) {
+       DeleteUpdateCommand cmd = dReq.cmd;
+       combinedParams.add(dReq.params);
+       if (cmd.isDeleteById()) {
+         ureq.deleteById(cmd.getId(), cmd.getVersion());
+       } else {
+         ureq.deleteByQuery(cmd.query);
+       }
+     }
+
+     // merge the parameters once per request (as flushAdds does); doing it per delete
+     // re-added the accumulated values on every iteration
+     if (commitParams != null) combinedParams.add(commitParams);
+     if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
+     ureq.getParams().add(combinedParams);
+
+     removeNodes.add(node);
+     submit(ureq, node);
+   }
+
+   // removal is deferred so the map is not modified while it is being iterated
+   for (Node node : removeNodes) {
+     deletes.remove(node);
+   }
+
+   return flushedAll;
+ }
+
+ private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
+ DeleteUpdateCommand c = (DeleteUpdateCommand)cmd.clone();
+ // TODO: shouldnt the clone do this?
+ c.setFlags(cmd.getFlags());
+ c.setVersion(cmd.getVersion());
+ return c;
+ }
+
+ public static class Request {
+ public Node node;
+ UpdateRequestExt ureq;
+ NamedList<Object> ursp;
+ int rspCode;
+ public Exception exception;
+ int retries;
+ }
+
+ void submit(UpdateRequestExt ureq, Node node) {
+ Request sreq = new Request();
+ sreq.node = node;
+ sreq.ureq = ureq;
+ submit(sreq);
+ }
+
+ /**
+  * Queues the request for asynchronous execution on the shared executor and records the
+  * resulting future in {@code pending}. The completion service and pending set are created
+  * lazily on first use.
+  * The task never throws: any exception is captured in the returned Request's
+  * {@code exception} field, with {@code rspCode} set to the SolrException code or -1.
+  */
+ public void submit(final Request sreq) {
+ if (completionService == null) {
+ completionService = new ExecutorCompletionService<Request>(commExecutor);
+ pending = new HashSet<Future<Request>>();
+ }
+ final String url = sreq.node.getUrl();
+
+ Callable<Request> task = new Callable<Request>() {
+ @Override
+ public Request call() throws Exception {
+ // work on a copy so the result fields are written to a fresh object
+ Request clonedRequest = new Request();
+ clonedRequest.node = sreq.node;
+ clonedRequest.ureq = sreq.ureq;
+ clonedRequest.retries = sreq.retries;
+
+ try {
+ // URLs without a scheme are assumed to be plain http
+ String fullUrl;
+ if (!url.startsWith("http://") && !url.startsWith("https://")) {
+ fullUrl = "http://" + url;
+ } else {
+ fullUrl = url;
+ }
+
+ CommonsHttpSolrServer server = new CommonsHttpSolrServer(fullUrl,
+ client);
+
+ clonedRequest.ursp = server.request(clonedRequest.ureq);
+
+ // currently no way to get the request body.
+ } catch (Exception e) {
+ clonedRequest.exception = e;
+ if (e instanceof SolrException) {
+ clonedRequest.rspCode = ((SolrException) e).code();
+ } else {
+ clonedRequest.rspCode = -1;
+ }
+ }
+ return clonedRequest;
+ }
+ };
+
+ pending.add(completionService.submit(task));
+
+ }
+
+ /**
+  * Drains completed requests from the completion service until none are
+  * pending or, in non-blocking mode, until no further result is ready.
+  *
+  * A request that finished with a non-zero {@code rspCode} is resubmitted
+  * after a 500 ms pause when it has been retried fewer than 5 times and its
+  * node's {@code checkRetry()} returns true. Otherwise the failure is added
+  * to {@code response.errors}, stored in {@code response.sreq} and logged.
+  *
+  * @param block if true, wait for each pending request to complete; if
+  *        false, return as soon as no completed request is available
+  * @throws SolrException with SERVICE_UNAVAILABLE if the thread is
+  *         interrupted while waiting; the interrupt flag is restored first
+  */
+ void checkResponses(boolean block) {
+
+   while (pending != null && pending.size() > 0) {
+     try {
+       Future<Request> future = block ? completionService.take()
+           : completionService.poll();
+       if (future == null) {
+         return;
+       }
+       pending.remove(future);
+
+       try {
+         Request sreq = future.get();
+         if (sreq.rspCode != 0) {
+           // error during request
+
+           // if there is a retry url, we want to retry...
+           // TODO: but we really should only retry on connection errors...
+           if (sreq.retries < 5 && sreq.node.checkRetry()) {
+             sreq.retries++;
+             sreq.rspCode = 0;
+             sreq.exception = null;
+             Thread.sleep(500);
+             submit(sreq);
+             // No recursive call needed: submit() put the retry back into
+             // pending, so the enclosing loop picks it up on its next pass.
+           } else {
+             Exception e = sreq.exception;
+             Error error = new Error();
+             error.e = e;
+             error.node = sreq.node;
+             response.errors.add(error);
+             response.sreq = sreq;
+             SolrException.log(SolrCore.log, "shard update error "
+                 + sreq.node, sreq.exception);
+           }
+         }
+
+       } catch (ExecutionException e) {
+         // shouldn't happen since we catch exceptions ourselves
+         SolrException.log(SolrCore.log,
+             "error sending update request to shard", e);
+       }
+
+     } catch (InterruptedException e) {
+       // Restore the interrupt flag so callers further up the stack can
+       // still observe that this thread was asked to stop.
+       Thread.currentThread().interrupt();
+       throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+           "interrupted waiting for shard update response", e);
+     }
+   }
+ }
+
+ /**
+  * Accumulated outcome of the requests checked so far.
+  */
+ public static class Response {
+   /** One entry per request that failed and was not retried. */
+   public List<Error> errors = new ArrayList<Error>();
+   /** The most recent request that failed and was not retried. */
+   public Request sreq;
+ }
+
+ /**
+  * A single failed request: the node it was addressed to and the exception
+  * recorded for it.
+  */
+ public static class Error {
+   /** Exception recorded on the failed request. */
+   public Exception e;
+   /** Node the failed request was addressed to. */
+   public Node node;
+ }
+
+ /**
+  * @return the response object that collects request failures
+  */
+ public Response getResponse() {
+   return this.response;
+ }
+
+ /**
+  * A destination for update requests.
+  */
+ public static abstract class Node {
+   /**
+    * @return the URL requests for this node are sent to; the sender prefixes
+    *         "http://" when it starts with neither "http://" nor "https://"
+    */
+   public abstract String getUrl();
+
+   /** @return the base URL of this node */
+   public abstract String getBaseUrl();
+
+   /** @return the core name of this node */
+   public abstract String getCoreName();
+
+   /** @return the ZooKeeper core node properties of this node */
+   public abstract ZkCoreNodeProps getNodeProps();
+
+   /**
+    * Consulted after a request to this node has failed.
+    *
+    * @return true if the failed request may be resubmitted
+    */
+   public abstract boolean checkRetry();
+ }
+
+ /**
+  * {@link Node} whose URL, base URL and core name are read from a
+  * {@link ZkCoreNodeProps} at construction time. It never asks for a retry.
+  * Equality and hash code are based on base URL, core name and URL.
+  */
+ public static class StdNode extends Node {
+   protected String url;
+   protected String baseUrl;
+   protected String coreName;
+   private ZkCoreNodeProps nodeProps;
+
+   /**
+    * @param nodeProps source of the core URL, base URL and core name; also
+    *        retained and returned by {@link #getNodeProps()}
+    */
+   public StdNode(ZkCoreNodeProps nodeProps) {
+     this.url = nodeProps.getCoreUrl();
+     this.baseUrl = nodeProps.getBaseUrl();
+     this.coreName = nodeProps.getCoreName();
+     this.nodeProps = nodeProps;
+   }
+
+   @Override
+   public String getUrl() {
+     return url;
+   }
+
+   @Override
+   public String toString() {
+     return this.getClass().getSimpleName() + ": " + url;
+   }
+
+   /** @return always false: a failed request to this node is not retried */
+   @Override
+   public boolean checkRetry() {
+     return false;
+   }
+
+   @Override
+   public String getBaseUrl() {
+     return baseUrl;
+   }
+
+   @Override
+   public String getCoreName() {
+     return coreName;
+   }
+
+   @Override
+   public int hashCode() {
+     final int prime = 31;
+     int result = 1;
+     result = prime * result + hashOf(baseUrl);
+     result = prime * result + hashOf(coreName);
+     result = prime * result + hashOf(url);
+     return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+     if (this == obj) {
+       return true;
+     }
+     // Exact class match: a subclass instance is never equal to a StdNode.
+     if (obj == null || getClass() != obj.getClass()) {
+       return false;
+     }
+     StdNode other = (StdNode) obj;
+     return same(baseUrl, other.baseUrl)
+         && same(coreName, other.coreName)
+         && same(url, other.url);
+   }
+
+   // The override marker was missing here although this implements Node's
+   // abstract method; with it the compiler checks the signature.
+   @Override
+   public ZkCoreNodeProps getNodeProps() {
+     return nodeProps;
+   }
+
+   /** Null-safe hash: 0 for null, otherwise the string's hash code. */
+   private static int hashOf(String s) {
+     return (s == null) ? 0 : s.hashCode();
+   }
+
+   /** Null-safe equality: two nulls are equal, null never equals non-null. */
+   private static boolean same(String a, String b) {
+     return (a == null) ? (b == null) : a.equals(b);
+   }
+ }
+}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java Wed Jan 25 19:49:26 2012
@@ -49,11 +49,12 @@ public abstract class SolrCoreState {
/**
* Decrement the number of references to this state. When the number of
- * references hits 0, the state will close.
+ * references hits 0, the state will close. If an optional closer is
+ * passed, that will be used to close the writer.
*
* @throws IOException
*/
- public abstract void decref() throws IOException;
+ public abstract void decref(IndexWriterCloser closer) throws IOException;
/**
* Increment the number of references to this state.
@@ -73,5 +74,14 @@ public abstract class SolrCoreState {
* @return the {@link DirectoryFactory} that should be used.
*/
public abstract DirectoryFactory getDirectoryFactory();
+
+
+ public interface IndexWriterCloser {
+ public void closeWriter(IndexWriter writer) throws IOException;
+ }
+
+ public abstract void doRecovery(SolrCore core);
+ public abstract void cancelRecovery();
+
}