You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/10/10 01:46:00 UTC
svn commit: r1180745 [1/2] - in /lucene/dev/branches/solrcloud:
solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/
solr/core/src/java/org/apache/solr/client/solrj/embedded/
solr/core/src/java/org/apache/solr/cloud/ solr/core/sr...
Author: markrmiller
Date: Sun Oct 9 23:45:59 2011
New Revision: 1180745
URL: http://svn.apache.org/viewvc?rev=1180745&view=rev
Log:
SOLR-2358: Distributing Indexing - early infrastructure and tests
Added:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (with props)
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java (with props)
lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml (with props)
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (with props)
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java (with props)
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java (with props)
lucene/dev/branches/solrcloud/src/
Modified:
lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java
lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java
lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java
lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrJettyTestBase.java
Modified: lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java (original)
+++ lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java Sun Oct 9 23:45:59 2011
@@ -145,9 +145,8 @@ public class TestContentStreamDataSource
}
private JettySolrRunner createJetty(SolrInstance instance) throws Exception {
- System.setProperty("solr.solr.home", instance.getHomeDir());
System.setProperty("solr.data.dir", instance.getDataDir());
- JettySolrRunner jetty = new JettySolrRunner("/solr", 0);
+ JettySolrRunner jetty = new JettySolrRunner(instance.getHomeDir(), "/solr", 0);
jetty.start();
return jetty;
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java Sun Oct 9 23:45:59 2011
@@ -51,19 +51,20 @@ public class JettySolrRunner {
private boolean waitOnSolr = false;
- public JettySolrRunner(String context, int port) {
- this.init(context, port);
+ public JettySolrRunner(String solrHome, String context, int port) {
+ this.init(solrHome, context, port);
}
- public JettySolrRunner(String context, int port, String solrConfigFilename) {
- this.init(context, port);
+ public JettySolrRunner(String solrHome, String context, int port, String solrConfigFilename) {
+ this.init(solrHome, context, port);
this.solrConfigFilename = solrConfigFilename;
}
- private void init(String context, int port) {
+ private void init(String solrHome, String context, int port) {
this.context = context;
server = new Server(port);
server.setStopAtShutdown(true);
+ System.setProperty("solr.solr.home", solrHome);
if (System.getProperty("jetty.testMode") != null) {
// SelectChannelConnector connector = new SelectChannelConnector();
// Normal SocketConnector is what solr's example server uses by default
@@ -99,6 +100,8 @@ public class JettySolrRunner {
Handler.REQUEST);
if (solrConfigFilename != null)
System.clearProperty("solrconfig");
+
+ System.clearProperty("solr.solr.home");
}
public void lifeCycleFailure(LifeCycle arg0, Throwable arg1) {
@@ -172,7 +175,7 @@ public class JettySolrRunner {
*/
public static void main(String[] args) {
try {
- JettySolrRunner jetty = new JettySolrRunner("/solr", 8983);
+ JettySolrRunner jetty = new JettySolrRunner(".", "/solr", 8983);
jetty.start();
} catch (Exception ex) {
ex.printStackTrace();
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java Sun Oct 9 23:45:59 2011
@@ -17,6 +17,7 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
+import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
@@ -27,6 +28,7 @@ import java.util.regex.Pattern;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.zookeeper.CreateMode;
@@ -77,13 +79,15 @@ public class SliceLeaderElector {
* @param collection
* @param seq
* @param leaderId
+ * @param props
* @throws KeeperException
* @throws InterruptedException
+ * @throws IOException
* @throws UnsupportedEncodingException
*/
private void checkIfIamLeader(final String shardId, final String collection,
- final int seq, final String leaderId) throws KeeperException,
- InterruptedException {
+ final int seq, final String leaderId, final ZkNodeProps props) throws KeeperException,
+ InterruptedException, IOException {
// get all other numbers...
String holdElectionPath = getElectionPath(shardId, collection)
+ ELECTION_NODE;
@@ -91,7 +95,7 @@ public class SliceLeaderElector {
sortSeqs(seqs);
List<Integer> intSeqs = getSeqs(seqs);
if (seq <= intSeqs.get(0)) {
- runIamLeaderProcess(shardId, collection, leaderId);
+ runIamLeaderProcess(shardId, collection, leaderId, props);
} else {
// I am not the leader - watch the node below me
int i = 1;
@@ -111,7 +115,7 @@ public class SliceLeaderElector {
public void process(WatchedEvent event) {
// am I the next leader?
try {
- checkIfIamLeader(shardId, collection, seq, leaderId);
+ checkIfIamLeader(shardId, collection, seq, leaderId, props);
} catch (KeeperException e) {
log.warn("", e);
@@ -119,6 +123,8 @@ public class SliceLeaderElector {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
+ } catch (IOException e) {
+ log.warn("", e);
}
}
@@ -126,17 +132,18 @@ public class SliceLeaderElector {
} catch (KeeperException e) {
// we couldn't set our watch - the node before us may already be down?
// we need to check if we are the leader again
- checkIfIamLeader(shardId, collection, seq, leaderId);
+ checkIfIamLeader(shardId, collection, seq, leaderId, props);
}
}
}
private void runIamLeaderProcess(final String shardId,
- final String collection, final String leaderId) throws KeeperException,
- InterruptedException {
+ final String collection, final String leaderId, ZkNodeProps props) throws KeeperException,
+ InterruptedException, IOException {
String currentLeaderZkPath = getElectionPath(shardId, collection)
+ LEADER_NODE;
- zkClient.makePath(currentLeaderZkPath + "/" + leaderId, CreateMode.EPHEMERAL);
+ // TODO: leader election tests do not currently set the props
+ zkClient.makePath(currentLeaderZkPath + "/" + leaderId, props == null ? null : props.store(), CreateMode.EPHEMERAL);
}
/**
@@ -193,14 +200,15 @@ public class SliceLeaderElector {
* @param shardId
* @param collection
* @param shardZkNodeName
+ * @param props
* @return sequential node number
* @throws KeeperException
* @throws InterruptedException
+ * @throws IOException
* @throws UnsupportedEncodingException
*/
public int joinElection(String shardId, String collection,
- String shardZkNodeName) throws KeeperException, InterruptedException,
- UnsupportedEncodingException {
+ String shardZkNodeName, ZkNodeProps props) throws KeeperException, InterruptedException, IOException {
final String shardsElectZkPath = getElectionPath(shardId, collection)
+ SliceLeaderElector.ELECTION_NODE;
@@ -209,6 +217,7 @@ public class SliceLeaderElector {
int tries = 0;
while (cont) {
try {
+
leaderSeqPath = zkClient.create(shardsElectZkPath + "/n_", null,
CreateMode.EPHEMERAL_SEQUENTIAL);
cont = false;
@@ -224,7 +233,7 @@ public class SliceLeaderElector {
}
}
int seq = getSeq(leaderSeqPath);
- checkIfIamLeader(shardId, collection, seq, shardZkNodeName);
+ checkIfIamLeader(shardId, collection, seq, shardZkNodeName, props);
return seq;
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Oct 9 23:45:59 2011
@@ -19,7 +19,6 @@ package org.apache.solr.cloud;
import java.io.File;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.List;
@@ -500,6 +499,7 @@ public final class ZkController {
if (shardId == null) {
shardId = assignShard.assignShard(collection, 3); // nocommit: hard coded
// number of slices
+ cloudDesc.setShardId(shardId);
}
String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
@@ -510,10 +510,7 @@ public final class ZkController {
+ shardUrl);
}
- ZkNodeProps props = new ZkNodeProps();
- props.put(ZkStateReader.URL_PROP, shardUrl);
-
- props.put(ZkStateReader.NODE_NAME, getNodeName());
+ ZkNodeProps props = getShardZkProps(shardUrl);
byte[] bytes = props.store();
@@ -544,15 +541,24 @@ public final class ZkController {
}
// leader election
- doLeaderElectionProcess(shardId, collection, shardZkNodeName);
+ doLeaderElectionProcess(shardId, collection, shardZkNodeName, props);
return shardId;
}
+
+ private ZkNodeProps getShardZkProps(String shardUrl) {
+ ZkNodeProps props = new ZkNodeProps();
+ props.put(ZkStateReader.URL_PROP, shardUrl);
+
+ props.put(ZkStateReader.NODE_NAME, getNodeName());
+ return props;
+ }
+
private void doLeaderElectionProcess(String shardId,
- final String collection, String shardZkNodeName) throws KeeperException,
- InterruptedException, UnsupportedEncodingException {
+ final String collection, String shardZkNodeName, ZkNodeProps props) throws KeeperException,
+ InterruptedException, IOException {
- leaderElector.joinElection(shardId, collection, shardZkNodeName);
+ leaderElector.joinElection(shardId, collection, shardZkNodeName, props);
}
/**
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java Sun Oct 9 23:45:59 2011
@@ -47,7 +47,6 @@ public class CoreDescriptor {
this.cloudDesc = new CloudDescriptor();
// cloud collection defaults to core name
cloudDesc.setCollectionName(name.isEmpty() ? coreContainer.getDefaultCoreName() : name);
- this.cloudDesc.setShardId(coreContainer.getZkController().getNodeName() + "_" + name);
}
if (name == null) {
Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Oct 9 23:45:59 2011
@@ -0,0 +1,439 @@
+package org.apache.solr.update.processor;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequestExt;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+
+// NOT mt-safe... create a new processor for each add thread
+public class DistributedUpdateProcessor extends UpdateRequestProcessor {
+ // TODO: shut this thing down
+ static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+ 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+
+ static HttpClient client;
+
+ static {
+ MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+ mgr.getParams().setDefaultMaxConnectionsPerHost(8);
+ mgr.getParams().setMaxTotalConnections(200);
+ client = new HttpClient(mgr);
+ }
+
+ CompletionService<Request> completionService;
+ Set<Future<Request>> pending;
+
+ private final SolrQueryRequest req;
+ private final SolrQueryResponse rsp;
+ private final UpdateRequestProcessor next;;
+ private final SchemaField idField;
+
+ private List<String> shards;
+ private final List<AddUpdateCommand>[] adds;
+ private final List<DeleteUpdateCommand>[] deletes;
+
+ String selfStr;
+ int self;
+ int maxBufferedAddsPerServer = 10;
+ int maxBufferedDeletesPerServer = 100;
+
+ private DistributedUpdateProcessorFactory factory;
+
+ public DistributedUpdateProcessor(String shardStr, SolrQueryRequest req,
+ SolrQueryResponse rsp, DistributedUpdateProcessorFactory factory,
+ UpdateRequestProcessor next) {
+ super(next);
+ this.factory = factory;
+ this.req = req;
+ this.rsp = rsp;
+ this.next = next;
+ this.idField = req.getSchema().getUniqueKeyField();
+
+ shards = factory.shards;
+
+ String selfStr = req.getParams().get("self", factory.selfStr);
+
+ if (shardStr != null) {
+ shards = StrUtils.splitSmart(shardStr, ",", true);
+ }
+
+ self = -1;
+ if (shards != null) {
+ for (int i = 0; i < shards.size(); i++) {
+ if (shards.get(i).equals(selfStr)) {
+ self = i;
+ break;
+ }
+ }
+ }
+
+ if (shards == null) {
+ shards = new ArrayList<String>(1);
+ shards.add("self");
+ self = 0;
+ }
+
+ adds = new List[shards.size()];
+ deletes = new List[shards.size()];
+ }
+
+ private int getSlot(String id) {
+ return (id.hashCode() >>> 1) % shards.size();
+ }
+
+ @Override
+ public void processAdd(AddUpdateCommand cmd) throws IOException {
+ checkResponses(false);
+
+ SolrInputDocument doc = cmd.getSolrInputDocument();
+ SolrInputField field = doc.getField(idField.getName());
+ if (field == null) {
+ if (next != null) next.processAdd(cmd);
+ return;
+ }
+ String id = field.getFirstValue().toString();
+ int slot = getSlot(id);
+ if (slot == self) {
+ if (next != null) next.processAdd(cmd);
+ return;
+ }
+
+ // make sure any pending deletes are flushed
+ flushDeletes(slot, 1, null);
+
+ // TODO: this is brittle
+ // need to make a clone since these commands may be reused
+ AddUpdateCommand clone = new AddUpdateCommand(req);
+
+ clone.solrDoc = cmd.solrDoc;
+ clone.commitWithin = cmd.commitWithin;
+ clone.overwrite = cmd.overwrite;
+
+ // nocommit: review as far as SOLR-2685
+ // clone.indexedId = cmd.indexedId;
+ // clone.doc = cmd.doc;
+
+ List<AddUpdateCommand> alist = adds[slot];
+ if (alist == null) {
+ alist = new ArrayList<AddUpdateCommand>(2);
+ adds[slot] = alist;
+ }
+ alist.add(clone);
+
+ flushAdds(slot, maxBufferedAddsPerServer, null);
+ }
+
+ // TODO: this is brittle
+ private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
+ DeleteUpdateCommand c = new DeleteUpdateCommand(req);
+ c.id = cmd.id;
+ c.query = cmd.query;
+ return c;
+ }
+
+ private void doDelete(int slot, DeleteUpdateCommand cmd) throws IOException {
+ if (slot == self) {
+ if (self >= 0) next.processDelete(cmd);
+ return;
+ }
+
+ flushAdds(slot, 1, null);
+
+ List<DeleteUpdateCommand> dlist = deletes[slot];
+ if (dlist == null) {
+ dlist = new ArrayList<DeleteUpdateCommand>(2);
+ deletes[slot] = dlist;
+ }
+ dlist.add(clone(cmd));
+
+ flushDeletes(slot, maxBufferedDeletesPerServer, null);
+ }
+
+ @Override
+ public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+ checkResponses(false);
+
+ if (cmd.id != null) {
+ doDelete(getSlot(cmd.id), cmd);
+ } else if (cmd.query != null) {
+ // query must be broadcast to all
+ for (int slot = 0; slot < deletes.length; slot++) {
+ if (slot == self) continue;
+ doDelete(slot, cmd);
+ }
+ doDelete(self, cmd);
+ }
+ }
+
+ @Override
+ public void processCommit(CommitUpdateCommand cmd) throws IOException {
+ // Wait for all outstanding repsonses to make sure that a commit
+ // can't sneak in ahead of adds or deletes we already sent.
+ // We could do this on a per-server basis, but it's more complex
+ // and this solution will lead to commits happening closer together.
+ checkResponses(true);
+
+ for (int slot = 0; slot < shards.size(); slot++) {
+ if (slot == self) continue;
+ // piggyback on any outstanding adds or deletes if possible.
+ if (flushAdds(slot, 1, cmd)) continue;
+ if (flushDeletes(slot, 1, cmd)) continue;
+
+ UpdateRequestExt ureq = new UpdateRequestExt();
+ // pass on version
+ if (ureq.getParams() == null) {
+ ureq.setParams(new ModifiableSolrParams());
+ }
+ if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) {
+ ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
+ req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION));
+ }
+ ureq.getParams().add("update.chain", "distrib-update-chain");
+ addCommit(ureq, cmd);
+ submit(slot, ureq);
+ }
+ if (next != null && self >= 0) next.processCommit(cmd);
+
+ // if the command wanted to block until everything was committed,
+ // then do that here.
+ // nocommit
+ if (/* cmd.waitFlush || */cmd.waitSearcher) {
+ checkResponses(true);
+ }
+ }
+
+ @Override
+ public void finish() throws IOException {
+ for (int slot = 0; slot < shards.size(); slot++) {
+ if (slot == self) continue;
+ // piggyback on any outstanding adds or deletes if possible.
+ flushAdds(slot, 1, null);
+ flushDeletes(slot, 1, null);
+ }
+ checkResponses(true);
+ if (next != null && self >= 0) next.finish();
+ }
+
+ void checkResponses(boolean block) {
+ while (pending != null && pending.size() > 0) {
+ try {
+ Future<Request> future = block ? completionService.take()
+ : completionService.poll();
+ if (future == null) return;
+ pending.remove(future);
+
+ try {
+ Request sreq = future.get();
+ if (sreq.rspCode != 0) {
+ // error during request
+
+ // use the first exception encountered
+ if (rsp.getException() == null) {
+ Exception e = sreq.exception;
+ String newMsg = "shard update error (" + sreq.shard + "):"
+ + e.getMessage();
+ if (e instanceof SolrException) {
+ SolrException se = (SolrException) e;
+ e = new SolrException(ErrorCode.getErrorCode(se.code()),
+ newMsg, se.getCause());
+ } else {
+ e = new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "newMsg", e);
+ }
+ rsp.setException(e);
+ }
+
+ SolrException.logOnce(SolrCore.log, "shard update error ("
+ + sreq.shard + ")", sreq.exception);
+ }
+
+ } catch (ExecutionException e) {
+ // shouldn't happen since we catch exceptions ourselves
+ SolrException.log(SolrCore.log,
+ "error sending update request to shard", e);
+ }
+
+ } catch (InterruptedException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ "interrupted waiting for shard update response", e);
+ }
+ }
+ }
+
+ void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
+ if (cmd == null) return;
+ // nocommit
+ ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
+ : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
+ }
+
+ boolean flushAdds(int slot, int limit, CommitUpdateCommand ccmd) {
+ // check for pending deletes
+ List<AddUpdateCommand> alist = adds[slot];
+ if (alist == null || alist.size() < limit) return false;
+
+ UpdateRequestExt ureq = new UpdateRequestExt();
+ // pass on version
+ if (ureq.getParams() == null) {
+ ureq.setParams(new ModifiableSolrParams());
+ }
+ if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) {
+ ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
+ req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION));
+ }
+ ureq.getParams().add("update.chain", "distrib-update-chain");
+ addCommit(ureq, ccmd);
+
+ for (AddUpdateCommand cmd : alist) {
+ ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+ }
+
+ adds[slot] = null;
+ submit(slot, ureq);
+ return true;
+ }
+
+ boolean flushDeletes(int slot, int limit, CommitUpdateCommand ccmd) {
+ // check for pending deletes
+ List<DeleteUpdateCommand> dlist = deletes[slot];
+ if (dlist == null || dlist.size() < limit) return false;
+
+ UpdateRequestExt ureq = new UpdateRequestExt();
+ // pass on version
+ if (ureq.getParams() == null) {
+ ureq.setParams(new ModifiableSolrParams());
+ }
+ if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) {
+ ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
+ req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION));
+ }
+ ureq.getParams().add("update.chain", "distrib-update-chain");
+ addCommit(ureq, ccmd);
+ for (DeleteUpdateCommand cmd : dlist) {
+ if (cmd.id != null) {
+ ureq.deleteById(cmd.id);
+ }
+ if (cmd.query != null) {
+ ureq.deleteByQuery(cmd.query);
+ }
+ }
+
+ deletes[slot] = null;
+ submit(slot, ureq);
+ return true;
+ }
+
+ static class Request {
+ String shard;
+ UpdateRequestExt ureq;
+ NamedList<Object> ursp;
+ int rspCode;
+ Exception exception;
+ }
+
+ void submit(int slot, UpdateRequestExt ureq) {
+ Request sreq = new Request();
+ sreq.shard = shards.get(slot);
+ sreq.ureq = ureq;
+ submit(sreq);
+ }
+
+ void submit(final Request sreq) {
+ if (completionService == null) {
+ completionService = new ExecutorCompletionService<Request>(commExecutor);
+ pending = new HashSet<Future<Request>>();
+ }
+ String[] shards;
+ // look to see if we should send to multiple servers
+ if (sreq.shard.contains("|")) {
+ shards = sreq.shard.split("\\|");
+ } else {
+ shards = new String[1];
+ shards[0] = sreq.shard;
+ }
+ for (final String shard : shards) {
+ // TODO: when we break up shards, we might forward
+ // to self again - makes things simple here, but we could
+ // also have realized this before, done the req locally, and
+ // removed self from this list.
+
+ Callable<Request> task = new Callable<Request>() {
+ @Override
+ public Request call() throws Exception {
+
+ try {
+ String url;
+ if (!shard.startsWith("http://")) {
+ url = "http://" + sreq.shard;
+ } else {
+ url = shard;
+ }
+
+ // TODO: allow shard syntax to use : to specify replicas
+ SolrServer server = new CommonsHttpSolrServer(url, client);
+ sreq.ursp = server.request(sreq.ureq);
+
+ // currently no way to get the request body.
+ } catch (Exception e) {
+ sreq.exception = e;
+ if (e instanceof SolrException) {
+ sreq.rspCode = ((SolrException) e).code();
+ } else {
+ sreq.rspCode = -1;
+ }
+ }
+ return sreq;
+ }
+ };
+
+ pending.add(completionService.submit(task));
+ }
+ }
+}
Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java Sun Oct 9 23:45:59 2011
@@ -0,0 +1,165 @@
+package org.apache.solr.update.processor;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.zookeeper.KeeperException;
+
+public class DistributedUpdateProcessorFactory extends
+ UpdateRequestProcessorFactory {
+ public static final String DOCVERSION = "docversion";
+ NamedList args;
+ List<String> shards;
+ String selfStr;
+ String shardsString;
+
+ @Override
+ public void init(NamedList args) {
+ selfStr = (String) args.get("self");
+ Object o = args.get("shards");
+ if (o != null && o instanceof List) {
+ shards = (List<String>) o;
+ shardsString = StrUtils.join((List<String>) o, ',');
+ } else if (o != null && o instanceof String) {
+ shards = StrUtils.splitSmart((String) o, ",", true);
+ shardsString = (String) o;
+ }
+ }
+
+ /** return the list of shards, or null if not configured */
+ public List<String> getShards() {
+ return shards;
+ }
+
+ public String getShardsString() {
+ return shardsString;
+ }
+
+ /** return "self", or null if not configured */
+ public String getSelf() {
+ return selfStr;
+ }
+
+ @Override
+ public DistributedUpdateProcessor getInstance(SolrQueryRequest req,
+ SolrQueryResponse rsp, UpdateRequestProcessor next) {
+ CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
+
+ // TODO: could do this here, or in a previous update processor.
+ // if we are in zk mode...
+ if (coreDesc.getCoreContainer().getZkController() != null) {
+ // the leader is...
+ // TODO: if there is no leader, wait and look again
+ // TODO: we are reading the leader from zk every time - we should cache
+ // this
+ // and watch for changes
+ List<String> leaderChildren;
+ String collection = coreDesc.getCloudDescriptor().getCollectionName();
+ String shardId = coreDesc.getCloudDescriptor().getShardId();
+ ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+ String leaderNode = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
+ + ZkStateReader.LEADER_ELECT_ZKNODE + "/" + shardId + "/leader";
+ SolrZkClient zkClient = coreDesc.getCoreContainer().getZkController()
+ .getZkClient();
+ try {
+ leaderChildren = zkClient.getChildren(leaderNode, null);
+ if (leaderChildren.size() > 0) {
+ String leader = leaderChildren.get(0);
+ ZkNodeProps zkNodeProps = new ZkNodeProps();
+ byte[] bytes = zkClient
+ .getData(leaderNode + "/" + leader, null, null);
+ zkNodeProps.load(bytes);
+ String leaderUrl = zkNodeProps.get("url");
+
+ String nodeName = req.getCore().getCoreDescriptor()
+ .getCoreContainer().getZkController().getNodeName();
+ String shardZkNodeName = nodeName + "_" + req.getCore().getName();
+
+ if (params.get(DOCVERSION) != null
+ && params.get(DOCVERSION).equals("yes")) {
+ // we got a version, just go local
+ } else if (shardZkNodeName.equals(leader)) {
+ // that means I want to forward onto my replicas...
+
+ // so get the replicas...
+ CloudState cloudState = req.getCore().getCoreDescriptor()
+ .getCoreContainer().getZkController().getCloudState();
+ Slice replicas = cloudState.getSlices(collection).get(shardId);
+ Map<String,ZkNodeProps> shardMap = replicas.getShards();
+ String self = null;
+ StringBuilder replicasUrl = new StringBuilder();
+ for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
+ if (replicasUrl.length() > 0) {
+ replicasUrl.append("|");
+ }
+ String replicaUrl = entry.getValue().get("url");
+ if (shardZkNodeName.equals(entry.getKey())) {
+ self = replicaUrl;
+ }
+ replicasUrl.append(replicaUrl);
+ }
+ versionDoc(params);
+ params.add("self", self);
+ params.add("shards", replicasUrl.toString());
+ } else {
+ // I need to forward onto the leader...
+ // TODO: don't use leader - we need to get the real URL from the zk
+ // node
+ params.add("shards", leaderUrl);
+ }
+ req.setParams(params);
+ }
+ } catch (KeeperException e) {
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+ e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+ e);
+ } catch (IOException e) {
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+ e);
+ }
+ }
+
+ String shardStr = req.getParams().get("shards");
+ if (shards == null && shardStr == null) return null;
+ return new DistributedUpdateProcessor(shardStr, req, rsp, this, next);
+ }
+
+ private void versionDoc(ModifiableSolrParams params) {
+ params.set(DOCVERSION, "yes");
+ }
+}
Added: lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml Sun Oct 9 23:45:59 2011
@@ -0,0 +1,52 @@
+<?xml version="1.0" ?>
+
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<!-- $Id: solrconfig-nocache.xml 1144761 2011-07-09 23:01:53Z sarowe $ $Source$
+ $Name$ -->
+
+<config>
+ <luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
+ <dataDir>${solr.data.dir:}</dataDir>
+ <!-- The DirectoryFactory to use for indexes. solr.StandardDirectoryFactory,
+ the default, is filesystem based. solr.RAMDirectoryFactory is memory based
+ and not persistent. -->
+ <directoryFactory name="DirectoryFactory"
+ class="${solr.directoryFactory:solr.RAMDirectoryFactory}" />
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+
+ </updateHandler>
+ <requestHandler name="standard" class="solr.StandardRequestHandler" />
+
+ <requestDispatcher handleSelect="true">
+ <requestParsers enableRemoteStreaming="true"
+ multipartUploadLimitInKB="2048" />
+ <httpCaching never304="true" />
+ </requestDispatcher>
+
+ <requestHandler name="/update" class="solr.XmlUpdateRequestHandler" />
+
+ <updateRequestProcessorChain name="distrib-update-chain">
+ <processor class="solr.DistributedUpdateProcessorFactory">
+ <!-- example configuration... "shards should be in the *same* order for
+ every server in a cluster. Only "self" should change to represent what server
+ *this* is. <str name="self">localhost:8983/solr</str> <arr name="shards">
+ <str>localhost:8983/solr</str> <str>localhost:7574/solr</str> </arr> -->
+ </processor>
+ <processor class="solr.LogUpdateProcessorFactory">
+ <int name="maxNumToLog">10</int>
+ </processor>
+ <processor class="solr.RunUpdateProcessorFactory" />
+ </updateRequestProcessorChain>
+
+</config>
Modified: lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml Sun Oct 9 23:45:59 2011
@@ -30,8 +30,11 @@
solr.RAMDirectoryFactory is memory based and not persistent. -->
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
+ <dataDir>${solr.data.dir:}</dataDir>
+
<updateHandler class="solr.DirectUpdateHandler2">
</updateHandler>
+
<requestHandler name="standard" class="solr.StandardRequestHandler"/>
<requestDispatcher handleSelect="true" >
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java Sun Oct 9 23:45:59 2011
@@ -45,10 +45,9 @@ public class TestSolrCoreProperties exte
public void setUp() throws Exception {
super.setUp();
setUpMe();
- System.setProperty("solr.solr.home", getHomeDir());
System.setProperty("solr.data.dir", getDataDir());
- solrJetty = new JettySolrRunner("/solr", 0);
+ solrJetty = new JettySolrRunner(getHomeDir(), "/solr", 0);
solrJetty.start();
String url = "http://localhost:" + solrJetty.getLocalPort() + "/solr";
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Sun Oct 9 23:45:59 2011
@@ -84,6 +84,7 @@ public abstract class AbstractZkTestCase
putConfig(zkClient, config);
putConfig(zkClient, schema);
+ putConfig(zkClient, "solrconfig-distrib-update.xml");
putConfig(zkClient, "stopwords.txt");
putConfig(zkClient, "protwords.txt");
putConfig(zkClient, "mapping-ISOLatin1Accent.txt");
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Sun Oct 9 23:45:59 2011
@@ -19,13 +19,11 @@ package org.apache.solr.cloud;
import java.net.MalformedURLException;
-import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.junit.BeforeClass;
/**
*
@@ -56,12 +54,6 @@ public class BasicDistributedZkTest exte
System.setProperty("CLOUD_UPDATE_DELAY", "0");
}
-
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- System.setProperty("solr.solr.home", SolrTestCaseJ4.TEST_HOME());
- }
@Override
protected void setDistributedParams(ModifiableSolrParams params) {
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Sun Oct 9 23:45:59 2011
@@ -243,18 +243,18 @@ public class CloudStateUpdateTest extend
cloudState2 = zkController2.getCloudState();
slices = cloudState2.getSlices("testcore");
- if (slices != null && slices.containsKey(host + ":1661_solr_testcore")
- && slices.get(host + ":1661_solr_testcore").getShards().size() > 0) {
+ if (slices != null && slices.containsKey("shard1")
+ && slices.get("shard1").getShards().size() > 0) {
break;
}
Thread.sleep(500);
}
assertNotNull(slices);
- assertTrue(slices.containsKey(host + ":1661_solr_testcore"));
+ assertTrue(slices.containsKey("shard1"));
- Slice slice = slices.get(host + ":1661_solr_testcore");
- assertEquals(host + ":1661_solr_testcore", slice.getName());
+ Slice slice = slices.get("shard1");
+ assertEquals("shard1", slice.getName());
Map<String,ZkNodeProps> shards = slice.getShards();
Added: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Sun Oct 9 23:45:59 2011
@@ -0,0 +1,333 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+/**
+ *
+ */
+public class FullDistributedZkTest extends AbstractDistributedZkTestCase {
+
+ private static final String DEFAULT_COLLECTION = "collection1";
+ private static final boolean DEBUG = false;
+ String t1="a_t";
+ String i1="a_si";
+ String nint = "n_i";
+ String tint = "n_ti";
+ String nfloat = "n_f";
+ String tfloat = "n_tf";
+ String ndouble = "n_d";
+ String tdouble = "n_td";
+ String nlong = "n_l";
+ String tlong = "other_tl1";
+ String ndate = "n_dt";
+ String tdate = "n_tdt";
+
+ String oddField="oddField_s";
+ String missingField="ignore_exception__missing_but_valid_field_t";
+ String invalidField="ignore_exception__invalid_field_not_in_schema";
+
+ public FullDistributedZkTest() {
+ fixShardCount = true;
+ shardCount = 6;
+ System.setProperty("CLOUD_UPDATE_DELAY", "0");
+ }
+
+ @Override
+ protected void createServers(int numShards) throws Exception {
+ System.setProperty("collection", "control_collection");
+ controlJetty = createJetty(testDir, testDir + "/control/data", "control_shard");
+ System.clearProperty("collection");
+ controlClient = createNewSolrServer(controlJetty.getLocalPort());
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 1; i <= numShards; i++) {
+ if (sb.length() > 0) sb.append(',');
+ JettySolrRunner j = createJetty(testDir, testDir + "/jetty" + i, null, "solrconfig-distrib-update.xml");
+ jettys.add(j);
+ clients.add(createNewSolrServer(j.getLocalPort()));
+
+ }
+
+ // build the shard string
+ for (int i = 1; i <= numShards/2; i++) {
+ JettySolrRunner j = jettys.get(i);
+ JettySolrRunner j2 = jettys.get(i + (numShards/2 - 1));
+ if (sb.length() > 0) sb.append(',');
+ sb.append("localhost:").append(j.getLocalPort()).append(context);
+ sb.append("|localhost:").append(j2.getLocalPort()).append(context);
+ }
+ shards = sb.toString();
+ }
+
+ @Override
+ protected void setDistributedParams(ModifiableSolrParams params) {
+
+ if (r.nextBoolean()) {
+ // don't set shards, let that be figured out from the cloud state
+ params.set("distrib", "true");
+ } else {
+ // use shard ids rather than physical locations
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < shardCount / 2; i++) {
+ if (i > 0)
+ sb.append(',');
+ sb.append("shard" + (i+1));
+ }
+ params.set("shards", sb.toString());
+ params.set("distrib", "true");
+ }
+ }
+
+ @Override
+ protected void indexDoc(SolrInputDocument doc) throws IOException, SolrServerException {
+ controlClient.add(doc);
+
+ boolean pick = random.nextBoolean();
+
+ int mod = (clients.size() / 2);
+
+ int which = (doc.getField(id).toString().hashCode() & 0x7fffffff) % mod;
+
+ if (pick) {
+ which = which + mod;
+ }
+
+ CommonsHttpSolrServer client = (CommonsHttpSolrServer) clients.get(which);
+
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.add(doc);
+ ureq.setParam("update.chain", "distrib-update-chain");
+ ureq.process(client);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.solr.BaseDistributedSearchTestCase#doTest()
+ *
+ * Create 3 shards, each with one replica
+ */
+ @Override
+ public void doTest() throws Exception {
+ printLayout();
+ // make sure 'shard1' was auto-assigned
+ SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT);
+ assertTrue("shard1 was not found in zk layout", zkClient.exists("/solr/collections/collection1/shards/shard1"));
+ zkClient.close();
+
+ del("*:*");
+ indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men"
+ ,"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);
+ indexr(id,2, i1, 50 , tlong, 50,t1,"to come to the aid of their country."
+ );
+ indexr(id,3, i1, 2, tlong, 2,t1,"how now brown cow"
+ );
+ indexr(id,4, i1, -100 ,tlong, 101,t1,"the quick fox jumped over the lazy dog"
+ );
+ indexr(id,5, i1, 500, tlong, 500 ,t1,"the quick fox jumped way over the lazy dog"
+ );
+ indexr(id,6, i1, -600, tlong, 600 ,t1,"humpty dumpy sat on a wall");
+ indexr(id,7, i1, 123, tlong, 123 ,t1,"humpty dumpy had a great fall");
+ indexr(id,8, i1, 876, tlong, 876,t1,"all the kings horses and all the kings men");
+ indexr(id,9, i1, 7, tlong, 7,t1,"couldn't put humpty together again");
+ indexr(id,10, i1, 4321, tlong, 4321,t1,"this too shall pass");
+ indexr(id,11, i1, -987, tlong, 987,t1,"An eye for eye only ends up making the whole world blind.");
+ indexr(id,12, i1, 379, tlong, 379,t1,"Great works are performed, not by strength, but by perseverance.");
+ indexr(id,13, i1, 232, tlong, 232,t1,"no eggs on wall, lesson learned", oddField, "odd man out");
+
+ indexr(id, 14, "SubjectTerms_mfacet", new String[] {"mathematical models", "mathematical analysis"});
+ indexr(id, 15, "SubjectTerms_mfacet", new String[] {"test 1", "test 2", "test3"});
+ indexr(id, 16, "SubjectTerms_mfacet", new String[] {"test 1", "test 2", "test3"});
+ String[] vals = new String[100];
+ for (int i=0; i<100; i++) {
+ vals[i] = "test " + i;
+ }
+ indexr(id, 17, "SubjectTerms_mfacet", vals);
+
+ for (int i=100; i<150; i++) {
+ indexr(id, i);
+ }
+
+ commit();
+
+ handle.clear();
+ handle.put("QTime", SKIPVAL);
+ handle.put("timestamp", SKIPVAL);
+
+ // random value sort
+ for (String f : fieldNames) {
+ query("q","*:*", "sort",f+" desc");
+ query("q","*:*", "sort",f+" asc");
+ }
+
+ // these queries should be exactly ordered and scores should exactly match
+ query("q","*:*", "sort",i1+" desc");
+ query("q","*:*", "sort",i1+" asc");
+ query("q","*:*", "sort",i1+" desc", "fl","*,score");
+ query("q","*:*", "sort","n_tl1 asc", "fl","score"); // test legacy behavior - "score"=="*,score"
+ query("q","*:*", "sort","n_tl1 desc");
+ handle.put("maxScore", SKIPVAL);
+ query("q","{!func}"+i1);// does not expect maxScore. So if it comes ,ignore it. JavaBinCodec.writeSolrDocumentList()
+ //is agnostic of request params.
+ handle.remove("maxScore");
+ query("q","{!func}"+i1, "fl","*,score"); // even scores should match exactly here
+
+ handle.put("highlighting", UNORDERED);
+ handle.put("response", UNORDERED);
+
+ handle.put("maxScore", SKIPVAL);
+ query("q","quick");
+ query("q","all","fl","id","start","0");
+ query("q","all","fl","foofoofoo","start","0"); // no fields in returned docs
+ query("q","all","fl","id","start","100");
+
+ handle.put("score", SKIPVAL);
+ query("q","quick","fl","*,score");
+ query("q","all","fl","*,score","start","1");
+ query("q","all","fl","*,score","start","100");
+
+ query("q","now their fox sat had put","fl","*,score",
+ "hl","true","hl.fl",t1);
+
+ query("q","now their fox sat had put","fl","foofoofoo",
+ "hl","true","hl.fl",t1);
+
+ query("q","matchesnothing","fl","*,score");
+
+ query("q","*:*", "rows",100, "facet","true", "facet.field",t1);
+ query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","count");
+ query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","count", "facet.mincount",2);
+ query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","index");
+ query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","index", "facet.mincount",2);
+ query("q","*:*", "rows",100, "facet","true", "facet.field",t1,"facet.limit",1);
+ query("q","*:*", "rows",100, "facet","true", "facet.query","quick", "facet.query","all", "facet.query","*:*");
+ query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.offset",1);
+ query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.mincount",2);
+
+ // test faceting multiple things at once
+ query("q","*:*", "rows",100, "facet","true", "facet.query","quick", "facet.query","all", "facet.query","*:*"
+ ,"facet.field",t1);
+
+ // test filter tagging, facet exclusion, and naming (multi-select facet support)
+ query("q","*:*", "rows",100, "facet","true", "facet.query","{!key=myquick}quick", "facet.query","{!key=myall ex=a}all", "facet.query","*:*"
+ ,"facet.field","{!key=mykey ex=a}"+t1
+ ,"facet.field","{!key=other ex=b}"+t1
+ ,"facet.field","{!key=again ex=a,b}"+t1
+ ,"facet.field",t1
+ ,"fq","{!tag=a}id:[1 TO 7]", "fq","{!tag=b}id:[3 TO 9]"
+ );
+ query("q", "*:*", "facet", "true", "facet.field", "{!ex=t1}SubjectTerms_mfacet", "fq", "{!tag=t1}SubjectTerms_mfacet:(test 1)", "facet.limit", "10", "facet.mincount", "1");
+
+ // test field that is valid in schema but missing in all shards
+ query("q","*:*", "rows",100, "facet","true", "facet.field",missingField, "facet.mincount",2);
+ // test field that is valid in schema and missing in some shards
+ query("q","*:*", "rows",100, "facet","true", "facet.field",oddField, "facet.mincount",2);
+
+ query("q","*:*", "sort",i1+" desc", "stats", "true", "stats.field", i1);
+
+
+ // Try to get better coverage for refinement queries by turning off over requesting.
+ // This makes it much more likely that we may not get the top facet values and hence
+ // we turn of that checking.
+ handle.put("facet_fields", SKIPVAL);
+ query("q","*:*", "rows",0, "facet","true", "facet.field",t1,"facet.limit",5, "facet.shard.limit",5);
+ // check a complex key name
+ query("q","*:*", "rows",0, "facet","true", "facet.field","{!key='a b/c \\' \\} foo'}"+t1,"facet.limit",5, "facet.shard.limit",5);
+ handle.remove("facet_fields");
+
+
+ // index the same document to two servers and make sure things
+ // don't blow up.
+ if (clients.size()>=2) {
+ index(id,100, i1, 107 ,t1,"oh no, a duplicate!");
+ for (int i=0; i<clients.size(); i++) {
+ index_specific(i, id,100, i1, 107 ,t1,"oh no, a duplicate!");
+ }
+ commit();
+ query("q","duplicate", "hl","true", "hl.fl", t1);
+ query("q","fox duplicate horses", "hl","true", "hl.fl", t1);
+ query("q","*:*", "rows",100);
+ }
+
+ // test debugging
+ handle.put("explain", UNORDERED);
+ handle.put("debug", UNORDERED);
+ handle.put("time", SKIPVAL);
+ query("q","now their fox sat had put","fl","*,score",CommonParams.DEBUG_QUERY, "true");
+ query("q", "id:[1 TO 5]", CommonParams.DEBUG_QUERY, "true");
+ query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.TIMING);
+ query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.RESULTS);
+ query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.QUERY);
+
+ // TODO: This test currently fails because debug info is obtained only
+ // on shards with matches.
+ // query("q","matchesnothing","fl","*,score", "debugQuery", "true");
+
+ // Thread.sleep(10000000000L);
+ if (DEBUG) {
+ super.printLayout();
+ }
+ }
+
+ volatile CloudSolrServer solrj;
+
+ @Override
+ protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException {
+
+ if (r.nextBoolean())
+ return super.queryServer(params);
+
+ // use the distributed solrj client
+ if (solrj == null) {
+ synchronized(this) {
+ try {
+ CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
+ server.setDefaultCollection(DEFAULT_COLLECTION);
+ solrj = server;
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ if (r.nextBoolean())
+ params.set("collection",DEFAULT_COLLECTION);
+
+ QueryResponse rsp = solrj.query(params);
+ return rsp;
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ System.clearProperty("CLOUD_UPDATE_DELAY");
+ System.clearProperty("zkHost");
+ }
+}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Sun Oct 9 23:45:59 2011
@@ -79,7 +79,7 @@ public class LeaderElectionTest extends
elector.setupForSlice("shard1", "collection1");
seq = elector.joinElection("shard1", "collection1",
- Integer.toString(nodeNumber));
+ Integer.toString(nodeNumber), null);
seqToThread.put(seq, this);
// run forever - we will be explicitly killed
Thread.sleep(Integer.MAX_VALUE);
@@ -103,14 +103,14 @@ public class LeaderElectionTest extends
SliceLeaderElector elector = new SliceLeaderElector(zkClient1);
elector.setupForSlice("shard2", "collection1");
- elector.joinElection("shard2", "collection1", "dummynode1");
+ elector.joinElection("shard2", "collection1", "dummynode1", null);
SolrZkClient zkClient2 = new SolrZkClient(server.getZkAddress(), TIMEOUT);
SliceLeaderElector elector2 = new SliceLeaderElector(zkClient2);
elector2.setupForSlice("shard2", "collection1");
- elector2.joinElection("shard2", "collection1", "dummynode2");
+ elector2.joinElection("shard2", "collection1", "dummynode2", null);
List<ClientThread> threads = new ArrayList<ClientThread>();
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Sun Oct 9 23:45:59 2011
@@ -260,11 +260,17 @@ public class ZkControllerTest extends So
zkController.createCollectionZkNode(cloudDesc);
String shard1 = zkController.register("core1", cloudDesc);
+ cloudDesc.setShardId(null);
String shard2 = zkController.register("core2", cloudDesc);
+ cloudDesc.setShardId(null);
String shard3 = zkController.register("core3", cloudDesc);
+ cloudDesc.setShardId(null);
String shard4 = zkController.register("core4", cloudDesc);
+ cloudDesc.setShardId(null);
String shard5 = zkController.register("core5", cloudDesc);
+ cloudDesc.setShardId(null);
String shard6 = zkController.register("core6", cloudDesc);
+ cloudDesc.setShardId(null);
assertEquals("shard1", shard1);
assertEquals("shard2", shard2);
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java Sun Oct 9 23:45:59 2011
@@ -105,10 +105,9 @@ public class TestReplicationHandler exte
}
private static JettySolrRunner createJetty(SolrInstance instance) throws Exception {
- System.setProperty("solr.solr.home", instance.getHomeDir());
System.setProperty("solr.data.dir", instance.getDataDir());
- JettySolrRunner jetty = new JettySolrRunner("/solr", 0);
+ JettySolrRunner jetty = new JettySolrRunner(instance.getHomeDir(), "/solr", 0);
jetty.start();
return jetty;
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java Sun Oct 9 23:45:59 2011
@@ -67,12 +67,11 @@ public class TestBinaryField extends Luc
out = new FileOutputStream(f);
IOUtils.copy(loader.openResource(fname), out);
out.close();
- System.setProperty("solr.solr.home", homeDir.getAbsolutePath());
System.setProperty("solr.data.dir", dataDir.getAbsolutePath());
System.setProperty("solr.test.sys.prop1", "propone");
System.setProperty("solr.test.sys.prop2", "proptwo");
- jetty = new JettySolrRunner(context, 0);
+ jetty = new JettySolrRunner(homeDir.getAbsolutePath(), context, 0);
jetty.start();
port = jetty.getLocalPort();
Added: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java Sun Oct 9 23:45:59 2011
@@ -0,0 +1,369 @@
+package org.apache.solr.update.processor;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequestExt;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+public class TestDistributedUpdate extends SolrTestCaseJ4 {
+
+ private static final int NUM_JETTIES = 2;
+
+ File testDir;
+
+ List<SolrServer> clients = new ArrayList<SolrServer>();
+ List<JettySolrRunner> jettys = new ArrayList<JettySolrRunner>();
+ String context = "/solr/collection1";
+ String shardStr;
+ String[] shards;
+ boolean updateSelf;
+
+ String id = "id";
+ String t1 = "a_t";
+ String i1 = "a_i";
+ String oddField = "oddField_s";
+ String missingField = "missing_but_valid_field_t";
+ String invalidField = "invalid_field_not_in_schema";
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ System.setProperty("solr.test.sys.prop1", "propone");
+ System.setProperty("solr.test.sys.prop2", "proptwo");
+ testDir = new File(TEMP_DIR, "distrib_update_test");
+ testDir.mkdirs();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ destroyServers();
+ super.tearDown();
+ }
+
+ private void createServers() throws Exception {
+ StringBuilder sb = new StringBuilder();
+ shards = new String[NUM_JETTIES];
+ for (int i = 0; i < NUM_JETTIES; i++) {
+
+ if (sb.length() > 0) sb.append(',');
+ JettySolrRunner jetty = createJetty(testDir, testDir + "/shard" + i
+ + "/data", "solrconfig-distrib-update.xml");
+ jettys.add(jetty);
+ int port = jetty.getLocalPort();
+ clients.add(createNewSolrServer(port));
+ shards[i] = "localhost:" + port + context;
+ sb.append(shards[i]);
+ }
+
+ shardStr = sb.toString();
+
+ // Assure that Solr starts with no documents
+ send(commit(u().deleteByQuery("*:*")));
+ }
+
+ private void destroyServers() throws Exception {
+ for (JettySolrRunner jetty : jettys)
+ jetty.stop();
+ clients.clear();
+ jettys.clear();
+ }
+
+ public JettySolrRunner createJetty(File baseDir, String dataDir)
+ throws Exception {
+ return createJetty(baseDir, dataDir, null, null);
+ }
+
+ public JettySolrRunner createJetty(File baseDir, String dataDir,
+ String shardId) throws Exception {
+ return createJetty(baseDir, dataDir, shardId,
+ "solrconfig-distrib-update.xml");
+ }
+
+ public JettySolrRunner createJetty(File baseDir, String dataDir,
+ String shardList, String solrConfigOverride) throws Exception {
+ System.setProperty("solr.data.dir", dataDir);
+
+ JettySolrRunner jetty = new JettySolrRunner(TEST_HOME(), "/solr", 0, solrConfigOverride);
+ if (shardList != null) {
+ System.setProperty("shard", shardList);
+ }
+ jetty.start();
+ System.clearProperty("shard");
+ return jetty;
+ }
+
+ protected SolrServer createNewSolrServer(int port) {
+ try {
+ // setup the server...
+ String url = "http://localhost:" + port + context;
+ CommonsHttpSolrServer s = new CommonsHttpSolrServer(url);
+ s.setConnectionTimeout(1000); // 1 sec
+ s.setDefaultMaxConnectionsPerHost(100);
+ s.setMaxTotalConnections(100);
+ return s;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ QueryResponse query(Object... q) throws Exception {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+
+ for (int i = 0; i < q.length; i += 2) {
+ params.add(q[i].toString(), q[i + 1].toString());
+ }
+
+ params.set("shards", shardStr);
+
+ // query a random server
+ int which = random.nextInt(clients.size());
+ SolrServer client = clients.get(which);
+ QueryResponse rsp = client.query(params);
+ return rsp;
+ }
+
+ void send(int which, AbstractUpdateRequest ureq) throws Exception {
+ ureq.setParam("update.chain", "distrib-update-chain");
+ ureq.setParam("shards", shardStr);
+ ureq.setParam("self", updateSelf ? shards[which] : "foo");
+
+ SolrServer client = clients.get(which);
+ client.request(ureq);
+ }
+
+ SolrInputDocument doc(Object... fields) {
+ SolrInputDocument doc = new SolrInputDocument();
+ for (int i = 0; i < fields.length; i += 2) {
+ doc.addField((String) (fields[i]), fields[i + 1]);
+ }
+ return doc;
+ }
+
+ // send request to a random server
+ void send(AbstractUpdateRequest ureq) throws Exception {
+ send((random.nextInt() >>> 1) % shards.length, ureq);
+ }
+
+ UpdateRequest u() {
+ return new UpdateRequest();
+ }
+
+ AbstractUpdateRequest commit(AbstractUpdateRequest ureq) {
+ ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+ return ureq;
+ }
+
+ UpdateRequest optimize(UpdateRequest ureq) {
+ ureq.setAction(UpdateRequest.ACTION.OPTIMIZE, true, true);
+ return ureq;
+ }
+
+ UpdateRequest add(UpdateRequest ureq, Object... ids) {
+ for (Object id : ids) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", id.toString());
+ ureq.add(doc);
+ }
+ return ureq;
+ }
+
+ void verifyCount(String q, int count) throws Exception {
+ verifyCount(q, count, 0);
+ }
+
+ void verifyCount(String q, int count, int retries) throws Exception {
+ long found = query("q", q).getResults().getNumFound();
+ for (int i = 0; i < retries; i++) {
+ if (found == count) {
+ break;
+ }
+ Thread.sleep(500);
+ found = query("q", q).getResults().getNumFound();
+ }
+
+ assertEquals(count, found);
+ // use a facet to get the "real" count since distributed search
+ // can do some dedup for us.
+ assertEquals(count, query("q", "*:*", "facet", "true", "facet.query", q)
+ .getFacetQuery().get(q).longValue());
+ }
+
+ public void testStress() throws Exception {
+ int iter = 10; // crank this number up for a long term test
+
+ createServers();
+ updateSelf = true;
+
+ List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(1000);
+ for (int i = 0; i < 1000; i++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "" + i);
+ docs.add(doc);
+ }
+
+ List<String> ids = new ArrayList<String>(1000);
+ for (int i = 0; i < 1000; i++)
+ ids.add("" + i);
+
+ boolean haveAddedDocs = false;
+ for (int i = 0; i < iter; i++) {
+ // System.out.println("ITERATION"+i);
+ if ((random.nextInt() & 0x01) == 0) {
+ // System.out.println("ITERATION"+i+" - 0");
+ haveAddedDocs = true;
+ UpdateRequest addReq = new UpdateRequest();
+ addReq.add(docs);
+ send(commit(addReq));
+ verifyCount("id:[* TO *]", 1000);
+ } else {
+ // System.out.println("ITERATION"+i+" - 1");
+ UpdateRequest delReq = new UpdateRequest();
+ for (int j = 0; j < 1000; j += 2) {
+ delReq.deleteById(ids.get(j));
+ }
+ send(commit(delReq));
+ verifyCount("id:[* TO *]", (haveAddedDocs ? 500 : 0));
+ }
+
+ // optimize to keep the index size under control.
+ if (i % 25 == 0) {
+ send(optimize(u()));
+ }
+
+ }
+
+ destroyServers();
+ }
+
+ public void testDistribUpdate() throws Exception {
+ for (int nServers = 2; nServers < 4; nServers++) {
+
+ createServers();
+
+ // node doesn't know who it is... sends to itself over HTTP
+ updateSelf = false;
+ doTest();
+
+ // node does know who it is... updates index directly for itself
+ updateSelf = true;
+ doTest();
+
+ destroyServers();
+ }
+ }
+
+ public void doTest() throws Exception {
+ send(0, commit(u().deleteByQuery("*:*")));
+ verifyCount("id:1", 0);
+
+ send(0, add(u(), 1));
+ send(1, add(u(), 1));
+ verifyCount("id:1", 0); // no commit yet
+ send(commit(u()));
+ verifyCount("id:1", 1); // doc should only have been sent to single server
+
+ send(u().deleteById("1"));
+ verifyCount("id:1", 1); // no commit yet
+ send(commit(u()));
+ verifyCount("id:1", 0);
+
+ // test adding a commit onto an add
+ send(commit(add(u(), 1)));
+ verifyCount("id:1", 1);
+
+ // test adding a commmit onto a delete
+ send(commit(u().deleteById("1")));
+ verifyCount("id:1", 0);
+
+ // test that batching adds doesn't mess anything up
+ send(add(u(), 1, 2, 3, 4, 5, 6, 7, 8, 9));
+ send(commit(u()));
+ // Thread.sleep(1000000000);
+ verifyCount("id:[1 TO 9]", 9);
+
+ // test delete by query
+ send(commit(u().deleteByQuery("id:[2 TO 8]")));
+ verifyCount("id:[1 TO 9]", 2);
+
+ send(commit(add(u(), 1, 2, 3, 4, 5, 6, 7, 8, 9)));
+ verifyCount("id:[1 TO 9]", 9);
+
+ send(commit(u().deleteByQuery("*:*")));
+ verifyCount("id:[1 TO 9]", 0);
+
+ // this test can cause failures if a commit can sneak in ahead of
+ // add requests that are still pending.
+ Object[] docs = new Object[1000];
+ for (int i = 0; i < 1000; i++)
+ docs[i] = i;
+ send(commit(add(u(), docs)));
+ verifyCount("id:[* TO *]", 1000);
+
+ // test delete batching
+ UpdateRequest ureq = u();
+ for (int i = 0; i < 1000; i += 2) {
+ ureq.deleteById("" + i);
+ }
+ send(commit(ureq));
+ verifyCount("id:[* TO *]", 500);
+
+ // test commit within
+ ureq = u();
+ ureq.setCommitWithin(1);
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "999999");
+ ureq.add(doc);
+ send(ureq);
+
+
+ verifyCount("id:[* TO *]", 501, 300);
+
+ send(commit(ureq));
+
+ // test overwrite
+ UpdateRequestExt lweureq = new UpdateRequestExt();
+ doc = new SolrInputDocument();
+ doc.addField("id", "999999");
+ lweureq.add(doc, 3, false);
+ send(commit(lweureq));
+
+ verifyCount("id:[* TO *]", 502);
+
+ // test overwrite with no commitWithin
+ lweureq = new UpdateRequestExt();
+ doc = new SolrInputDocument();
+ doc.addField("id", "999999");
+ lweureq.add(doc, -1, false);
+ send(commit(lweureq));
+
+ verifyCount("id:[* TO *]", 503);
+ }
+
+}
Added: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java (added)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java Sun Oct 9 23:45:59 2011
@@ -0,0 +1,234 @@
+package org.apache.solr.client.solrj.request;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.XML;
+
+// TODO: bake this into UpdateRequest
+public class UpdateRequestExt extends AbstractUpdateRequest {
+
+ private List<SolrDoc> documents = null;
+ private List<String> deleteById = null;
+ private List<String> deleteQuery = null;
+
+ private class SolrDoc {
+ @Override
+ public String toString() {
+ return "SolrDoc [document=" + document + ", commitWithin=" + commitWithin
+ + ", overwrite=" + overwrite + "]";
+ }
+ SolrInputDocument document;
+ int commitWithin;
+ boolean overwrite;
+ }
+
+ public UpdateRequestExt() {
+ super(METHOD.POST, "/update");
+ }
+
+ public UpdateRequestExt(String url) {
+ super(METHOD.POST, url);
+ }
+
+ // ---------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------
+
+ /**
+ * clear the pending documents and delete commands
+ */
+ public void clear() {
+ if (documents != null) {
+ documents.clear();
+ }
+ if (deleteById != null) {
+ deleteById.clear();
+ }
+ if (deleteQuery != null) {
+ deleteQuery.clear();
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------
+
+ public UpdateRequestExt add(final SolrInputDocument doc) {
+ if (documents == null) {
+ documents = new ArrayList<SolrDoc>(2);
+ }
+ SolrDoc solrDoc = new SolrDoc();
+ solrDoc.document = doc;
+ solrDoc.commitWithin = -1;
+ solrDoc.overwrite = true;
+ documents.add(solrDoc);
+
+ return this;
+ }
+
+ public UpdateRequestExt add(final SolrInputDocument doc, int commitWithin,
+ boolean overwrite) {
+ if (documents == null) {
+ documents = new ArrayList<SolrDoc>(2);
+ }
+ SolrDoc solrDoc = new SolrDoc();
+ solrDoc.document = doc;
+ solrDoc.commitWithin = commitWithin;
+ solrDoc.overwrite = overwrite;
+ documents.add(solrDoc);
+
+ return this;
+ }
+
+ public UpdateRequestExt deleteById(String id) {
+ if (deleteById == null) {
+ deleteById = new ArrayList<String>();
+ }
+ deleteById.add(id);
+ return this;
+ }
+
+ public UpdateRequestExt deleteById(List<String> ids) {
+ if (deleteById == null) {
+ deleteById = new ArrayList<String>(ids);
+ } else {
+ deleteById.addAll(ids);
+ }
+ return this;
+ }
+
+ public UpdateRequestExt deleteByQuery(String q) {
+ if (deleteQuery == null) {
+ deleteQuery = new ArrayList<String>();
+ }
+ deleteQuery.add(q);
+ return this;
+ }
+
+ // --------------------------------------------------------------------------
+ // --------------------------------------------------------------------------
+
+ @Override
+ public Collection<ContentStream> getContentStreams() throws IOException {
+ return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML);
+ }
+
+ public String getXML() throws IOException {
+ StringWriter writer = new StringWriter();
+ writeXML(writer);
+ writer.flush();
+
+ String xml = writer.toString();
+
+ return (xml.length() > 0) ? xml : null;
+ }
+
+ public void writeXML(Writer writer) throws IOException {
+ List<List<SolrDoc>> getDocLists = getDocLists(documents);
+
+ for (List<SolrDoc> docs : getDocLists) {
+
+ if ((docs != null && docs.size() > 0)) {
+ SolrDoc firstDoc = docs.get(0);
+ int commitWithin = firstDoc.commitWithin != -1 ? firstDoc.commitWithin : this.commitWithin;
+ boolean overwrite = firstDoc.overwrite;
+ if (commitWithin > -1 || overwrite != true) {
+ writer.write("<add commitWithin=\"" + commitWithin + "\" " + "overwrite=\"" + overwrite + "\">");
+ } else {
+ writer.write("<add>");
+ }
+ if (documents != null) {
+ for (SolrDoc doc : documents) {
+ if (doc != null) {
+ ClientUtils.writeXML(doc.document, writer);
+ }
+ }
+ }
+
+ writer.write("</add>");
+ }
+ }
+
+ // Add the delete commands
+ boolean deleteI = deleteById != null && deleteById.size() > 0;
+ boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0;
+ if (deleteI || deleteQ) {
+ writer.append("<delete>");
+ if (deleteI) {
+ for (String id : deleteById) {
+ writer.append("<id>");
+ XML.escapeCharData(id, writer);
+ writer.append("</id>");
+ }
+ }
+ if (deleteQ) {
+ for (String q : deleteQuery) {
+ writer.append("<query>");
+ XML.escapeCharData(q, writer);
+ writer.append("</query>");
+ }
+ }
+ writer.append("</delete>");
+ }
+ }
+
+ private List<List<SolrDoc>> getDocLists(List<SolrDoc> documents) {
+ List<List<SolrDoc>> docLists = new ArrayList<List<SolrDoc>>();
+ if (this.documents == null) {
+ return docLists;
+ }
+ boolean lastOverwrite = true;
+ int lastCommitWithin = -1;
+ List<SolrDoc> docList = null;
+ for (SolrDoc doc : this.documents) {
+ if (doc.overwrite != lastOverwrite
+ || doc.commitWithin != lastCommitWithin || docLists.size() == 0) {
+ docList = new ArrayList<SolrDoc>();
+ docLists.add(docList);
+ }
+ docList.add(doc);
+ lastCommitWithin = doc.commitWithin;
+ lastOverwrite = doc.overwrite;
+ }
+
+ return docLists;
+ }
+
+ public List<String> getDeleteById() {
+ return deleteById;
+ }
+
+ public List<String> getDeleteQuery() {
+ return deleteQuery;
+ }
+
+ @Override
+ public String toString() {
+ return "UpdateRequestExt [documents=" + documents + ", deleteById="
+ + deleteById + ", deleteQuery=" + deleteQuery + "]";
+ }
+
+}
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java Sun Oct 9 23:45:59 2011
@@ -258,8 +258,7 @@ public class TestLBHttpSolrServer extend
}
public void startJetty() throws Exception {
- jetty = new JettySolrRunner("/solr", port);
- System.setProperty("solr.solr.home", getHomeDir());
+ jetty = new JettySolrRunner(getHomeDir(), "/solr", port);
System.setProperty("solr.data.dir", getDataDir());
jetty.start();
int newPort = jetty.getLocalPort();
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java Sun Oct 9 23:45:59 2011
@@ -53,7 +53,7 @@ public class MultiCoreExampleJettyTest e
System.clearProperty("solr.directoryFactory");
super.setUp();
- jetty = new JettySolrRunner( context, 0 );
+ jetty = new JettySolrRunner(getSolrHome(), context, 0 );
jetty.start(false);
port = jetty.getLocalPort();