You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/02/14 22:58:17 UTC
svn commit: r1244231 - in /lucene/dev/trunk/solr:
core/src/java/org/apache/solr/cloud/ core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/cloud/
Author: markrmiller
Date: Tue Feb 14 21:58:16 2012
New Revision: 1244231
URL: http://svn.apache.org/viewvc?rev=1244231&view=rev
Log:
SOLR-3122: allow interrupting ZkCmdExecutor operations, plus some test work
Added:
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java (with props)
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1244231&r1=1244230&r2=1244231&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Tue Feb 14 21:58:16 2012
@@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.impl
import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.SafeStopThread;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -46,7 +47,7 @@ import org.apache.solr.update.UpdateLog.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RecoveryStrategy extends Thread {
+public class RecoveryStrategy extends Thread implements SafeStopThread {
private static final int MAX_RETRIES = 500;
private static final int INTERRUPTED = MAX_RETRIES + 1;
private static final int START_TIMEOUT = 100;
@@ -191,8 +192,14 @@ public class RecoveryStrategy extends Th
}
log.info("Sync Recovery was not successful - trying replication");
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- if (ulog == null) return;
+ if (ulog == null) {
+ SolrException.log(log, "No UpdateLog found - cannot recover");
+ recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
+ core.getCoreDescriptor());
+ return;
+ }
+ log.info("Begin buffering updates");
ulog.bufferUpdates();
replayed = false;
@@ -296,4 +303,8 @@ public class RecoveryStrategy extends Th
return future;
}
+  /**
+   * Returns the current value of this thread's {@code close} flag. ZkCmdExecutor polls this
+   * (through the SafeStopThread interface) between retries so it can give up early.
+   */
+  public boolean isClosed() {
+    return this.close;
+  }
+
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=1244231&r1=1244230&r2=1244231&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Tue Feb 14 21:58:16 2012
@@ -18,15 +18,23 @@ package org.apache.solr.cloud;
*/
import java.io.File;
+import java.util.Map;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.core.SolrConfig;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearchTestCase {
+
+ protected static final String DEFAULT_COLLECTION = "collection1";
private static final boolean DEBUG = false;
protected ZkTestServer zkServer;
@@ -44,6 +52,10 @@ public abstract class AbstractDistribute
zkServer.run();
System.setProperty("zkHost", zkServer.getZkAddress());
+ System.setProperty("enable.update.log", "true");
+ System.setProperty("remove.version.field", "true");
+ System
+ .setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
AbstractZkTestCase.buildZooKeeper(zkServer.getZkHost(), zkServer.getZkAddress(), "solrconfig.xml", "schema.xml");
@@ -70,8 +82,83 @@ public abstract class AbstractDistribute
shards = sb.toString();
}
+
+  /**
+   * Waits for recoveries in {@code collection} to finish, simply returning (rather than
+   * failing the test) if replicas are still recovering when the wait gives up.
+   */
+  protected void waitForRecoveriesToFinish(String collection, ZkStateReader zkStateReader, boolean verbose)
+      throws KeeperException, InterruptedException {
+    final boolean failOnTimeout = false;
+    waitForRecoveriesToFinish(collection, zkStateReader, verbose, failOnTimeout);
+  }
+
+  /**
+   * Polls the cluster state until no replica of {@code collection} that sits on a live node
+   * is in the RECOVERING or SYNC state. The state is refreshed with
+   * {@code updateCloudState(true)} before every check; up to 16 checks are made, 2 seconds
+   * apart.
+   *
+   * @param collection collection whose replicas are inspected
+   * @param zkStateReader reader used to refresh and read the cluster state
+   * @param verbose if true, progress and per-replica states are printed to stdout
+   * @param failOnTimeout if true the test fails when live replicas are still recovering after
+   *          the last check; otherwise the method just returns
+   * @throws IllegalArgumentException if {@code collection} is not present in the cluster state
+   */
+  protected void waitForRecoveriesToFinish(String collection,
+      ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout)
+      throws KeeperException, InterruptedException {
+    final int maxRetries = 15;
+    final long retryPauseMs = 2000;
+    for (int cnt = 0;; cnt++) {
+      if (verbose) System.out.println("-");
+      zkStateReader.updateCloudState(true);
+      CloudState cloudState = zkStateReader.getCloudState();
+      Map<String,Slice> slices = cloudState.getSlices(collection);
+      if (slices == null) {
+        // report an unknown collection clearly (as assertAllActive does) instead of an NPE below
+        throw new IllegalArgumentException("Cannot find collection:" + collection);
+      }
+      boolean sawLiveRecovering = false;
+      for (Slice slice : slices.values()) {
+        for (ZkNodeProps shard : slice.getShards().values()) {
+          String state = shard.get(ZkStateReader.STATE_PROP);
+          boolean live = cloudState.liveNodesContain(shard.get(ZkStateReader.NODE_NAME_PROP));
+          if (verbose) System.out.println("rstate:" + state + " live:" + live);
+          // constant-first equals: a replica with no published state is not counted as recovering
+          if ((ZkStateReader.RECOVERING.equals(state) || ZkStateReader.SYNC.equals(state)) && live) {
+            sawLiveRecovering = true;
+          }
+        }
+      }
+      if (!sawLiveRecovering) {
+        if (verbose) System.out.println("no one is recoverying");
+        return;
+      }
+      if (cnt == maxRetries) {
+        if (failOnTimeout) {
+          fail("There are still nodes recoverying");
+        }
+        if (verbose) System.out.println("gave up waiting for recovery to finish..");
+        return;
+      }
+      Thread.sleep(retryPauseMs);
+    }
+  }
+
+  /**
+   * Refreshes the cluster state with {@code updateCloudState(true)} and asserts that every
+   * replica of every slice of {@code collection} is in the ACTIVE state.
+   *
+   * @param collection collection to check
+   * @param zkStateReader reader used to refresh and read the cluster state
+   * @throws IllegalArgumentException if {@code collection} is not present in the cluster state
+   */
+  protected void assertAllActive(String collection, ZkStateReader zkStateReader)
+      throws KeeperException, InterruptedException {
+    zkStateReader.updateCloudState(true);
+    CloudState cloudState = zkStateReader.getCloudState();
+    Map<String,Slice> slices = cloudState.getSlices(collection);
+    if (slices == null) {
+      throw new IllegalArgumentException("Cannot find collection:" + collection);
+    }
+    for (Map.Entry<String,Slice> slice : slices.entrySet()) {
+      for (Map.Entry<String,ZkNodeProps> shard : slice.getValue().getShards().entrySet()) {
+        String state = shard.getValue().get(ZkStateReader.STATE_PROP);
+        // constant-first equals: a replica with no published state is a failure, not an NPE
+        if (!ZkStateReader.ACTIVE.equals(state)) {
+          fail("Not all shards are ACTIVE - slice:" + slice.getKey() + " shard:"
+              + shard.getKey() + " state:" + state);
+        }
+      }
+    }
+  }
+
@Override
+ @After
public void tearDown() throws Exception {
if (DEBUG) {
printLayout();
@@ -79,6 +166,9 @@ public abstract class AbstractDistribute
zkServer.shutdown();
System.clearProperty("zkHost");
System.clearProperty("collection");
+ System.clearProperty("enable.update.log");
+ System.clearProperty("remove.version.field");
+ System.clearProperty("solr.directoryFactory");
System.clearProperty("solr.test.sys.prop1");
System.clearProperty("solr.test.sys.prop2");
resetExceptionIgnores();
@@ -93,7 +183,5 @@ public abstract class AbstractDistribute
@AfterClass
public static void afterClass() throws InterruptedException {
- // wait just a bit for any zk client threads to outlast timeout
- Thread.sleep(2000);
+  // NOTE(review): this method is empty now that this change removed the 2 second sleep;
+  // the throws clause is no longer needed and the method could be dropped entirely
}
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1244231&r1=1244230&r2=1244231&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Tue Feb 14 21:58:16 2012
@@ -62,7 +62,8 @@ public class BasicDistributedZkTest exte
private Map<String,List<SolrServer>> otherCollectionClients = new HashMap<String,List<SolrServer>>();
private Map<String,List<SolrServer>> oneInstanceCollectionClients = new HashMap<String,List<SolrServer>>();
- private String oneInstanceCollection = "oneInstanceCollection";;
+ private String oneInstanceCollection = "oneInstanceCollection";
+ private String oneInstanceCollection2 = "oneInstanceCollection2";
public BasicDistributedZkTest() {
fixShardCount = true;
@@ -247,12 +248,63 @@ public class BasicDistributedZkTest exte
testMultipleCollections();
testANewCollectionInOneInstance();
testSearchByCollectionName();
+ testANewCollectionInOneInstanceWithManualShardAssignement();
// Thread.sleep(10000000000L);
if (DEBUG) {
super.printLayout();
}
}
+  /**
+   * Creates {@code oneInstanceCollection2} as four cores on the first instance with manual
+   * shard assignment (cores 1 and 4 on "slice1", cores 2 and 3 on "slice2"), indexes three
+   * documents, and checks three things: cores assigned to the same slice report the same
+   * document count, the two slices report different counts, and a distributed query over
+   * the collection finds all three documents.
+   */
+  private void testANewCollectionInOneInstanceWithManualShardAssignement() throws Exception {
+    List<SolrServer> collectionClients = new ArrayList<SolrServer>();
+    SolrServer client = clients.get(0);
+    // register the clients under the collection created here; keying them by
+    // oneInstanceCollection would overwrite that other collection's client list
+    oneInstanceCollectionClients.put(oneInstanceCollection2, collectionClients);
+    String baseUrl = ((CommonsHttpSolrServer) client).getBaseURL();
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 1, "slice1");
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 2, "slice2");
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 3, "slice2");
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 4, "slice1");
+
+    SolrServer client1 = createNewSolrServer(oneInstanceCollection2 + "1", baseUrl);
+    SolrServer client2 = createNewSolrServer(oneInstanceCollection2 + "2", baseUrl);
+    SolrServer client3 = createNewSolrServer(oneInstanceCollection2 + "3", baseUrl);
+    SolrServer client4 = createNewSolrServer(oneInstanceCollection2 + "4", baseUrl);
+
+    client2.add(getDoc(id, "1"));
+    client3.add(getDoc(id, "2"));
+    client4.add(getDoc(id, "3"));
+
+    // no one should be recovering
+    waitForRecoveriesToFinish(oneInstanceCollection2, solrj.getZkStateReader(), false, true);
+
+    assertAllActive(oneInstanceCollection2, solrj.getZkStateReader());
+
+    client1.commit();
+    SolrQuery query = new SolrQuery("*:*");
+    query.set("distrib", false);
+    long oneDocs = client1.query(query).getResults().getNumFound();
+    long twoDocs = client2.query(query).getResults().getNumFound();
+    long threeDocs = client3.query(query).getResults().getNumFound();
+    long fourDocs = client4.query(query).getResults().getNumFound();
+
+    query.set("collection", oneInstanceCollection2);
+    query.set("distrib", true);
+    long allDocs = solrj.query(query).getResults().getNumFound();
+
+    // cores assigned to the same slice must agree
+    assertEquals(oneDocs, threeDocs);
+    assertEquals(twoDocs, fourDocs);
+    // compare the primitive counts: assertNotSame would only compare the references of the
+    // autoboxed Long values, which says nothing reliable about their numeric values
+    assertTrue("both slices report " + oneDocs + " docs", oneDocs != twoDocs);
+    assertEquals(3, allDocs);
+  }
+
private void testSearchByCollectionName() throws SolrServerException {
SolrServer client = clients.get(0);
String baseUrl = ((CommonsHttpSolrServer) client).getBaseURL();
@@ -280,6 +332,9 @@ public class BasicDistributedZkTest exte
SolrServer client3 = createNewSolrServer(oneInstanceCollection + "3", baseUrl);
SolrServer client4 = createNewSolrServer(oneInstanceCollection + "4", baseUrl);
+ waitForRecoveriesToFinish(oneInstanceCollection, solrj.getZkStateReader(), false);
+ assertAllActive(oneInstanceCollection, solrj.getZkStateReader());
+
client2.add(getDoc(id, "1"));
client3.add(getDoc(id, "2"));
client4.add(getDoc(id, "3"));
@@ -311,6 +366,12 @@ public class BasicDistributedZkTest exte
+  /** Delegates to the five-argument overload with a {@code null} shardId (no manual shard assignment). */
private void createCollection(String collection,
List<SolrServer> collectionClients, String baseUrl, int num)
throws MalformedURLException, SolrServerException, IOException {
+ createCollection(collection, collectionClients, baseUrl, num, null);
+ }
+
+ private void createCollection(String collection,
+ List<SolrServer> collectionClients, String baseUrl, int num, String shardId)
+ throws MalformedURLException, SolrServerException, IOException {
CommonsHttpSolrServer server = new CommonsHttpSolrServer(
baseUrl);
Create createCmd = new Create();
@@ -319,6 +380,7 @@ public class BasicDistributedZkTest exte
createCmd.setNumShards(2);
createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator
+ collection + num);
+ createCmd.setShardId(shardId);
server.request(createCmd);
collectionClients.add(createNewSolrServer(collection, baseUrl));
}
@@ -389,12 +451,13 @@ public class BasicDistributedZkTest exte
throws MalformedURLException, SolrServerException, IOException {
List<SolrServer> collectionClients = new ArrayList<SolrServer>();
otherCollectionClients.put(collection, collectionClients);
+ int unique = 0;
for (SolrServer client : clients) {
CommonsHttpSolrServer server = new CommonsHttpSolrServer(
((CommonsHttpSolrServer) client).getBaseURL());
Create createCmd = new Create();
createCmd.setCoreName(collection);
- createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator + collection);
+ createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator + collection + unique++);
server.request(createCmd);
collectionClients.add(createNewSolrServer(collection,
((CommonsHttpSolrServer) client).getBaseURL()));
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1244231&r1=1244230&r2=1244231&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Tue Feb 14 21:58:16 2012
@@ -65,8 +65,6 @@ public class FullSolrCloudTest extends A
private static final String SHARD2 = "shard2";
- protected static final String DEFAULT_COLLECTION = "collection1";
-
private boolean printLayoutOnTearDown = false;
String t1 = "a_t";
@@ -151,16 +149,12 @@ public class FullSolrCloudTest extends A
System
.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
System.setProperty("solrcloud.update.delay", "0");
- System.setProperty("enable.update.log", "true");
- System.setProperty("remove.version.field", "true");
}
@AfterClass
public static void afterClass() {
System.clearProperty("solr.directoryFactory");
System.clearProperty("solrcloud.update.delay");
- System.clearProperty("enable.update.log");
- System.clearProperty("remove.version.field");
+  // enable.update.log and remove.version.field are now set in AbstractDistributedZkTestCase
+  // and cleared by its tearDown, so they are no longer cleared here
}
public FullSolrCloudTest() {
@@ -655,45 +649,7 @@ public class FullSolrCloudTest extends A
+  /**
+   * Waits for recoveries in DEFAULT_COLLECTION by delegating to the shared superclass
+   * implementation, which does not fail on timeout. Note that the shared loop allows 15
+   * retries where the removed local copy gave up after 10.
+   */
protected void waitForRecoveriesToFinish(boolean verbose)
throws KeeperException, InterruptedException {
- boolean cont = true;
- int cnt = 0;
-
- while (cont) {
- if (verbose) System.out.println("-");
- boolean sawLiveRecovering = false;
- zkStateReader.updateCloudState(true);
- CloudState cloudState = zkStateReader.getCloudState();
- Map<String,Slice> slices = cloudState.getSlices(DEFAULT_COLLECTION);
- for (Map.Entry<String,Slice> entry : slices.entrySet()) {
- Map<String,ZkNodeProps> shards = entry.getValue().getShards();
- for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
- if (verbose) System.out.println("rstate:"
- + shard.getValue().get(ZkStateReader.STATE_PROP)
- + " live:"
- + cloudState.liveNodesContain(shard.getValue().get(
- ZkStateReader.NODE_NAME_PROP)));
- String state = shard.getValue().get(ZkStateReader.STATE_PROP);
- if ((state.equals(ZkStateReader.RECOVERING)
- || state.equals(ZkStateReader.SYNC))
- && cloudState.liveNodesContain(shard.getValue().get(
- ZkStateReader.NODE_NAME_PROP))) {
- sawLiveRecovering = true;
- }
- }
- }
- if (!sawLiveRecovering || cnt == 10) {
- if (!sawLiveRecovering) {
- if (verbose) System.out.println("no one is recoverying");
- } else {
- if (verbose) System.out
- .println("gave up waiting for recovery to finish..");
- }
- cont = false;
- } else {
- Thread.sleep(2000);
- }
- cnt++;
- }
+ super.waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, verbose);
}
private void brindDownShardIndexSomeDocsAndRecover() throws Exception,
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java?rev=1244231&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java Tue Feb 14 21:58:16 2012
@@ -0,0 +1,23 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implemented by threads that can be asked to stop cooperatively. Code running on such a
+ * thread (for example the retry loop in ZkCmdExecutor) may check {@link #isClosed()} and
+ * abandon its work once it returns true.
+ */
+public interface SafeStopThread {
+  /**
+   * NOTE(review): for implementers that extend java.lang.Thread (e.g. RecoveryStrategy), this
+   * method is satisfied by the final, deprecated Thread#stop(), which aborts the thread
+   * abruptly rather than closing it cleanly. Confirm that callers of this interface intend that.
+   */
+ public void stop();
+  /** Returns true once this thread has been asked to close. */
+ public boolean isClosed();
+}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java?rev=1244231&r1=1244230&r2=1244231&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java Tue Feb 14 21:58:16 2012
@@ -71,6 +71,11 @@ public class ZkCmdExecutor {
Thread.currentThread().interrupt();
throw new InterruptedException();
}
+ if (Thread.currentThread() instanceof SafeStopThread) {
+ if (((SafeStopThread) Thread.currentThread()).isClosed()) {
+ throw new RuntimeException("Interrupted");
+ }
+ }
retryDelay(i);
}
}