You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/11/14 14:50:57 UTC
svn commit: r1201704 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/update/processor/
core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/client/solrj/impl/ test-framework/sr...
Author: markrmiller
Date: Mon Nov 14 13:50:57 2011
New Revision: 1201704
URL: http://svn.apache.org/viewvc?rev=1201704&view=rev
Log:
push on FullDistributedZkTest and tweaks/fixes around that
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1201704&r1=1201703&r2=1201704&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Nov 14 13:50:57 2011
@@ -138,8 +138,6 @@ public final class ZkController {
public void command() {
try {
// we need to create all of our lost watches
-// zkStateReader.makeCollectionsNodeWatches();
-// zkStateReader.makeShardsWatches(true);
createEphemeralLiveNode();
zkStateReader.createClusterStateWatchersAndUpdate();
@@ -486,7 +484,12 @@ public final class ZkController {
log.info("Attempting to update /clusterstate version "
+ stat.getVersion());
CloudState state = CloudState.load(data);
-
+ Map<String,Slice> slices = state.getSlices(cloudDesc.getCollectionName());
+ if (slices != null && slices.containsKey(shardZkNodeName)) {
+ // TODO: we were already registered - go into recovery mode
+ System.out.println("RECOVERY");
+ }
+
state.addSlice(cloudDesc.getCollectionName(), slice);
try {
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1201704&r1=1201703&r2=1201704&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Nov 14 13:50:57 2011
@@ -681,7 +681,6 @@ public class DistributedUpdateProcessor
url = shard;
}
- // TODO: allow shard syntax to use : to specify replicas
SolrServer server = new CommonsHttpSolrServer(url, client);
sreq.ursp = server.request(sreq.ureq);
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java?rev=1201704&r1=1201703&r2=1201704&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java Mon Nov 14 13:50:57 2011
@@ -176,7 +176,8 @@ public class DistributedUpdateProcessorF
replicasUrl.append(replicaUrl);
}
- params.add("self", self);
+ // we don't currently use self - it does not yet work with the | notation anyhow
+ //params.add("self", self);
params.add("shards", replicasUrl.toString());
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1201704&r1=1201703&r2=1201704&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Mon Nov 14 13:50:57 2011
@@ -19,7 +19,13 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@@ -28,7 +34,9 @@ import org.apache.solr.client.solrj.impl
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.BeforeClass;
@@ -58,6 +66,11 @@ public class FullDistributedZkTest exten
String invalidField="ignore_exception__invalid_field_not_in_schema";
private static final int sliceCount = 3;
+
+ protected Map<SolrServer,ZkNodeProps> clientToInfo = new HashMap<SolrServer,ZkNodeProps>();
+ protected Map<String,List<SolrServer>> shardToClient = new HashMap<String,List<SolrServer>>();
+ protected Map<String,List<JettySolrRunner>> shardToJetty = new HashMap<String,List<JettySolrRunner>>();
+
@BeforeClass
public static void beforeClass() throws Exception {
System.setProperty("CLOUD_UPDATE_DELAY", "0");
@@ -69,6 +82,10 @@ public class FullDistributedZkTest exten
public FullDistributedZkTest() {
fixShardCount = true;
shardCount = 6;
+
+ // TODO: for now, turn off stress because it uses regular clients, and we
+ // need the cloud client because we kill servers
+ stress = 0;
}
@Override
@@ -83,8 +100,73 @@ public class FullDistributedZkTest exten
if (sb.length() > 0) sb.append(',');
JettySolrRunner j = createJetty(testDir, testDir + "/jetty" + i, null, "solrconfig-distrib-update.xml");
jettys.add(j);
- clients.add(createNewSolrServer(j.getLocalPort()));
-
+ SolrServer client = createNewSolrServer(j.getLocalPort());
+ clients.add(client);
+ }
+
+ for (SolrServer client : clients) {
+ // find info for this client in zk
+ ZkStateReader zk = new ZkStateReader(zkServer.getZkAddress(), 10000,
+ AbstractZkTestCase.TIMEOUT);
+ zk.createClusterStateWatchersAndUpdate();
+
+ Map<String,Slice> slices = zk.getCloudState().getSlices(
+ DEFAULT_COLLECTION);
+ zk.updateCloudState(true);
+
+ for (Map.Entry<String,Slice> slice : slices.entrySet()) {
+ Map<String,ZkNodeProps> theShards = slice.getValue().getShards();
+ for (Map.Entry<String,ZkNodeProps> shard : theShards.entrySet()) {
+ String shardName = new URI(
+ ((CommonsHttpSolrServer) client).getBaseURL()).getPort()
+ + "_solr_";
+ // System.out.println("key:" + shard.getKey() + " try:" + shardName);
+ if (shard.getKey().endsWith(shardName)) {
+ System.out.println("shard:" + slice.getKey());
+ System.out.println(shard.getValue());
+
+ clientToInfo.put(client, shard.getValue());
+ List<SolrServer> list = shardToClient.get(slice.getKey());
+ if (list == null) {
+ list = new ArrayList<SolrServer>();
+ shardToClient.put(slice.getKey(), list);
+ }
+ list.add(client);
+ }
+ }
+ }
+
+ }
+
+ for (JettySolrRunner jetty : jettys) {
+ // find the shard this jetty belongs to in zk
+ ZkStateReader zk = new ZkStateReader(zkServer.getZkAddress(), 10000,
+ AbstractZkTestCase.TIMEOUT);
+ zk.createClusterStateWatchersAndUpdate();
+
+ Map<String,Slice> slices = zk.getCloudState().getSlices(
+ DEFAULT_COLLECTION);
+ zk.updateCloudState(true);
+
+ for (Map.Entry<String,Slice> slice : slices.entrySet()) {
+ Map<String,ZkNodeProps> theShards = slice.getValue().getShards();
+ for (Map.Entry<String,ZkNodeProps> shard : theShards.entrySet()) {
+ String shardName = jetty.getLocalPort() + "_solr_";
+ // System.out.println("key:" + shard.getKey() + " try:" + shardName);
+ if (shard.getKey().endsWith(shardName)) {
+// System.out.println("shard:" + slice.getKey());
+// System.out.println(shard.getValue());
+
+ List<JettySolrRunner> list = shardToJetty.get(slice.getKey());
+ if (list == null) {
+ list = new ArrayList<JettySolrRunner>();
+ shardToJetty.put(slice.getKey(), list);
+ }
+ list.add(jetty);
+ }
+ }
+ }
+
}
// build the shard string
@@ -124,6 +206,7 @@ public class FullDistributedZkTest exten
boolean pick = random.nextBoolean();
int which = (doc.getField(id).toString().hashCode() & 0x7fffffff) % sliceCount;
+ System.out.println("add doc to shard:" + which);
if (pick) {
which = which + ((shardCount / sliceCount) * random.nextInt(sliceCount-1));
@@ -138,6 +221,22 @@ public class FullDistributedZkTest exten
ureq.process(client);
}
+ protected void index_specific(int serverNumber, Object... fields) throws Exception {
+ SolrInputDocument doc = new SolrInputDocument();
+ for (int i = 0; i < fields.length; i += 2) {
+ doc.addField((String) (fields[i]), fields[i + 1]);
+ }
+ controlClient.add(doc);
+
+ CommonsHttpSolrServer client = (CommonsHttpSolrServer) clients.get(serverNumber);
+
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.add(doc);
+ ureq.setParam("update.chain", "distrib-update-chain");
+ System.out.println("set update.chain on req");
+ ureq.process(client);
+ }
+
protected void del(String q) throws Exception {
controlClient.deleteByQuery(q);
for (SolrServer client : clients) {
@@ -156,8 +255,14 @@ public class FullDistributedZkTest exten
public void doTest() throws Exception {
del("*:*");
+
indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men"
,"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);
+
+ commit();
+
+ assertDocCounts();
+
indexr(id,2, i1, 50 , tlong, 50,t1,"to come to the aid of their country."
);
indexr(id,3, i1, 2, tlong, 2,t1,"how now brown cow"
@@ -189,7 +294,9 @@ public class FullDistributedZkTest exten
}
commit();
-
+
+ assertDocCounts();
+
handle.clear();
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
@@ -276,11 +383,12 @@ public class FullDistributedZkTest exten
handle.remove("facet_fields");
- // index the same document to two servers and make sure things
+ // index the same document to two shards and make sure things
// don't blow up.
+ // assumes first n clients are first n shards
if (clients.size()>=2) {
index(id,100, i1, 107 ,t1,"oh no, a duplicate!");
- for (int i=0; i<clients.size(); i++) {
+ for (int i=0; i<shardCount; i++) {
index_specific(i, id,100, i1, 107 ,t1,"oh no, a duplicate!");
}
commit();
@@ -288,7 +396,47 @@ public class FullDistributedZkTest exten
query("q","fox duplicate horses", "hl","true", "hl.fl", t1);
query("q","*:*", "rows",100);
}
+
+ // TODO: this is failing because the counts per shard don't add up to the control - distrib total
+ // counts do match, so the same doc (same id) must be on different shards.
+ // our hash is not stable yet in distrib update proc
+ //assertDocCounts();
+
+ // kill one node in shard2 and one node in shard3
+ JettySolrRunner deadShard = killShard("shard2", 0);
+ JettySolrRunner deadShard2 = killShard("shard3", 1);
+
+ // TODO: test indexing after killing shards - smart solrj client should not
+ // care at all
+
+ // try to index to a living shard at shard2
+ index_specific(3, id,1000, i1, 107 ,t1,"specific doc!");
+
+ commit();
+
+ // TMP: try adding a doc with CloudSolrServer
+ CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
+ server.setDefaultCollection(DEFAULT_COLLECTION);
+ long numFound1 = server.query(new SolrQuery("*:*")).getResults().getNumFound();
+
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", 1001);
+
+ controlClient.add(doc);
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.add(doc);
+ ureq.setParam("update.chain", "distrib-update-chain");
+ ureq.process(server);
+
+ commit();
+
+ long numFound2 = server.query(new SolrQuery("*:*")).getResults().getNumFound();
+
+ // let's just check that the one doc added since the last commit made it in...
+ //TODO this sometimes fails - need to dig up what missed/messed up part causes it
+ assertEquals(numFound1 + 1, numFound2);
+
// test debugging
handle.put("explain", UNORDERED);
handle.put("debug", UNORDERED);
@@ -299,24 +447,82 @@ public class FullDistributedZkTest exten
query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.RESULTS);
query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.QUERY);
+
+ System.out.println(controlClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+ for (SolrServer client : clients) {
+ try {
+ System.out.println(client.query(new SolrQuery("*:*")).getResults().getNumFound());
+ } catch(Exception e) {
+
+ }
+ }
// TODO: This test currently fails because debug info is obtained only
// on shards with matches.
// query("q","matchesnothing","fl","*,score", "debugQuery", "true");
+ // this should trigger a recovery phase on deadShard
+ deadShard.start(true);
+
+
+
// Thread.sleep(10000000000L);
if (DEBUG) {
super.printLayout();
}
}
+ private JettySolrRunner killShard(String shard, int index) throws Exception {
+ // kill
+ System.out.println(" KILL:" + shardToClient);
+ System.out.println(shardToJetty.get(shard));
+
+ // stop the jetty at the given index within the requested shard
+ JettySolrRunner jetty = shardToJetty.get(shard).get(index);
+ jetty.stop();
+ return jetty;
+ }
+
+ private void assertDocCounts() throws Exception {
+ // TODO: as we create the clients, we should build a map from shard to node/client
+ // and node/client to shard?
+ System.out.println("after first doc:");
+ long controlCount = controlClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+ System.out.println("control:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+ // do some really inefficient mapping...
+ ZkStateReader zk = new ZkStateReader(zkServer.getZkAddress(), 10000, AbstractZkTestCase.TIMEOUT);
+ zk.createClusterStateWatchersAndUpdate();
+ // Map<SolrServer,ZkNodeProps> clientToInfo = new HashMap<SolrServer,ZkNodeProps>();
+ Map<String,Slice> slices = zk.getCloudState().getSlices(DEFAULT_COLLECTION);
+
+ zk.updateCloudState(true);
+
+ long clientCount = 0;
+ for (SolrServer client : clients) {
+ for (Map.Entry<String,Slice> slice : slices.entrySet()) {
+ Map<String,ZkNodeProps> theShards = slice.getValue().getShards();
+ for (Map.Entry<String,ZkNodeProps> shard : theShards.entrySet()) {
+ String shardName = new URI(((CommonsHttpSolrServer)client).getBaseURL()).getPort() + "_solr_";
+ // System.out.println("key:" + shard.getKey() + " try:" + shardName);
+ if (shard.getKey().endsWith(shardName)) {
+ System.out.println("shard:" + slice.getKey());
+ System.out.println(shard.getValue());
+ }
+ }
+ }
+
+ long count = client.query(new SolrQuery("*:*")).getResults().getNumFound();
+
+ System.out.println("docs:" + count + "\n\n");
+ clientCount += count;
+ }
+ assertEquals("Doc Counts do not add up", controlCount, clientCount / (shardCount / sliceCount));
+ }
+
volatile CloudSolrServer solrj;
@Override
protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException {
- if (r.nextBoolean())
- return super.queryServer(params);
-
// use the distributed solrj client
if (solrj == null) {
synchronized(this) {
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1201704&r1=1201703&r2=1201704&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Mon Nov 14 13:50:57 2011
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeoutExcep
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
@@ -118,6 +119,11 @@ public class CloudSolrServer extends Sol
CloudState cloudState = zkStateReader.getCloudState();
String collection = request.getParams().get("collection", defaultCollection);
+
+ if (request instanceof UpdateRequest) {
+ // hack to kind of let updates work - should be fixed more completely
+ return updateRequest(cloudState, collection, request);
+ }
// TODO: allow multiple collections to be specified via comma separated list
@@ -151,6 +157,39 @@ public class CloudSolrServer extends Sol
return rsp.getResponse();
}
+ private NamedList<Object> updateRequest(CloudState cloudState,
+ String collection, SolrRequest request) throws SolrServerException, IOException {
+ // TODO: prefer updating to the leader
+
+ Map<String,Slice> slices = cloudState.getSlices(collection);
+ Set<String> liveNodes = cloudState.getLiveNodes();
+
+ // IDEA: have versions on various things... like a global cloudState version
+ // or shardAddressVersion (which only changes when the shards change)
+ // to allow caching.
+
+ // build a map of unique nodes
+ // TODO: allow filtering by group, role, etc
+ Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
+ List<String> urlList = new ArrayList<String>();
+ for (Slice slice : slices.values()) {
+ for (ZkNodeProps nodeProps : slice.getShards().values()) {
+ String node = nodeProps.get(ZkStateReader.NODE_NAME);
+ if (!liveNodes.contains(node)) continue;
+ if (nodes.put(node, nodeProps) == null) {
+ String url = nodeProps.get(ZkStateReader.URL_PROP);
+ urlList.add(url);
+ }
+ }
+ }
+
+
+ // let's update to a server that is up...
+ CommonsHttpSolrServer server = new CommonsHttpSolrServer(urlList.get(0));
+ NamedList<Object> rsp = server.request(request);
+ return rsp;
+ }
+
public void close() {
if (zkStateReader != null) {
synchronized(this) {
Modified: lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java?rev=1201704&r1=1201703&r2=1201704&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java (original)
+++ lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java Mon Nov 14 13:50:57 2011
@@ -40,6 +40,7 @@ import org.apache.solr.client.solrj.resp
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.schema.TrieDateField;
@@ -69,6 +70,7 @@ public abstract class BaseDistributedSea
protected JettySolrRunner controlJetty;
protected List<SolrServer> clients = new ArrayList<SolrServer>();
protected List<JettySolrRunner> jettys = new ArrayList<JettySolrRunner>();
+
protected String context = "/solr";
protected String shards;
protected String[] shardsArr;
@@ -310,7 +312,14 @@ public abstract class BaseDistributedSea
protected void commit() throws Exception {
controlClient.commit();
- for (SolrServer client : clients) client.commit();
+ for (SolrServer client : clients) {
+ try {
+ client.commit();
+ } catch (SolrServerException e) {
+ // we might have killed a server on purpose in the test
+ log.warn("", e);
+ }
+ }
}
protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException {
@@ -322,12 +331,13 @@ public abstract class BaseDistributedSea
}
protected void query(Object... q) throws Exception {
+
final ModifiableSolrParams params = new ModifiableSolrParams();
for (int i = 0; i < q.length; i += 2) {
params.add(q[i].toString(), q[i + 1].toString());
}
-
+ System.out.println("Q:" + params);
final QueryResponse controlRsp = controlClient.query(params);
setDistributedParams(params);
@@ -377,6 +387,8 @@ public abstract class BaseDistributedSea
}
public static String compare(NamedList a, NamedList b, int flags, Map<String, Integer> handle) {
+ System.out.println("resp a:" + a);
+ System.out.println("resp b:" + b);
boolean ordered = (flags & UNORDERED) == 0;
int posa = 0, posb = 0;