You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/11/19 15:41:33 UTC
svn commit: r1203981 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/
core/src/test/org/apache/solr/cloud/ test-framework/src/java/org/apache/solr/
Author: markrmiller
Date: Sat Nov 19 14:41:33 2011
New Revision: 1203981
URL: http://svn.apache.org/viewvc?rev=1203981&view=rev
Log:
clean up some things around distrib update proc
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1203981&r1=1203980&r2=1203981&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Sat Nov 19 14:41:33 2011
@@ -1,5 +1,22 @@
package org.apache.solr.update;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -21,9 +38,9 @@ import org.apache.solr.client.solrj.impl
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequestExt;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
-import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
@@ -31,7 +48,6 @@ import org.apache.solr.request.SolrQuery
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.update.processor.UpdateRequestProcessor;
public class SolrCmdDistributor {
// TODO: shut this thing down
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1203981&r1=1203980&r2=1203981&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sat Nov 19 14:41:33 2011
@@ -103,13 +103,8 @@ public class DistributedUpdateProcessor
System.out.println("hash:" + hash);
CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
-
String shardId = getShard(hash); // get the right shard based on the hash...
-
-
- // TODO: first thing we actually have to do here is get a hash so we can send to the right shard...
- // to do that, most of this likely has to move
-
+
// if we are in zk mode...
if (coreDesc.getCoreContainer().getZkController() != null) {
// the leader is...
@@ -143,9 +138,8 @@ public class DistributedUpdateProcessor
.getCoreContainer().getZkController().getNodeName();
String shardZkNodeName = nodeName + "_" + req.getCore().getName();
- System.out.println("params:" + params);
if (params.getBool(SEEN_LEADER, false)) {
- // we got a version, just go local - add no shards param
+ // we got a version, just go local - set no shardStr
// still mark if i am the leader though
if (shardZkNodeName.equals(leader)) {
@@ -164,7 +158,6 @@ public class DistributedUpdateProcessor
System.out.println("mark leader seen");
} else {
// I need to forward onto the leader...
- // first I must hash...
shardStr = leaderUrl;
forwardToLeader = true;
}
@@ -201,7 +194,12 @@ public class DistributedUpdateProcessor
int hash = hash(cmd);
setupRequest(hash);
- versionAdd(cmd, hash);
+
+ if (!forwardToLeader) {
+ versionAdd(cmd, hash);
+ }
+
+
if (shardStr != null) {
cmdDistrib.distribAdd(cmd, shardStr);
} else {
@@ -235,11 +233,7 @@ public class DistributedUpdateProcessor
System.out.println("leader? " + isLeader);
- if (forwardToLeader) {
- // TODO: forward update to the leader
- System.out.println("forward to leader");
- return;
- }
+
// at this point, there is an update we need to try and apply.
// we may or may not be the leader.
@@ -303,14 +297,6 @@ public class DistributedUpdateProcessor
}
- // TODO: this is brittle
- private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
- DeleteUpdateCommand c = new DeleteUpdateCommand(req);
- c.id = cmd.id;
- c.query = cmd.query;
- return c;
- }
-
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
int hash = 0;
@@ -322,8 +308,10 @@ public class DistributedUpdateProcessor
setupRequest(hash);
- versionDelete(cmd, hash);
-
+ if (!forwardToLeader) {
+ versionDelete(cmd, hash);
+ }
+
if (shardStr != null) {
cmdDistrib.distribDelete(cmd, shardStr);
} else {
@@ -357,11 +345,6 @@ public class DistributedUpdateProcessor
return;
}
-
- if (forwardToLeader) {
- return;
- }
-
// at this point, there is an update we need to try and apply.
// we may or may not be the leader.
@@ -369,7 +352,6 @@ public class DistributedUpdateProcessor
String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
Long versionOnUpdate = versionOnUpdateS == null ? null : Long.parseLong(versionOnUpdateS);
-
VersionBucket bucket = vinfo.bucket(hash);
synchronized (bucket) {
if (versionsStored) {
@@ -442,17 +424,17 @@ public class DistributedUpdateProcessor
replicasUrl.append(replicaUrl);
}
- // we don't currently use self - it does not yet work with the | notation anyhow
- //params.add("self", self);
return replicasUrl.toString();
}
- // TODO: move this to AddUpdateCommand/DeleteUpdateCommand and cache it? And make the hash pluggable of course.
+ // TODO: move this to AddUpdateCommand/DeleteUpdateCommand and cache it? And
+ // make the hash pluggable of course.
// The hash also needs to be pluggable
private int hash(AddUpdateCommand cmd) {
BytesRef br = cmd.getIndexedId();
return Hash.murmurhash3_x86_32(br.bytes, br.offset, br.length, 0);
}
+
private int hash(DeleteUpdateCommand cmd) {
BytesRef br = cmd.getIndexedId();
return Hash.murmurhash3_x86_32(br.bytes, br.offset, br.length, 0);
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1203981&r1=1203980&r2=1203981&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Sat Nov 19 14:41:33 2011
@@ -240,12 +240,14 @@ public class FullDistributedZkTest exten
for (int i = 0; i < fields.length; i += 2) {
doc.addField((String) (fields[i]), fields[i + 1]);
}
- controlClient.add(doc);
UpdateRequest ureq = new UpdateRequest();
ureq.add(doc);
ureq.setParam("update.chain", "distrib-update-chain");
ureq.process(client);
+
+ // add to control second in case adding to shards fails
+ controlClient.add(doc);
}
protected void del(String q) throws Exception {
@@ -393,7 +395,7 @@ public class FullDistributedZkTest exten
query("q","*:*", "rows",0, "facet","true", "facet.field","{!key='a b/c \\' \\} foo'}"+t1,"facet.limit",5, "facet.shard.limit",5);
handle.remove("facet_fields");
- //query("q", "*:*");
+ query("q", "*:*", "sort", "n_tl1 desc");
// index the same document to two shards and make sure things
// don't blow up.
@@ -414,7 +416,7 @@ public class FullDistributedZkTest exten
// our hash is not stable yet in distrib update proc
assertDocCounts();
- //query("q", "*:*");
+ query("q", "*:*", "sort", "n_tl1 desc");
// kill a shard
JettySolrRunner deadShard = killShard("shard2", 0);
@@ -430,12 +432,12 @@ public class FullDistributedZkTest exten
}
// try to index to a living shard at shard2
- index_specific(shardToClient.get("shard2").get(1), id, 1000, i1, 107, t1,
+ index_specific(shardToClient.get("shard2").get(1), id, 1000, i1, 108, t1,
"specific doc!");
commit();
- //query("q", "*:*");
+ query("q", "*:*", "sort", "n_tl1 desc");
// TMP: try adding a doc with CloudSolrServer
CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
@@ -456,7 +458,7 @@ public class FullDistributedZkTest exten
commit();
- //query("q", "*:*");
+ query("q", "*:*", "sort", "n_tl1 desc");
long numFound2 = server.query(query).getResults().getNumFound();
@@ -511,10 +513,9 @@ public class FullDistributedZkTest exten
//query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.QUERY);
// we can't do this here - we have killed a shard
- // assertDocCounts();
+ //assertDocCounts();
- // TODO: why is this failing with no servers hosting shard?
- //query("q", "*:*");
+ query("q", "*:*", "sort", "n_tl1 desc");
// Thread.sleep(10000000000L);
if (DEBUG) {
@@ -601,4 +602,16 @@ public class FullDistributedZkTest exten
System.clearProperty("zkHost");
System.clearProperty("remove.version.field");
}
+
+ protected void commit() throws Exception {
+ controlClient.commit();
+ for (SolrServer client : clients) {
+ try {
+ client.commit();
+ } catch (SolrServerException e) {
+ // we might have killed a server on purpose in the test
+ log.warn("", e);
+ }
+ }
+ }
}
Modified: lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java?rev=1203981&r1=1203980&r2=1203981&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java (original)
+++ lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java Sat Nov 19 14:41:33 2011
@@ -311,12 +311,7 @@ public abstract class BaseDistributedSea
protected void commit() throws Exception {
controlClient.commit();
for (SolrServer client : clients) {
- try {
- client.commit();
- } catch (SolrServerException e) {
- // we might have killed a server on purpose in the test
- log.warn("", e);
- }
+ client.commit();
}
}