You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by rm...@apache.org on 2013/10/21 20:58:44 UTC
svn commit: r1534320 [32/39] - in /lucene/dev/branches/lucene4956: ./
dev-tools/ dev-tools/idea/.idea/ dev-tools/idea/lucene/expressions/
dev-tools/idea/solr/contrib/velocity/ dev-tools/maven/
dev-tools/maven/lucene/ dev-tools/maven/lucene/expressions/...
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Mon Oct 21 18:58:24 2013
@@ -18,12 +18,14 @@
package org.apache.solr.servlet;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -35,9 +37,11 @@ import org.apache.solr.common.util.Conte
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.ConfigSolr;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.handler.ContentStreamHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryRequestBase;
@@ -60,6 +64,7 @@ import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -131,12 +136,44 @@ public class SolrDispatchFilter implemen
log.info("SolrDispatchFilter.init() done");
}
+ private ConfigSolr loadConfigSolr(SolrResourceLoader loader) {
+
+ String solrxmlLocation = System.getProperty("solr.solrxml.location", "solrhome");
+
+ if (solrxmlLocation == null || "solrhome".equalsIgnoreCase(solrxmlLocation))
+ return ConfigSolr.fromSolrHome(loader, loader.getInstanceDir());
+
+ if ("zookeeper".equalsIgnoreCase(solrxmlLocation)) {
+ String zkHost = System.getProperty("zkHost");
+ log.info("Trying to read solr.xml from " + zkHost);
+ if (StringUtils.isEmpty(zkHost))
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Could not load solr.xml from zookeeper: zkHost system property not set");
+ SolrZkClient zkClient = new SolrZkClient(zkHost, 30000);
+ try {
+ if (!zkClient.exists("/solr.xml", true))
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not load solr.xml from zookeeper: node not found");
+ byte[] data = zkClient.getData("/solr.xml", null, null, true);
+ return ConfigSolr.fromInputStream(loader, new ByteArrayInputStream(data));
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not load solr.xml from zookeeper", e);
+ } finally {
+ zkClient.close();
+ }
+ }
+
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Bad solr.solrxml.location set: " + solrxmlLocation + " - should be 'solrhome' or 'zookeeper'");
+ }
+
/**
* Override this to change CoreContainer initialization
* @return a CoreContainer to hold this server's cores
*/
protected CoreContainer createCoreContainer() {
- CoreContainer cores = new CoreContainer();
+ SolrResourceLoader loader = new SolrResourceLoader(SolrResourceLoader.locateSolrHome());
+ ConfigSolr config = loadConfigSolr(loader);
+ CoreContainer cores = new CoreContainer(loader, config);
cores.load();
return cores;
}
@@ -165,7 +202,7 @@ public class SolrDispatchFilter implemen
}
if (this.cores == null) {
- ((HttpServletResponse)response).sendError( 503, "Server is shutting down" );
+ ((HttpServletResponse)response).sendError( 503, "Server is shutting down or failed to initialize" );
return;
}
CoreContainer cores = this.cores;
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java Mon Oct 21 18:58:24 2013
@@ -290,21 +290,6 @@ public final class ZookeeperInfoServlet
printZnode(json, path);
}
- /*
- if (stat.getNumChildren() != 0)
- {
- writeKeyValue(json, "children_count", stat.getNumChildren(), false );
- out.println(", \"children_count\" : \"" + stat.getNumChildren() + "\"");
- }
- */
-
- //if (stat.getDataLength() != 0)
- if (data != null) {
- String str = new BytesRef(data).utf8ToString();
- //?? writeKeyValue(json, "content", str, false );
- // Does nothing now, but on the assumption this will be used later we'll leave it in. If it comes out
- // the catches below need to be restructured.
- }
} catch (IllegalArgumentException e) {
// path doesn't exist (must have been removed)
writeKeyValue(json, "warning", "(path gone)", false);
@@ -381,6 +366,16 @@ public final class ZookeeperInfoServlet
// Trickily, the call to zkClient.getData fills in the stat variable
byte[] data = zkClient.getData(path, null, stat, true);
+ String dataStr = null;
+ String dataStrErr = null;
+ if (null != data) {
+ try {
+ dataStr = (new BytesRef(data)).utf8ToString();
+ } catch (Exception e) {
+ dataStrErr = "data is not parsable as a utf8 String: " + e.toString();
+ }
+ }
+
json.writeString("znode");
json.writeNameSeparator();
json.startObject();
@@ -397,15 +392,18 @@ public final class ZookeeperInfoServlet
writeKeyValue(json, "ctime", time(stat.getCtime()), false);
writeKeyValue(json, "cversion", stat.getCversion(), false);
writeKeyValue(json, "czxid", stat.getCzxid(), false);
- writeKeyValue(json, "dataLength", stat.getDataLength(), false);
writeKeyValue(json, "ephemeralOwner", stat.getEphemeralOwner(), false);
writeKeyValue(json, "mtime", time(stat.getMtime()), false);
writeKeyValue(json, "mzxid", stat.getMzxid(), false);
writeKeyValue(json, "pzxid", stat.getPzxid(), false);
+ writeKeyValue(json, "dataLength", stat.getDataLength(), false);
+ if (null != dataStrErr) {
+ writeKeyValue(json, "dataNote", dataStrErr, false);
+ }
json.endObject();
- if (data != null) {
- writeKeyValue(json, "data", new BytesRef(data).utf8ToString(), false);
+ if (null != dataStr) {
+ writeKeyValue(json, "data", dataStr, false);
}
json.endObject();
} catch (KeeperException e) {
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/spelling/SpellCheckCollator.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/spelling/SpellCheckCollator.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/spelling/SpellCheckCollator.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/spelling/SpellCheckCollator.java Mon Oct 21 18:58:24 2013
@@ -147,10 +147,14 @@ public class SpellCheckCollator {
hits = (Integer) checkResponse.rsp.getToLog().get("hits");
} catch (EarlyTerminatingCollectorException etce) {
assert (docCollectionLimit > 0);
- if (etce.getLastDocId() + 1 == maxDocId) {
- hits = docCollectionLimit;
+ assert 0 < etce.getNumberScanned();
+ assert 0 < etce.getNumberCollected();
+
+ if (etce.getNumberScanned() == maxDocId) {
+ hits = etce.getNumberCollected();
} else {
- hits = maxDocId / ((etce.getLastDocId() + 1) / docCollectionLimit);
+ hits = (int) ( ((float)( maxDocId * etce.getNumberCollected() ))
+ / (float)etce.getNumberScanned() );
}
} catch (Exception e) {
LOG.warn("Exception trying to re-query to check if a spell check possibility would return any hits.", e);
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsDirectory.java Mon Oct 21 18:58:24 2013
@@ -23,22 +23,24 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.BaseDirectory;
+import org.apache.lucene.store.BufferedIndexOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.NoLockFactory;
+import org.apache.lucene.util.IOUtils;
import org.apache.solr.store.blockcache.CustomBufferedIndexInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class HdfsDirectory extends Directory {
+public class HdfsDirectory extends BaseDirectory {
public static Logger LOG = LoggerFactory.getLogger(HdfsDirectory.class);
public static final int BUFFER_SIZE = 8192;
@@ -62,7 +64,7 @@ public class HdfsDirectory extends Direc
fileSystem.mkdirs(hdfsDirPath);
}
} catch (Exception e) {
- IOUtils.closeQuietly(fileSystem);
+ org.apache.solr.util.IOUtils.closeQuietly(fileSystem);
throw new RuntimeException("Problem creating directory: " + hdfsDirPath,
e);
}
@@ -108,7 +110,7 @@ public class HdfsDirectory extends Direc
}
private IndexInput openInput(String name, int bufferSize) throws IOException {
- return new HdfsNormalIndexInput(name, getFileSystem(), new Path(
+ return new HdfsIndexInput(name, getFileSystem(), new Path(
hdfsDirPath, name), BUFFER_SIZE);
}
@@ -163,16 +165,16 @@ public class HdfsDirectory extends Direc
return configuration;
}
- static class HdfsNormalIndexInput extends CustomBufferedIndexInput {
+ static class HdfsIndexInput extends CustomBufferedIndexInput {
public static Logger LOG = LoggerFactory
- .getLogger(HdfsNormalIndexInput.class);
+ .getLogger(HdfsIndexInput.class);
private final Path path;
private final FSDataInputStream inputStream;
private final long length;
private boolean clone = false;
- public HdfsNormalIndexInput(String name, FileSystem fileSystem, Path path,
+ public HdfsIndexInput(String name, FileSystem fileSystem, Path path,
int bufferSize) throws IOException {
super(name);
this.path = path;
@@ -185,12 +187,12 @@ public class HdfsDirectory extends Direc
@Override
protected void readInternal(byte[] b, int offset, int length)
throws IOException {
- inputStream.read(getFilePointer(), b, offset, length);
+ inputStream.readFully(getFilePointer(), b, offset, length);
}
@Override
protected void seekInternal(long pos) throws IOException {
- inputStream.seek(pos);
+
}
@Override
@@ -208,13 +210,13 @@ public class HdfsDirectory extends Direc
@Override
public IndexInput clone() {
- HdfsNormalIndexInput clone = (HdfsNormalIndexInput) super.clone();
+ HdfsIndexInput clone = (HdfsIndexInput) super.clone();
clone.clone = true;
return clone;
}
}
- static class HdfsIndexOutput extends IndexOutput {
+ static class HdfsIndexOutput extends BufferedIndexOutput {
private HdfsFileWriter writer;
@@ -224,33 +226,26 @@ public class HdfsDirectory extends Direc
@Override
public void close() throws IOException {
- writer.close();
- }
-
- @Override
- public void flush() throws IOException {
- writer.flush();
+ IOException priorE = null;
+ try {
+ super.close();
+ } catch (IOException ioe) {
+ priorE = ioe;
+ } finally {
+ IOUtils.closeWhileHandlingException(priorE, writer);
+ }
}
-
+
@Override
- public long getFilePointer() {
- return writer.getPosition();
+ protected void flushBuffer(byte[] b, int offset, int len)
+ throws IOException {
+ writer.writeBytes(b, offset, len);
}
-
+
@Override
- public long length() {
+ public long length() throws IOException {
return writer.length();
}
-
- @Override
- public void writeByte(byte b) throws IOException {
- writer.writeByte(b);
- }
-
- @Override
- public void writeBytes(byte[] b, int offset, int length) throws IOException {
- writer.writeBytes(b, offset, length);
- }
}
@Override
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java Mon Oct 21 18:58:24 2013
@@ -17,6 +17,7 @@ package org.apache.solr.store.hdfs;
* limitations under the License.
*/
+import java.io.Closeable;
import java.io.IOException;
import java.util.EnumSet;
@@ -31,7 +32,7 @@ import org.apache.lucene.store.DataOutpu
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class HdfsFileWriter extends DataOutput {
+public class HdfsFileWriter extends DataOutput implements Closeable {
public static Logger LOG = LoggerFactory.getLogger(HdfsFileWriter.class);
public static final String HDFS_SYNC_BLOCK = "solr.hdfs.sync.block";
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Mon Oct 21 18:58:24 2013
@@ -17,6 +17,10 @@
package org.apache.solr.update;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexDocument;
import org.apache.lucene.index.Term;
@@ -28,11 +32,6 @@ import org.apache.solr.request.SolrQuery
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
/**
*
*/
@@ -194,15 +193,17 @@ public class AddUpdateCommand extends Up
private List<SolrInputDocument> flatten(SolrInputDocument root) {
List<SolrInputDocument> unwrappedDocs = new ArrayList<SolrInputDocument>();
recUnwrapp(unwrappedDocs, root);
- Collections.reverse(unwrappedDocs);
return unwrappedDocs;
}
private void recUnwrapp(List<SolrInputDocument> unwrappedDocs, SolrInputDocument currentDoc) {
- unwrappedDocs.add(currentDoc);
- for (SolrInputDocument child : currentDoc.getChildDocuments()) {
- recUnwrapp(unwrappedDocs, child);
+ List<SolrInputDocument> children = currentDoc.getChildDocuments();
+ if (children != null) {
+ for (SolrInputDocument child : children) {
+ recUnwrapp(unwrappedDocs, child);
+ }
}
+ unwrappedDocs.add(currentDoc);
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java Mon Oct 21 18:58:24 2013
@@ -42,22 +42,18 @@ import org.apache.solr.util.IOUtils;
/** @lucene.experimental */
public class HdfsUpdateLog extends UpdateLog {
- private FileSystem fs;
- private Path tlogDir;
- private String confDir;
+ private volatile FileSystem fs;
+ private volatile Path tlogDir;
+ private final String confDir;
public HdfsUpdateLog() {
-
+ this.confDir = null;
}
public HdfsUpdateLog(String confDir) {
this.confDir = confDir;
}
- public FileSystem getFs() {
- return fs;
- }
-
// HACK
// while waiting for HDFS-3107, instead of quickly
// dropping, we slowly apply
@@ -118,6 +114,14 @@ public class HdfsUpdateLog extends Updat
}
try {
+ if (fs != null) {
+ fs.close();
+ }
+ } catch (IOException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+
+ try {
fs = FileSystem.newInstance(new Path(dataDir).toUri(), getConf());
} catch (IOException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/PeerSync.java Mon Oct 21 18:58:24 2013
@@ -215,8 +215,7 @@ public class PeerSync {
if (startingVersions != null) {
if (startingVersions.size() == 0) {
- // no frame of reference to tell of we've missed updates
- log.warn("no frame of reference to tell of we've missed updates");
+ log.warn("no frame of reference to tell if we've missed updates");
return false;
}
Collections.sort(startingVersions, absComparator);
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Mon Oct 21 18:58:24 2013
@@ -19,30 +19,21 @@ package org.apache.solr.update;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ExecutorService;
+import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.UpdateRequestExt;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.Diagnostics;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.util.AdjustableSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,469 +41,207 @@ import org.slf4j.LoggerFactory;
public class SolrCmdDistributor {
private static final int MAX_RETRIES_ON_FORWARD = 15;
public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
-
- static AdjustableSemaphore semaphore = new AdjustableSemaphore(8);
-
- CompletionService<Request> completionService;
- Set<Future<Request>> pending;
-
- int maxBufferedAddsPerServer = 10;
- int maxBufferedDeletesPerServer = 10;
-
- private Response response = new Response();
-
- private final Map<Node,List<AddRequest>> adds = new HashMap<Node,List<AddRequest>>();
- private final Map<Node,List<DeleteRequest>> deletes = new HashMap<Node,List<DeleteRequest>>();
- private UpdateShardHandler updateShardHandler;
- class AddRequest {
- AddUpdateCommand cmd;
- ModifiableSolrParams params;
- }
+ private StreamingSolrServers servers;
- class DeleteRequest {
- DeleteUpdateCommand cmd;
- ModifiableSolrParams params;
- }
+ private List<Error> allErrors = new ArrayList<Error>();
+ private List<Error> errors = new ArrayList<Error>();
public static interface AbortCheck {
public boolean abortCheck();
}
- public SolrCmdDistributor(int numHosts, UpdateShardHandler updateShardHandler) {
- int maxPermits = Math.max(16, numHosts * 16);
- // limits how many tasks can actually execute at once
- if (maxPermits != semaphore.getMaxPermits()) {
- semaphore.setMaxPermits(maxPermits);
- }
-
- this.updateShardHandler = updateShardHandler;
- completionService = new ExecutorCompletionService<Request>(updateShardHandler.getCmdDistribExecutor());
- pending = new HashSet<Future<Request>>();
+ public SolrCmdDistributor(ExecutorService updateExecutor) {
+ servers = new StreamingSolrServers(updateExecutor);
}
public void finish() {
+ try {
+ servers.blockUntilFinished();
+ doRetriesIfNeeded();
+ } finally {
+ servers.shutdown();
+ }
+ }
- flushAdds(1);
- flushDeletes(1);
+ private void doRetriesIfNeeded() {
+ // NOTE: retries will be forwarded to a single url
+
+ List<Error> errors = new ArrayList<Error>(this.errors);
+ errors.addAll(servers.getErrors());
+ allErrors.addAll(errors);
+ boolean blockUntilFinishedAgain = false;
+ for (Error err : errors) {
+ String oldNodeUrl = err.req.node.getUrl();
+
+ // if there is a retry url, we want to retry...
+ boolean isRetry = err.req.node.checkRetry();
+ boolean doRetry = false;
+ int rspCode = err.statusCode;
+
+ if (testing_errorHook != null) Diagnostics.call(testing_errorHook, err.e);
+
+ // this can happen in certain situations such as shutdown
+ if (isRetry) {
+ if (rspCode == 404 || rspCode == 403 || rspCode == 503
+ || rspCode == 500) {
+ doRetry = true;
+ }
+
+ // if it's an IOException, let's try again
+ if (err.e instanceof IOException) {
+ doRetry = true;
+ } else if (err.e instanceof SolrServerException) {
+ if (((SolrServerException) err.e).getRootCause() instanceof IOException) {
+ doRetry = true;
+ }
+ }
+ }
+
+ if (isRetry && err.req.retries < MAX_RETRIES_ON_FORWARD && doRetry) {
+ err.req.retries++;
- checkResponses(true);
- }
-
- public void distribDelete(DeleteUpdateCommand cmd, List<Node> urls, ModifiableSolrParams params) throws IOException {
- checkResponses(false);
+ SolrException.log(SolrCmdDistributor.log, "forwarding update to "
+ + oldNodeUrl + " failed - retrying ... ");
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn(null, e);
+ }
+
+ submit(err.req);
+ blockUntilFinishedAgain = true;
+ }
+ }
+
+ servers.clearErrors();
+ this.errors.clear();
- if (cmd.isDeleteById()) {
- doDelete(cmd, urls, params);
- } else {
- doDelete(cmd, urls, params);
+ if (blockUntilFinishedAgain) {
+ servers.blockUntilFinished();
+ doRetriesIfNeeded();
}
}
- public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
- checkResponses(false);
+ public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
+ distribDelete(cmd, nodes, params, false);
+ }
+
+ public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean sync) throws IOException {
- // make sure any pending deletes are flushed
- flushDeletes(1);
-
- // TODO: this is brittle
- // need to make a clone since these commands may be reused
- AddUpdateCommand clone = new AddUpdateCommand(null);
-
- clone.solrDoc = cmd.solrDoc;
- clone.commitWithin = cmd.commitWithin;
- clone.overwrite = cmd.overwrite;
- clone.setVersion(cmd.getVersion());
- AddRequest addRequest = new AddRequest();
- addRequest.cmd = clone;
- addRequest.params = params;
-
for (Node node : nodes) {
- List<AddRequest> alist = adds.get(node);
- if (alist == null) {
- alist = new ArrayList<AddRequest>(2);
- adds.put(node, alist);
+ UpdateRequest uReq = new UpdateRequest();
+ uReq.setParams(params);
+ if (cmd.isDeleteById()) {
+ uReq.deleteById(cmd.getId(), cmd.getVersion());
+ } else {
+ uReq.deleteByQuery(cmd.query);
}
- alist.add(addRequest);
+
+ submit(new Req(node, uReq, sync));
}
-
- flushAdds(maxBufferedAddsPerServer);
}
-
- /**
- * Synchronous (blocking) add to specified node. Any error returned from node is propagated.
- */
- public void syncAdd(AddUpdateCommand cmd, Node node, ModifiableSolrParams params) throws IOException {
- log.info("SYNCADD on {} : {}", node, cmd.getPrintableId());
- checkResponses(false);
- // flush all pending deletes
- flushDeletes(1);
- // flush all pending adds
- flushAdds(1);
- // finish with the pending requests
- checkResponses(false);
-
- UpdateRequestExt ureq = new UpdateRequestExt();
- ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
- ureq.setParams(params);
- syncRequest(node, ureq);
- }
-
- public void syncDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
- log.info("SYNCDELETE on {} : ", nodes, cmd);
- checkResponses(false);
- // flush all pending adds
- flushAdds(1);
- // flush all pending deletes
- flushDeletes(1);
- // finish pending requests
- checkResponses(false);
-
- DeleteUpdateCommand clonedCmd = clone(cmd);
- DeleteRequest deleteRequest = new DeleteRequest();
- deleteRequest.cmd = clonedCmd;
- deleteRequest.params = params;
-
- UpdateRequestExt ureq = new UpdateRequestExt();
- if (cmd.isDeleteById()) {
- ureq.deleteById(cmd.getId(), cmd.getVersion());
- } else {
- ureq.deleteByQuery(cmd.query);
- }
- ureq.setParams(params);
- for (Node node : nodes) {
- syncRequest(node, ureq);
- }
+
+ public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
+ distribAdd(cmd, nodes, params, false);
}
+
+ public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous) throws IOException {
- private void syncRequest(Node node, UpdateRequestExt ureq) {
- Request sreq = new Request();
- sreq.node = node;
- sreq.ureq = ureq;
-
- String url = node.getUrl();
- String fullUrl;
- if (!url.startsWith("http://") && !url.startsWith("https://")) {
- fullUrl = "http://" + url;
- } else {
- fullUrl = url;
- }
-
- HttpSolrServer server = new HttpSolrServer(fullUrl,
- updateShardHandler.getHttpClient());
-
- try {
- sreq.ursp = server.request(ureq);
- } catch (Exception e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + sreq.node + " update: " + ureq , e);
+ for (Node node : nodes) {
+ UpdateRequest uReq = new UpdateRequest();
+ uReq.setParams(params);
+ uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+ submit(new Req(node, uReq, synchronous));
}
+
}
public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
ModifiableSolrParams params) throws IOException {
- // make sure we are ordered
- flushAdds(1);
- flushDeletes(1);
-
-
- // Wait for all outstanding responses to make sure that a commit
- // can't sneak in ahead of adds or deletes we already sent.
- // We could do this on a per-server basis, but it's more complex
- // and this solution will lead to commits happening closer together.
- checkResponses(true);
-
- // currently, we dont try to piggy back on outstanding adds or deletes
-
- UpdateRequestExt ureq = new UpdateRequestExt();
- ureq.setParams(params);
-
- addCommit(ureq, cmd);
-
- log.info("Distrib commit to:" + nodes + " params:" + params);
+ // we need to do any retries before commit...
+ servers.blockUntilFinished();
+ doRetriesIfNeeded();
- for (Node node : nodes) {
- submit(ureq, node);
- }
-
- // if the command wanted to block until everything was committed,
- // then do that here.
+ UpdateRequest uReq = new UpdateRequest();
+ uReq.setParams(params);
- if (cmd.waitSearcher) {
- checkResponses(true);
- }
- }
-
- private void doDelete(DeleteUpdateCommand cmd, List<Node> nodes,
- ModifiableSolrParams params) {
+ addCommit(uReq, cmd);
- flushAdds(1);
+ log.debug("Distrib commit to:" + nodes + " params:" + params);
- DeleteUpdateCommand clonedCmd = clone(cmd);
- DeleteRequest deleteRequest = new DeleteRequest();
- deleteRequest.cmd = clonedCmd;
- deleteRequest.params = params;
for (Node node : nodes) {
- List<DeleteRequest> dlist = deletes.get(node);
-
- if (dlist == null) {
- dlist = new ArrayList<DeleteRequest>(2);
- deletes.put(node, dlist);
- }
- dlist.add(deleteRequest);
+ submit(new Req(node, uReq, false));
}
- flushDeletes(maxBufferedDeletesPerServer);
}
- void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
+ void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
if (cmd == null) return;
ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes);
}
-
- boolean flushAdds(int limit) {
- // check for pending deletes
-
- Set<Node> removeNodes = new HashSet<Node>();
- Set<Node> nodes = adds.keySet();
-
- for (Node node : nodes) {
- List<AddRequest> alist = adds.get(node);
- if (alist == null || alist.size() < limit) continue;
-
- UpdateRequestExt ureq = new UpdateRequestExt();
-
- ModifiableSolrParams combinedParams = new ModifiableSolrParams();
-
- for (AddRequest aReq : alist) {
- AddUpdateCommand cmd = aReq.cmd;
- combinedParams.add(aReq.params);
-
- ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
- }
-
- if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
- ureq.getParams().add(combinedParams);
- removeNodes.add(node);
-
- submit(ureq, node);
- }
-
- for (Node node : removeNodes) {
- adds.remove(node);
- }
-
- return true;
- }
-
- boolean flushDeletes(int limit) {
- // check for pending deletes
-
- Set<Node> removeNodes = new HashSet<Node>();
- Set<Node> nodes = deletes.keySet();
- for (Node node : nodes) {
- List<DeleteRequest> dlist = deletes.get(node);
- if (dlist == null || dlist.size() < limit) continue;
- UpdateRequestExt ureq = new UpdateRequestExt();
+ private void submit(Req req) {
+ if (req.synchronous) {
+ servers.blockUntilFinished();
+ doRetriesIfNeeded();
- ModifiableSolrParams combinedParams = new ModifiableSolrParams();
-
- for (DeleteRequest dReq : dlist) {
- DeleteUpdateCommand cmd = dReq.cmd;
- combinedParams.add(dReq.params);
- if (cmd.isDeleteById()) {
- ureq.deleteById(cmd.getId(), cmd.getVersion());
- } else {
- ureq.deleteByQuery(cmd.query);
- }
-
- if (ureq.getParams() == null) ureq
- .setParams(new ModifiableSolrParams());
- ureq.getParams().add(combinedParams);
+ HttpSolrServer server = new HttpSolrServer(req.node.getUrl(),
+ servers.getHttpClient());
+ try {
+ server.request(req.uReq);
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + req.node + " update: " + req.uReq , e);
+ } finally {
+ server.shutdown();
}
- removeNodes.add(node);
- submit(ureq, node);
+ return;
}
- for (Node node : removeNodes) {
- deletes.remove(node);
+ SolrServer solrServer = servers.getSolrServer(req);
+ try {
+ NamedList<Object> rsp = solrServer.request(req.uReq);
+ } catch (Exception e) {
+ SolrException.log(log, e);
+ Error error = new Error();
+ error.e = e;
+ error.req = req;
+ if (e instanceof SolrException) {
+ error.statusCode = ((SolrException) e).code();
+ }
+ errors.add(error);
}
-
- return true;
- }
-
- private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
- DeleteUpdateCommand c = (DeleteUpdateCommand)cmd.clone();
- // TODO: shouldnt the clone do this?
- c.setFlags(cmd.getFlags());
- c.setVersion(cmd.getVersion());
- return c;
}
- public static class Request {
+ public static class Req {
public Node node;
- UpdateRequestExt ureq;
- NamedList<Object> ursp;
- int rspCode;
- public Exception exception;
- int retries;
- }
-
- void submit(UpdateRequestExt ureq, Node node) {
- Request sreq = new Request();
- sreq.node = node;
- sreq.ureq = ureq;
- submit(sreq);
- }
-
- public void submit(final Request sreq) {
-
- final String url = sreq.node.getUrl();
-
- Callable<Request> task = new Callable<Request>() {
- @Override
- public Request call() throws Exception {
- Request clonedRequest = null;
- try {
- clonedRequest = new Request();
- clonedRequest.node = sreq.node;
- clonedRequest.ureq = sreq.ureq;
- clonedRequest.retries = sreq.retries;
-
- String fullUrl;
- if (!url.startsWith("http://") && !url.startsWith("https://")) {
- fullUrl = "http://" + url;
- } else {
- fullUrl = url;
- }
-
- HttpSolrServer server = new HttpSolrServer(fullUrl,
- updateShardHandler.getHttpClient());
-
- if (Thread.currentThread().isInterrupted()) {
- clonedRequest.rspCode = 503;
- clonedRequest.exception = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down.");
- return clonedRequest;
- }
-
- clonedRequest.ursp = server.request(clonedRequest.ureq);
-
- // currently no way to get the request body.
- } catch (Exception e) {
- clonedRequest.exception = e;
- if (e instanceof SolrException) {
- clonedRequest.rspCode = ((SolrException) e).code();
- } else {
- clonedRequest.rspCode = -1;
- }
- } finally {
- semaphore.release();
- }
- return clonedRequest;
- }
- };
- try {
- semaphore.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Update thread interrupted", e);
+ public UpdateRequest uReq;
+ public int retries;
+ public boolean synchronous;
+
+ public Req(Node node, UpdateRequest uReq, boolean synchronous) {
+ this.node = node;
+ this.uReq = uReq;
+ this.synchronous = synchronous;
}
- try {
- pending.add(completionService.submit(task));
- } catch (RejectedExecutionException e) {
- semaphore.release();
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down", e);
- }
-
}
+
public static Diagnostics.Callable testing_errorHook; // called on error when forwarding request. Currently data=[this, Request]
- void checkResponses(boolean block) {
-
- while (pending != null && pending.size() > 0) {
- try {
- Future<Request> future = block ? completionService.take()
- : completionService.poll();
- if (future == null) return;
- pending.remove(future);
-
- try {
- Request sreq = future.get();
- if (sreq.rspCode != 0) {
- // error during request
-
- if (testing_errorHook != null) Diagnostics.call(testing_errorHook, this, sreq);
-
- // if there is a retry url, we want to retry...
- boolean isRetry = sreq.node.checkRetry();
- boolean doRetry = false;
- int rspCode = sreq.rspCode;
-
- // this can happen in certain situations such as shutdown
- if (isRetry) {
- if (rspCode == 404 || rspCode == 403 || rspCode == 503
- || rspCode == 500) {
- doRetry = true;
- }
-
- // if its an ioexception, lets try again
- if (sreq.exception instanceof IOException) {
- doRetry = true;
- } else if (sreq.exception instanceof SolrServerException) {
- if (((SolrServerException) sreq.exception).getRootCause() instanceof IOException) {
- doRetry = true;
- }
- }
- }
-
- if (isRetry && sreq.retries < MAX_RETRIES_ON_FORWARD && doRetry) {
- sreq.retries++;
- sreq.rspCode = 0;
- sreq.exception = null;
- SolrException.log(SolrCmdDistributor.log, "forwarding update to " + sreq.node.getUrl() + " failed - retrying ... ");
- Thread.sleep(500);
- submit(sreq);
- } else {
- Exception e = sreq.exception;
- Error error = new Error();
- error.e = e;
- error.node = sreq.node;
- response.errors.add(error);
- response.sreq = sreq;
- SolrException.log(SolrCmdDistributor.log, "shard update error "
- + sreq.node, sreq.exception);
- }
- }
-
- } catch (ExecutionException e) {
- // shouldn't happen since we catch exceptions ourselves
- SolrException.log(SolrCore.log,
- "error sending update request to shard", e);
- }
-
- } catch (InterruptedException e) {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- "interrupted waiting for shard update response", e);
- }
- }
- }
public static class Response {
- public Request sreq;
public List<Error> errors = new ArrayList<Error>();
}
public static class Error {
- public Node node;
public Exception e;
- }
-
- public Response getResponse() {
- return response;
+ public int statusCode;
+ public Req req;
}
public static abstract class Node {
@@ -595,6 +324,64 @@ public class SolrCmdDistributor {
}
}
+ // RetryNodes are used in the case of 'forward to leader' where we want
+ // to try the latest leader on a fail in the case the leader just went down.
+ public static class RetryNode extends StdNode {
+
+ private ZkStateReader zkStateReader;
+ private String collection;
+ private String shardId;
+
+ public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
+ super(nodeProps);
+ this.zkStateReader = zkStateReader;
+ this.collection = collection;
+ this.shardId = shardId;
+ }
+
+ @Override
+ public boolean checkRetry() {
+ ZkCoreNodeProps leaderProps;
+ try {
+ leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
+ collection, shardId));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+
+ this.nodeProps = leaderProps;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result
+ + ((collection == null) ? 0 : collection.hashCode());
+ result = prime * result + ((shardId == null) ? 0 : shardId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!super.equals(obj)) return false;
+ if (getClass() != obj.getClass()) return false;
+ RetryNode other = (RetryNode) obj;
+ if (nodeProps.getCoreUrl() == null) {
+ if (other.nodeProps.getCoreUrl() != null) return false;
+ } else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;
+
+ return true;
+ }
+ }
+
+ public List<Error> getErrors() {
+ return allErrors;
+ }
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java Mon Oct 21 18:58:24 2013
@@ -17,7 +17,6 @@
package org.apache.solr.update;
-import org.apache.commons.io.FileUtils;
import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.util.InfoStream;
@@ -34,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.List;
@@ -166,8 +164,8 @@ public class SolrIndexConfig {
}
}
mergedSegmentWarmerInfo = getPluginInfo(prefix + "/mergedSegmentWarmer", solrConfig, def.mergedSegmentWarmerInfo);
- if (mergedSegmentWarmerInfo != null && solrConfig.reopenReaders == false) {
- throw new IllegalArgumentException("Supplying a mergedSegmentWarmer will do nothing since reopenReaders is false");
+ if (mergedSegmentWarmerInfo != null && solrConfig.nrtMode == false) {
+ throw new IllegalArgumentException("Supplying a mergedSegmentWarmer will do nothing since nrtMode is false");
}
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java Mon Oct 21 18:58:24 2013
@@ -31,12 +31,11 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.OpenBitSet;
+import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.HashBasedRouter;
-import org.apache.solr.common.util.Hash;
import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.SchemaField;
-import org.apache.solr.schema.StrField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
@@ -59,10 +58,11 @@ public class SolrIndexSplitter {
HashBasedRouter hashRouter;
int numPieces;
int currPartition = 0;
+ String routeFieldName;
+ String splitKey;
public SolrIndexSplitter(SplitIndexCommand cmd) {
searcher = cmd.getReq().getSearcher();
- field = searcher.getSchema().getUniqueKeyField();
ranges = cmd.ranges;
paths = cmd.paths;
cores = cmd.cores;
@@ -75,6 +75,15 @@ public class SolrIndexSplitter {
numPieces = ranges.size();
rangesArr = ranges.toArray(new DocRouter.Range[ranges.size()]);
}
+ routeFieldName = cmd.routeFieldName;
+ if (routeFieldName == null) {
+ field = searcher.getSchema().getUniqueKeyField();
+ } else {
+ field = searcher.getSchema().getField(routeFieldName);
+ }
+ if (cmd.splitKey != null) {
+ splitKey = getRouteKey(cmd.splitKey);
+ }
}
public void split() throws IOException {
@@ -170,11 +179,20 @@ public class SolrIndexSplitter {
idRef = field.getType().indexedToReadable(term, idRef);
String idString = idRef.toString();
+ if (splitKey != null) {
+ // todo have composite routers support these kind of things instead
+ String part1 = getRouteKey(idString);
+ if (part1 == null)
+ continue;
+ if (!splitKey.equals(part1)) {
+ continue;
+ }
+ }
+
int hash = 0;
if (hashRouter != null) {
- hash = hashRouter.sliceHash(idString, null, null);
+ hash = hashRouter.sliceHash(idString, null, null, null);
}
- // int hash = Hash.murmurhash3_x86_32(ref, ref.offset, ref.length, 0);
docsEnum = termsEnum.docs(liveDocs, docsEnum, DocsEnum.FLAG_NONE);
for (;;) {
@@ -196,6 +214,22 @@ public class SolrIndexSplitter {
return docSets;
}
+ private String getRouteKey(String idString) {
+ int idx = idString.indexOf(CompositeIdRouter.separator);
+ if (idx <= 0) return null;
+ String part1 = idString.substring(0, idx);
+ int commaIdx = part1.indexOf(CompositeIdRouter.bitsSeparator);
+ if (commaIdx > 0) {
+ if (commaIdx + 1 < part1.length()) {
+ char ch = part1.charAt(commaIdx + 1);
+ if (ch >= '0' && ch <= '9') {
+ part1 = part1.substring(0, commaIdx);
+ }
+ }
+ }
+ return part1;
+ }
+
// change livedocs on the reader to delete those docs we don't want
static class LiveDocsReader extends FilterAtomicReader {
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java Mon Oct 21 18:58:24 2013
@@ -24,7 +24,7 @@ import org.apache.solr.request.SolrQuery
import java.util.List;
/**
- * A merge indexes command encapsulated in an object.
+ * A split index command encapsulated in an object.
*
* @since solr 1.4
*
@@ -35,13 +35,17 @@ public class SplitIndexCommand extends U
public List<SolrCore> cores; // either paths or cores should be specified
public List<DocRouter.Range> ranges;
public DocRouter router;
+ public String routeFieldName;
+ public String splitKey;
- public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges, DocRouter router) {
+ public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges, DocRouter router, String routeFieldName, String splitKey) {
super(req);
this.paths = paths;
this.cores = cores;
this.ranges = ranges;
this.router = router;
+ this.routeFieldName = routeFieldName;
+ this.splitKey = splitKey;
}
@Override
@@ -56,6 +60,12 @@ public class SplitIndexCommand extends U
sb.append(",cores=" + cores);
sb.append(",ranges=" + ranges);
sb.append(",router=" + router);
+ if (routeFieldName != null) {
+ sb.append(",routeFieldName=" + routeFieldName);
+ }
+ if (splitKey != null) {
+ sb.append(",split.key=" + splitKey);
+ }
sb.append('}');
return sb.toString();
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Mon Oct 21 18:58:24 2013
@@ -17,7 +17,6 @@
package org.apache.solr.update;
-import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.request.SolrQueryRequest;
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/VersionInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/VersionInfo.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/VersionInfo.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/VersionInfo.java Mon Oct 21 18:58:24 2013
@@ -49,7 +49,7 @@ public class VersionInfo {
*/
public static SchemaField getAndCheckVersionField(IndexSchema schema)
throws SolrException {
- final String errPrefix = VERSION_FIELD + "field must exist in schema, using indexed=\"true\" stored=\"true\" and multiValued=\"false\"";
+ final String errPrefix = VERSION_FIELD + " field must exist in schema, using indexed=\"true\" stored=\"true\" and multiValued=\"false\"";
SchemaField sf = schema.getFieldOrNull(VERSION_FIELD);
if (null == sf) {
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -217,7 +217,7 @@ public class AddSchemaFieldsUpdateProces
String fieldType = fieldTypeObj.toString();
Collection<String> valueClasses
- = FieldMutatingUpdateProcessorFactory.oneOrMany(typeMappingNamedList, VALUE_CLASS_PARAM);
+ = typeMappingNamedList.removeConfigArgs(VALUE_CLASS_PARAM);
if (valueClasses.isEmpty()) {
throw new SolrException(SERVER_ERROR,
"Each '" + TYPE_MAPPING_PARAM + "' <lst/> must contain at least one '" + VALUE_CLASS_PARAM + "' <str>");
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/CloneFieldUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/CloneFieldUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/CloneFieldUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/CloneFieldUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -176,7 +176,7 @@ public class CloneFieldUpdateProcessorFa
} else {
// source better be one or more strings
srcInclusions.fieldName = new HashSet<String>
- (FieldMutatingUpdateProcessorFactory.oneOrMany(args, "source"));
+ (args.removeConfigArgs("source"));
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Oct 21 18:58:24 2013
@@ -17,8 +17,10 @@ package org.apache.solr.update.processor
* limitations under the License.
*/
+import org.apache.http.client.HttpClient;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.cloud.CloudDescriptor;
@@ -43,7 +45,9 @@ import org.apache.solr.common.params.Upd
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.RealTimeGetComponent;
+import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
@@ -53,8 +57,9 @@ import org.apache.solr.update.AddUpdateC
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.SolrCmdDistributor.Node;
-import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.RetryNode;
import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.update.UpdateHandler;
@@ -68,11 +73,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
@@ -104,6 +111,17 @@ public class DistributedUpdateProcessor
}
}
}
+
+ private final HttpClient client;
+ {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
+ params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
+ params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 15000);
+ params.set(HttpClientUtil.PROP_SO_TIMEOUT, 60000);
+ params.set(HttpClientUtil.PROP_USE_RETRY, false);
+ client = HttpClientUtil.createClient(params);
+ }
public static final String COMMIT_END_POINT = "commit_end_point";
public static final String LOG_REPLAY = "log_replay";
@@ -142,10 +160,7 @@ public class DistributedUpdateProcessor
private boolean isSubShardLeader = false;
private List<Node> nodes;
- private int numNodes;
-
private UpdateCommand updateCommand; // the current command this processor is working on.
-
public DistributedUpdateProcessor(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) {
@@ -171,8 +186,7 @@ public class DistributedUpdateProcessor
this.zkEnabled = coreDesc.getCoreContainer().isZooKeeperAware();
zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
if (zkEnabled) {
- numNodes = zkController.getZkStateReader().getClusterState().getLiveNodes().size();
- cmdDistrib = new SolrCmdDistributor(numNodes, coreDesc.getCoreContainer().getZkController().getUpdateShardHandler());
+ cmdDistrib = new SolrCmdDistributor(coreDesc.getCoreContainer().getUpdateExecutor());
}
//this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
@@ -181,6 +195,7 @@ public class DistributedUpdateProcessor
if (cloudDesc != null) {
collection = cloudDesc.getCollectionName();
}
+
}
@@ -199,7 +214,6 @@ public class DistributedUpdateProcessor
String coreName = req.getCore().getName();
ClusterState cstate = zkController.getClusterState();
- numNodes = cstate.getLiveNodes().size();
DocCollection coll = cstate.getCollection(collection);
Slice slice = coll.getRouter().getTargetSlice(id, doc, req.getParams(), coll);
@@ -301,7 +315,7 @@ public class DistributedUpdateProcessor
// Am I the leader of a shard in "construction" state?
String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
Slice mySlice = coll.getSlice(myShardId);
- if (Slice.CONSTRUCTION.equals(mySlice.getState())) {
+ if (Slice.CONSTRUCTION.equals(mySlice.getState()) || Slice.RECOVERY.equals(mySlice.getState())) {
Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
boolean amILeader = myLeader.getName().equals(
req.getCore().getCoreDescriptor().getCloudDescriptor()
@@ -326,7 +340,7 @@ public class DistributedUpdateProcessor
Collection<Slice> allSlices = coll.getSlices();
List<Node> nodes = null;
for (Slice aslice : allSlices) {
- if (Slice.CONSTRUCTION.equals(aslice.getState())) {
+ if (Slice.CONSTRUCTION.equals(aslice.getState()) || Slice.RECOVERY.equals(aslice.getState())) {
DocRouter.Range myRange = coll.getSlice(shardId).getRange();
if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange);
@@ -358,9 +372,9 @@ public class DistributedUpdateProcessor
if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
String fromShard = req.getParams().get("distrib.from.parent");
if (fromShard != null) {
- if (!Slice.CONSTRUCTION.equals(mySlice.getState())) {
+ if (Slice.ACTIVE.equals(mySlice.getState())) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
- "Request says it is coming from parent shard leader but we are not in construction state");
+ "Request says it is coming from parent shard leader but we are in active state");
}
// shard splitting case -- check ranges to see if we are a sub-shard
Slice fromSlice = zkController.getClusterState().getCollection(collection).getSlice(fromShard);
@@ -451,7 +465,7 @@ public class DistributedUpdateProcessor
zkController.getBaseUrl(), req.getCore().getName()));
params.set("distrib.from.parent", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
for (Node subShardLeader : subShardLeaders) {
- cmdDistrib.syncAdd(cmd, subShardLeader, params);
+ cmdDistrib.distribAdd(cmd, Collections.singletonList(subShardLeader), params, true);
}
}
}
@@ -497,16 +511,16 @@ public class DistributedUpdateProcessor
// send in a background thread
cmdDistrib.finish();
- Response response = cmdDistrib.getResponse();
+ List<Error> errors = cmdDistrib.getErrors();
// TODO - we may need to tell about more than one error...
// if its a forward, any fail is a problem -
// otherwise we assume things are fine if we got it locally
// until we start allowing min replication param
- if (response.errors.size() > 0) {
+ if (errors.size() > 0) {
// if one node is a RetryNode, this was a forward request
- if (response.errors.get(0).node instanceof RetryNode) {
- rsp.setException(response.errors.get(0).e);
+ if (errors.get(0).req.node instanceof RetryNode) {
+ rsp.setException(errors.get(0).e);
}
// else
// for now we don't error - we assume if it was added locally, we
@@ -519,33 +533,46 @@ public class DistributedUpdateProcessor
// legit
// TODO: we should do this in the background it would seem
- for (SolrCmdDistributor.Error error : response.errors) {
- if (error.node instanceof RetryNode) {
+ for (final SolrCmdDistributor.Error error : errors) {
+ if (error.req.node instanceof RetryNode) {
// we don't try to force a leader to recover
// when we cannot forward to it
continue;
}
// TODO: we should force their state to recovering ??
- // TODO: could be sent in parallel
// TODO: do retries??
// TODO: what if its is already recovering? Right now recoveries queue up -
// should they?
- String recoveryUrl = error.node.getBaseUrl();
- HttpSolrServer server;
- log.info("try and ask " + recoveryUrl + " to recover");
- try {
- server = new HttpSolrServer(recoveryUrl);
- server.setSoTimeout(15000);
- server.setConnectionTimeout(15000);
-
- RequestRecovery recoverRequestCmd = new RequestRecovery();
- recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
- recoverRequestCmd.setCoreName(error.node.getCoreName());
-
- server.request(recoverRequestCmd);
- } catch (Exception e) {
- log.info("Could not tell a replica to recover", e);
- }
+ final String recoveryUrl = error.req.node.getBaseUrl();
+
+ Thread thread = new Thread() {
+ {
+ setDaemon(true);
+ }
+ @Override
+ public void run() {
+ log.info("try and ask " + recoveryUrl + " to recover");
+ HttpSolrServer server = new HttpSolrServer(recoveryUrl);
+ try {
+ server.setSoTimeout(60000);
+ server.setConnectionTimeout(15000);
+
+ RequestRecovery recoverRequestCmd = new RequestRecovery();
+ recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
+ recoverRequestCmd.setCoreName(error.req.node.getCoreName());
+ try {
+ server.request(recoverRequestCmd);
+ } catch (Throwable t) {
+ SolrException.log(log, recoveryUrl
+ + ": Could not tell a replica to recover", t);
+ }
+ } finally {
+ server.shutdown();
+ }
+ }
+ };
+ ExecutorService executor = req.getCore().getCoreDescriptor().getCoreContainer().getUpdateExecutor();
+ executor.execute(thread);
}
}
@@ -838,7 +865,7 @@ public class DistributedUpdateProcessor
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
params.set("distrib.from.parent", cloudDesc.getShardId());
- cmdDistrib.syncDelete(cmd, subShardLeaders, params);
+ cmdDistrib.distribDelete(cmd, subShardLeaders, params, true);
}
}
@@ -1061,7 +1088,7 @@ public class DistributedUpdateProcessor
if (leaderLogic) {
List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null);
if (subShardLeaders != null) {
- cmdDistrib.syncDelete(cmd, subShardLeaders, params);
+ cmdDistrib.distribDelete(cmd, subShardLeaders, params, true);
}
if (replicas != null) {
cmdDistrib.distribDelete(cmd, replicas, params);
@@ -1286,61 +1313,6 @@ public class DistributedUpdateProcessor
}
return urls;
}
-
- // RetryNodes are used in the case of 'forward to leader' where we want
- // to try the latest leader on a fail in the case the leader just went down.
- public static class RetryNode extends StdNode {
-
- private ZkStateReader zkStateReader;
- private String collection;
- private String shardId;
-
- public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
- super(nodeProps);
- this.zkStateReader = zkStateReader;
- this.collection = collection;
- this.shardId = shardId;
- }
-
- @Override
- public boolean checkRetry() {
- ZkCoreNodeProps leaderProps;
- try {
- leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
- collection, shardId));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
-
- this.nodeProps = leaderProps;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result
- + ((collection == null) ? 0 : collection.hashCode());
- result = prime * result + ((shardId == null) ? 0 : shardId.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (!super.equals(obj)) return false;
- if (getClass() != obj.getClass()) return false;
- RetryNode other = (RetryNode) obj;
- if (nodeProps.getCoreUrl() == null) {
- if (other.nodeProps.getCoreUrl() != null) return false;
- } else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;
-
- return true;
- }
- }
/**
* Returns a boolean indicating whether or not the caller should behave as
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -17,8 +17,8 @@
package org.apache.solr.update.processor;
-import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -27,10 +27,9 @@ import java.util.Set;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
-import org.apache.solr.core.SolrCore;
import org.apache.solr.common.SolrException;
-import static org.apache.solr.common.SolrException.ErrorCode.*;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
import org.apache.solr.util.plugin.SolrCoreAware;
@@ -133,18 +132,18 @@ public abstract class FieldMutatingUpdat
protected final FieldMutatingUpdateProcessor.FieldNameSelector getSelector() {
if (null != selector) return selector;
- throw new SolrException(SERVER_ERROR, "selector was never initialized, "+
- " inform(SolrCore) never called???");
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "selector was never initialized, inform(SolrCore) never called???");
}
public static SelectorParams parseSelectorParams(NamedList args) {
SelectorParams params = new SelectorParams();
- params.fieldName = new HashSet<String>(oneOrMany(args, "fieldName"));
- params.typeName = new HashSet<String>(oneOrMany(args, "typeName"));
+ params.fieldName = new HashSet<String>(args.removeConfigArgs("fieldName"));
+ params.typeName = new HashSet<String>(args.removeConfigArgs("typeName"));
// we can compile the patterns now
- Collection<String> patterns = oneOrMany(args, "fieldRegex");
+ Collection<String> patterns = args.removeConfigArgs("fieldRegex");
if (! patterns.isEmpty()) {
params.fieldRegex = new ArrayList<Pattern>(patterns.size());
for (String s : patterns) {
@@ -152,16 +151,17 @@ public abstract class FieldMutatingUpdat
params.fieldRegex.add(Pattern.compile(s));
} catch (PatternSyntaxException e) {
throw new SolrException
- (SERVER_ERROR, "Invalid 'fieldRegex' pattern: " + s, e);
+ (SolrException.ErrorCode.SERVER_ERROR,
+ "Invalid 'fieldRegex' pattern: " + s, e);
}
}
}
// resolve this into actual Class objects later
- params.typeClass = oneOrMany(args, "typeClass");
+ params.typeClass = args.removeConfigArgs("typeClass");
- // getBooleanArg() returns null if the arg is not specified
- params.fieldNameMatchesSchemaField = getBooleanArg(args, "fieldNameMatchesSchemaField");
+ // Returns null if the arg is not specified
+ params.fieldNameMatchesSchemaField = args.removeBooleanArg("fieldNameMatchesSchemaField");
return params;
}
@@ -171,17 +171,17 @@ public abstract class FieldMutatingUpdat
List<Object> excList = args.getAll("exclude");
for (Object excObj : excList) {
if (null == excObj) {
- throw new SolrException
- (SERVER_ERROR, "'exclude' init param can not be null");
+ throw new SolrException (SolrException.ErrorCode.SERVER_ERROR,
+ "'exclude' init param can not be null");
}
if (! (excObj instanceof NamedList) ) {
- throw new SolrException
- (SERVER_ERROR, "'exclude' init param must be <lst/>");
+ throw new SolrException (SolrException.ErrorCode.SERVER_ERROR,
+ "'exclude' init param must be <lst/>");
}
NamedList exc = (NamedList) excObj;
exclusions.add(parseSelectorParams(exc));
if (0 < exc.size()) {
- throw new SolrException(SERVER_ERROR,
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unexpected 'exclude' init sub-param(s): '" +
args.getName(0) + "'");
}
@@ -207,9 +207,8 @@ public abstract class FieldMutatingUpdat
exclusions = parseSelectorExclusionParams(args);
if (0 < args.size()) {
- throw new SolrException(SERVER_ERROR,
- "Unexpected init param(s): '" +
- args.getName(0) + "'");
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unexpected init param(s): '" + args.getName(0) + "'");
}
}
@@ -243,67 +242,4 @@ public abstract class FieldMutatingUpdat
return FieldMutatingUpdateProcessor.SELECT_ALL_FIELDS;
}
-
- /**
- * Removes all instance of the key from NamedList, returning the Set of
- * Strings that key referred to. Throws an error if the key didn't refer
- * to one or more strings (or arrays of strings)
- * @exception SolrException invalid arr/str structure.
- */
- public static Collection<String> oneOrMany(final NamedList args, final String key) {
- List<String> result = new ArrayList<String>(args.size() / 2);
- final String err = "init arg '" + key + "' must be a string "
- + "(ie: 'str'), or an array (ie: 'arr') containing strings; found: ";
-
- for (Object o = args.remove(key); null != o; o = args.remove(key)) {
- if (o instanceof String) {
- result.add((String)o);
- continue;
- }
-
- if (o instanceof Object[]) {
- o = Arrays.asList((Object[]) o);
- }
-
- if (o instanceof Collection) {
- for (Object item : (Collection)o) {
- if (! (item instanceof String)) {
- throw new SolrException(SERVER_ERROR, err + item.getClass());
- }
- result.add((String)item);
- }
- continue;
- }
-
- // who knows what the hell we have
- throw new SolrException(SERVER_ERROR, err + o.getClass());
- }
-
- return result;
- }
-
- /**
- * Removes the first instance of the key from NamedList, returning the Boolean
- * that key referred to, or null if the key is not specified.
- * @exception SolrException invalid type or structure
- */
- public static Boolean getBooleanArg(final NamedList args, final String key) {
- Boolean bool;
- List values = args.getAll(key);
- if (0 == values.size()) {
- return null;
- }
- if (values.size() > 1) {
- throw new SolrException(SERVER_ERROR, "Only one '" + key + "' is allowed");
- }
- Object o = args.remove(key);
- if (o instanceof Boolean) {
- bool = (Boolean)o;
- } else if (o instanceof CharSequence) {
- bool = Boolean.parseBoolean(o.toString());
- } else {
- throw new SolrException(SERVER_ERROR, "'" + key + "' must have type 'bool' or 'str'; found " + o.getClass());
- }
- return bool;
- }
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseBooleanFieldUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseBooleanFieldUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseBooleanFieldUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseBooleanFieldUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -112,7 +112,7 @@ public class ParseBooleanFieldUpdateProc
}
}
- Collection<String> trueValuesParam = oneOrMany(args, TRUE_VALUES_PARAM);
+ Collection<String> trueValuesParam = args.removeConfigArgs(TRUE_VALUES_PARAM);
if ( ! trueValuesParam.isEmpty()) {
trueValues.clear();
for (String trueVal : trueValuesParam) {
@@ -120,7 +120,7 @@ public class ParseBooleanFieldUpdateProc
}
}
- Collection<String> falseValuesParam = oneOrMany(args, FALSE_VALUES_PARAM);
+ Collection<String> falseValuesParam = args.removeConfigArgs(FALSE_VALUES_PARAM);
if ( ! falseValuesParam.isEmpty()) {
falseValues.clear();
for (String val : falseValuesParam) {
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseDateFieldUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseDateFieldUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseDateFieldUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/ParseDateFieldUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -150,7 +150,7 @@ public class ParseDateFieldUpdateProcess
defaultTimeZone = DateTimeZone.forID(defaultTimeZoneParam.toString());
}
- Collection<String> formatsParam = oneOrMany(args, FORMATS_PARAM);
+ Collection<String> formatsParam = args.removeConfigArgs(FORMATS_PARAM);
if (null != formatsParam) {
for (String value : formatsParam) {
formats.put(value, DateTimeFormat.forPattern(value).withZone(defaultTimeZone).withLocale(locale));
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/StatelessScriptUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/StatelessScriptUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/StatelessScriptUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/StatelessScriptUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -173,7 +173,7 @@ public class StatelessScriptUpdateProces
@Override
public void init(NamedList args) {
Collection<String> scripts =
- FieldMutatingUpdateProcessorFactory.oneOrMany(args, SCRIPT_ARG);
+ args.removeConfigArgs(SCRIPT_ARG);
if (scripts.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"StatelessScriptUpdateProcessorFactory must be " +
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UniqFieldsUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UniqFieldsUpdateProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UniqFieldsUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UniqFieldsUpdateProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -23,6 +23,9 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.Map;
+
+import org.apache.solr.core.SolrCore;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;
@@ -30,77 +33,52 @@ import org.apache.solr.request.SolrQuery
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * A non-duplicate processor. Removes duplicates in the specified fields.
- *
- * <pre class="prettyprint" >
- * <updateRequestProcessorChain name="uniq-fields">
- * <processor class="org.apache.solr.update.processor.UniqFieldsUpdateProcessorFactory">
- * <lst name="fields">
- * <str>uniq</str>
- * <str>uniq2</str>
- * <str>uniq3</str>
- * </lst>
- * </processor>
- * <processor class="solr.RunUpdateProcessorFactory" />
- * </updateRequestProcessorChain></pre>
+ * Removes duplicate values found in fields matching the specified conditions.
+ * The existing field values are iterated in order, and values are removed when
+ * they are equal to a value that has already been seen for this field.
+ * <p>
+ * By default this processor matches no fields.
+ * </p>
*
+ * <p>
+ * In the example configuration below, if a document initially contains the values
+ * <code>"Steve","Lucy","Jim","Steve","Alice","Bob","Alice"</code> in a field named
+ * <code>foo_uniq</code> then using this processor will result in the final list of
+ * field values being <code>"Steve","Lucy","Jim","Alice","Bob"</code>
+ * </p>
+ * <pre class="prettyprint">
+ * <processor class="solr.UniqFieldsUpdateProcessorFactory">
+ * <str name="fieldRegex">.*_uniq</str>
+ * </processor>
+ * </pre>
*/
-public class UniqFieldsUpdateProcessorFactory extends UpdateRequestProcessorFactory {
+public class UniqFieldsUpdateProcessorFactory extends FieldValueSubsetUpdateProcessorFactory {
- private Set<String> fields;
+ public final static Logger log = LoggerFactory.getLogger(UniqFieldsUpdateProcessorFactory.class);
- @SuppressWarnings("unchecked")
- @Override
- public void init(@SuppressWarnings("rawtypes") NamedList args) {
- NamedList<String> flst = (NamedList<String>)args.get("fields");
- if(flst != null){
- fields = new HashSet<String>();
- for(int i = 0; i < flst.size(); i++){
- fields.add(flst.getVal(i));
- }
- }
- }
-
@Override
- public UpdateRequestProcessor getInstance(SolrQueryRequest req,
- SolrQueryResponse rsp,
- UpdateRequestProcessor next) {
- return new UniqFieldsUpdateProcessor(next, fields);
- }
-
- public class UniqFieldsUpdateProcessor extends UpdateRequestProcessor {
+ public FieldMutatingUpdateProcessor.FieldNameSelector
+ getDefaultSelector(final SolrCore core) {
- private final Set<String> fields;
+ return FieldMutatingUpdateProcessor.SELECT_NO_FIELDS;
+ }
- public UniqFieldsUpdateProcessor(UpdateRequestProcessor next,
- Set<String> fields) {
- super(next);
- this.fields = fields;
- }
-
- @Override
- public void processAdd(AddUpdateCommand cmd) throws IOException {
- if(fields != null){
- SolrInputDocument solrInputDocument = cmd.getSolrInputDocument();
- List<Object> uniqList = new ArrayList<Object>();
- for (String field : fields) {
- uniqList.clear();
- Collection<Object> col = solrInputDocument.getFieldValues(field);
- if (col != null) {
- for (Object o : col) {
- if(!uniqList.contains(o))
- uniqList.add(o);
- }
- solrInputDocument.remove(field);
- for (Object o : uniqList) {
- solrInputDocument.addField(field, o);
- }
- }
- }
+ @Override
+ @SuppressWarnings("unchecked")
+ public Collection pickSubset(Collection values) {
+ Set<Object> uniqs = new HashSet<Object>();
+ List<Object> result = new ArrayList<Object>(values.size());
+ for (Object o : values) {
+ if (!uniqs.contains(o)) {
+ uniqs.add(o);
+ result.add(o);
}
- super.processAdd(cmd);
}
+ return result;
}
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java Mon Oct 21 18:58:24 2013
@@ -112,6 +112,8 @@ public final class UpdateRequestProcesso
(null != info.name ? info.name : "") + "\"" +
(info.isDefault() ? " (default)" : "");
+ log.info("creating " + infomsg);
+
// wrap in an ArrayList so we know we know we can do fast index lookups
// and that add(int,Object) is supported
List<UpdateRequestProcessorFactory> list = new ArrayList
Modified: lucene/dev/branches/lucene4956/solr/core/src/test-files/solr/collection1/conf/schema-docValuesFaceting.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/test-files/solr/collection1/conf/schema-docValuesFaceting.xml?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/test-files/solr/collection1/conf/schema-docValuesFaceting.xml (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/test-files/solr/collection1/conf/schema-docValuesFaceting.xml Mon Oct 21 18:58:24 2013
@@ -26,17 +26,16 @@
<fields>
<field name="id" type="string" indexed="true" stored="true" docValues="false" multiValued="false" required="true"/>
<field name="id_dv" type="string" indexed="false" stored="false" docValues="true" multiValued="false" required="true"/>
- <!-- TODO: improve this test so we don't have to make all these DV types multivalued (for missing values) -->
<dynamicField name="*_i" type="int" indexed="true" stored="false" docValues="false"/>
- <dynamicField name="*_i_dv" type="int" indexed="false" stored="false" docValues="true" multiValued="true"/>
+ <dynamicField name="*_i_dv" type="int" indexed="false" stored="false" docValues="true"/>
<dynamicField name="*_is" type="int" indexed="true" stored="false" docValues="false" multiValued="true"/>
<dynamicField name="*_is_dv" type="int" indexed="false" stored="false" docValues="true" multiValued="true"/>
- <dynamicField name="*_s" type="string" indexed="true" stored="false" docValues="false" multiValued="true"/>
- <dynamicField name="*_s_dv" type="string" indexed="false" stored="false" docValues="true" multiValued="true"/>
+ <dynamicField name="*_s" type="string" indexed="true" stored="false" docValues="false"/>
+ <dynamicField name="*_s_dv" type="string" indexed="false" stored="false" docValues="true"/>
<dynamicField name="*_ss" type="string" indexed="true" stored="false" docValues="false" multiValued="true"/>
<dynamicField name="*_ss_dv" type="string" indexed="false" stored="false" docValues="true" multiValued="true"/>
<dynamicField name="*_f" type="float" indexed="true" stored="false" docValues="false"/>
- <dynamicField name="*_f_dv" type="float" indexed="false" stored="false" docValues="true" multiValued="true"/>
+ <dynamicField name="*_f_dv" type="float" indexed="false" stored="false" docValues="true"/>
</fields>
<defaultSearchField>id</defaultSearchField>