You are viewing a plain-text version of this message; the link to its canonical (HTML) archive page was lost in this rendering.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/02/14 22:58:17 UTC

svn commit: r1244231 - in /lucene/dev/trunk/solr: core/src/java/org/apache/solr/cloud/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/cloud/

Author: markrmiller
Date: Tue Feb 14 21:58:16 2012
New Revision: 1244231

URL: http://svn.apache.org/viewvc?rev=1244231&view=rev
Log:
SOLR-3122: allow interrupting zkcmdoperations and some test work

Added:
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java   (with props)
Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1244231&r1=1244230&r2=1244231&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Tue Feb 14 21:58:16 2012
@@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.impl
 import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.SafeStopThread;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -46,7 +47,7 @@ import org.apache.solr.update.UpdateLog.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RecoveryStrategy extends Thread {
+public class RecoveryStrategy extends Thread implements SafeStopThread {
   private static final int MAX_RETRIES = 500;
   private static final int INTERRUPTED = MAX_RETRIES + 1;
   private static final int START_TIMEOUT = 100;
@@ -191,8 +192,14 @@ public class RecoveryStrategy extends Th
         }
         log.info("Sync Recovery was not successful - trying replication");
         UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-        if (ulog == null) return;
+        if (ulog == null) {
+          SolrException.log(log, "No UpdateLog found - cannot recover");
+          recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
+              core.getCoreDescriptor());
+          return;
+        }
         
+        log.info("Begin buffering updates");
         ulog.bufferUpdates();
         replayed = false;
         
@@ -296,4 +303,8 @@ public class RecoveryStrategy extends Th
     return future;
   }
 
+  public boolean isClosed() {
+    return close;
+  }
+
 }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=1244231&r1=1244230&r2=1244231&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Tue Feb 14 21:58:16 2012
@@ -18,15 +18,23 @@ package org.apache.solr.cloud;
  */
 
 import java.io.File;
+import java.util.Map;
 
 import org.apache.solr.BaseDistributedSearchTestCase;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.core.SolrConfig;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 
 public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearchTestCase {
+  
+  protected static final String DEFAULT_COLLECTION = "collection1";
   private static final boolean DEBUG = false;
   protected ZkTestServer zkServer;
 
@@ -44,6 +52,10 @@ public abstract class AbstractDistribute
     zkServer.run();
     
     System.setProperty("zkHost", zkServer.getZkAddress());
+    System.setProperty("enable.update.log", "true");
+    System.setProperty("remove.version.field", "true");
+    System
+    .setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
     
     AbstractZkTestCase.buildZooKeeper(zkServer.getZkHost(), zkServer.getZkAddress(), "solrconfig.xml", "schema.xml");
 
@@ -70,8 +82,83 @@ public abstract class AbstractDistribute
 
     shards = sb.toString();
   }
+  
+  protected void waitForRecoveriesToFinish(String collection, ZkStateReader zkStateReader, boolean verbose)
+      throws KeeperException, InterruptedException {
+    waitForRecoveriesToFinish(collection, zkStateReader, verbose, false);
+  }
+  
+  protected void waitForRecoveriesToFinish(String collection,
+      ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout)
+      throws KeeperException, InterruptedException {
+    boolean cont = true;
+    int cnt = 0;
+    
+    while (cont) {
+      if (verbose) System.out.println("-");
+      boolean sawLiveRecovering = false;
+      zkStateReader.updateCloudState(true);
+      CloudState cloudState = zkStateReader.getCloudState();
+      Map<String,Slice> slices = cloudState.getSlices(collection);
+      for (Map.Entry<String,Slice> entry : slices.entrySet()) {
+        Map<String,ZkNodeProps> shards = entry.getValue().getShards();
+        for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
+          if (verbose) System.out.println("rstate:"
+              + shard.getValue().get(ZkStateReader.STATE_PROP)
+              + " live:"
+              + cloudState.liveNodesContain(shard.getValue().get(
+                  ZkStateReader.NODE_NAME_PROP)));
+          String state = shard.getValue().get(ZkStateReader.STATE_PROP);
+          if ((state.equals(ZkStateReader.RECOVERING) || state
+              .equals(ZkStateReader.SYNC))
+              && cloudState.liveNodesContain(shard.getValue().get(
+                  ZkStateReader.NODE_NAME_PROP))) {
+            sawLiveRecovering = true;
+          }
+        }
+      }
+      if (!sawLiveRecovering || cnt == 15) {
+        if (!sawLiveRecovering) {
+          if (verbose) System.out.println("no one is recoverying");
+        } else {
+          if (failOnTimeout) {
+            fail("There are still nodes recoverying");
+            return;
+          }
+          if (verbose) System.out
+              .println("gave up waiting for recovery to finish..");
+        }
+        cont = false;
+      } else {
+        Thread.sleep(2000);
+      }
+      cnt++;
+    }
+  }
+
+  protected void assertAllActive(String collection,ZkStateReader zkStateReader)
+      throws KeeperException, InterruptedException {
 
+      zkStateReader.updateCloudState(true);
+      CloudState cloudState = zkStateReader.getCloudState();
+      Map<String,Slice> slices = cloudState.getSlices(collection);
+      if (slices == null) {
+        throw new IllegalArgumentException("Cannot find collection:" + collection);
+      }
+      for (Map.Entry<String,Slice> entry : slices.entrySet()) {
+        Map<String,ZkNodeProps> shards = entry.getValue().getShards();
+        for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
+
+          String state = shard.getValue().get(ZkStateReader.STATE_PROP);
+          if (!state.equals(ZkStateReader.ACTIVE)) {
+            fail("Not all shards are ACTIVE");
+          }
+        }
+      }
+  }
+  
   @Override
+  @After
   public void tearDown() throws Exception {
     if (DEBUG) {
       printLayout();
@@ -79,6 +166,9 @@ public abstract class AbstractDistribute
     zkServer.shutdown();
     System.clearProperty("zkHost");
     System.clearProperty("collection");
+    System.clearProperty("enable.update.log");
+    System.clearProperty("remove.version.field");
+    System.clearProperty("solr.directoryFactory");
     System.clearProperty("solr.test.sys.prop1");
     System.clearProperty("solr.test.sys.prop2");
     resetExceptionIgnores();
@@ -93,7 +183,5 @@ public abstract class AbstractDistribute
   
   @AfterClass
   public static void afterClass() throws InterruptedException {
-    // wait just a bit for any zk client threads to outlast timeout
-    Thread.sleep(2000);
   }
 }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1244231&r1=1244230&r2=1244231&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Tue Feb 14 21:58:16 2012
@@ -62,7 +62,8 @@ public class BasicDistributedZkTest exte
   
   private Map<String,List<SolrServer>> otherCollectionClients = new HashMap<String,List<SolrServer>>();
   private Map<String,List<SolrServer>> oneInstanceCollectionClients = new HashMap<String,List<SolrServer>>();
-  private String oneInstanceCollection = "oneInstanceCollection";;
+  private String oneInstanceCollection = "oneInstanceCollection";
+  private String oneInstanceCollection2 = "oneInstanceCollection2";
   
   public BasicDistributedZkTest() {
     fixShardCount = true;
@@ -247,12 +248,63 @@ public class BasicDistributedZkTest exte
     testMultipleCollections();
     testANewCollectionInOneInstance();
     testSearchByCollectionName();
+    testANewCollectionInOneInstanceWithManualShardAssignement();
     // Thread.sleep(10000000000L);
     if (DEBUG) {
       super.printLayout();
     }
   }
 
+  private void testANewCollectionInOneInstanceWithManualShardAssignement() throws Exception {
+    List<SolrServer> collectionClients = new ArrayList<SolrServer>();
+    SolrServer client = clients.get(0);
+    oneInstanceCollectionClients.put(oneInstanceCollection , collectionClients);
+    String baseUrl = ((CommonsHttpSolrServer) client).getBaseURL();
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 1, "slice1");
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 2, "slice2");
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 3, "slice2");
+    createCollection(oneInstanceCollection2, collectionClients, baseUrl, 4, "slice1");
+    
+    SolrServer client1 = createNewSolrServer(oneInstanceCollection2 + "1", baseUrl);
+    SolrServer client2 = createNewSolrServer(oneInstanceCollection2 + "2", baseUrl);
+    SolrServer client3 = createNewSolrServer(oneInstanceCollection2 + "3", baseUrl);
+    SolrServer client4 = createNewSolrServer(oneInstanceCollection2 + "4", baseUrl);
+    
+    client2.add(getDoc(id, "1")); 
+    client3.add(getDoc(id, "2")); 
+    client4.add(getDoc(id, "3")); 
+    
+    // no one should be recovering
+    waitForRecoveriesToFinish(oneInstanceCollection2, solrj.getZkStateReader(), false, true);
+    
+    assertAllActive(oneInstanceCollection2, solrj.getZkStateReader());
+    
+    client1.commit();
+    SolrQuery query = new SolrQuery("*:*");
+    query.set("distrib", false);
+    long oneDocs = client1.query(query).getResults().getNumFound();
+    long twoDocs = client2.query(query).getResults().getNumFound();
+    long threeDocs = client3.query(query).getResults().getNumFound();
+    long fourDocs = client4.query(query).getResults().getNumFound();
+    
+    query.set("collection", oneInstanceCollection2);
+    query.set("distrib", true);
+    long allDocs = solrj.query(query).getResults().getNumFound();
+    
+//    System.out.println("1:" + oneDocs);
+//    System.out.println("2:" + twoDocs);
+//    System.out.println("3:" + threeDocs);
+//    System.out.println("4:" + fourDocs);
+//    System.out.println("All Docs:" + allDocs);
+    
+    assertEquals(oneDocs, threeDocs);
+    assertEquals(twoDocs, fourDocs);
+    assertNotSame(oneDocs, twoDocs);
+    assertEquals(3, allDocs);
+    
+
+  }
+
   private void testSearchByCollectionName() throws SolrServerException {
     SolrServer client = clients.get(0);
     String baseUrl = ((CommonsHttpSolrServer) client).getBaseURL();
@@ -280,6 +332,9 @@ public class BasicDistributedZkTest exte
     SolrServer client3 = createNewSolrServer(oneInstanceCollection + "3", baseUrl);
     SolrServer client4 = createNewSolrServer(oneInstanceCollection + "4", baseUrl);
     
+    waitForRecoveriesToFinish(oneInstanceCollection, solrj.getZkStateReader(), false);
+    assertAllActive(oneInstanceCollection, solrj.getZkStateReader());
+    
     client2.add(getDoc(id, "1")); 
     client3.add(getDoc(id, "2")); 
     client4.add(getDoc(id, "3")); 
@@ -311,6 +366,12 @@ public class BasicDistributedZkTest exte
   private void createCollection(String collection,
       List<SolrServer> collectionClients, String baseUrl, int num)
       throws MalformedURLException, SolrServerException, IOException {
+    createCollection(collection, collectionClients, baseUrl, num, null);
+  }
+  
+  private void createCollection(String collection,
+      List<SolrServer> collectionClients, String baseUrl, int num, String shardId)
+      throws MalformedURLException, SolrServerException, IOException {
     CommonsHttpSolrServer server = new CommonsHttpSolrServer(
         baseUrl);
     Create createCmd = new Create();
@@ -319,6 +380,7 @@ public class BasicDistributedZkTest exte
     createCmd.setNumShards(2);
     createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator
         + collection + num);
+    createCmd.setShardId(shardId);
     server.request(createCmd);
     collectionClients.add(createNewSolrServer(collection, baseUrl));
   }
@@ -389,12 +451,13 @@ public class BasicDistributedZkTest exte
       throws MalformedURLException, SolrServerException, IOException {
     List<SolrServer> collectionClients = new ArrayList<SolrServer>();
     otherCollectionClients.put(collection, collectionClients);
+    int unique = 0;
     for (SolrServer client : clients) {
       CommonsHttpSolrServer server = new CommonsHttpSolrServer(
           ((CommonsHttpSolrServer) client).getBaseURL());
       Create createCmd = new Create();
       createCmd.setCoreName(collection);
-      createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator + collection);
+      createCmd.setDataDir(dataDir.getAbsolutePath() + File.separator + collection + unique++);
       server.request(createCmd);
       collectionClients.add(createNewSolrServer(collection,
           ((CommonsHttpSolrServer) client).getBaseURL()));

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1244231&r1=1244230&r2=1244231&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Tue Feb 14 21:58:16 2012
@@ -65,8 +65,6 @@ public class FullSolrCloudTest extends A
   
   private static final String SHARD2 = "shard2";
   
-  protected static final String DEFAULT_COLLECTION = "collection1";
-  
   private boolean printLayoutOnTearDown = false;
   
   String t1 = "a_t";
@@ -151,16 +149,12 @@ public class FullSolrCloudTest extends A
     System
         .setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
     System.setProperty("solrcloud.update.delay", "0");
-    System.setProperty("enable.update.log", "true");
-    System.setProperty("remove.version.field", "true");
   }
   
   @AfterClass
   public static void afterClass() {
     System.clearProperty("solr.directoryFactory");
     System.clearProperty("solrcloud.update.delay");
-    System.clearProperty("enable.update.log");
-    System.clearProperty("remove.version.field");
   }
   
   public FullSolrCloudTest() {
@@ -655,45 +649,7 @@ public class FullSolrCloudTest extends A
   
   protected void waitForRecoveriesToFinish(boolean verbose)
       throws KeeperException, InterruptedException {
-    boolean cont = true;
-    int cnt = 0;
-    
-    while (cont) {
-      if (verbose) System.out.println("-");
-      boolean sawLiveRecovering = false;
-      zkStateReader.updateCloudState(true);
-      CloudState cloudState = zkStateReader.getCloudState();
-      Map<String,Slice> slices = cloudState.getSlices(DEFAULT_COLLECTION);
-      for (Map.Entry<String,Slice> entry : slices.entrySet()) {
-        Map<String,ZkNodeProps> shards = entry.getValue().getShards();
-        for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
-          if (verbose) System.out.println("rstate:"
-              + shard.getValue().get(ZkStateReader.STATE_PROP)
-              + " live:"
-              + cloudState.liveNodesContain(shard.getValue().get(
-                  ZkStateReader.NODE_NAME_PROP)));
-          String state = shard.getValue().get(ZkStateReader.STATE_PROP);
-          if ((state.equals(ZkStateReader.RECOVERING)
-              || state.equals(ZkStateReader.SYNC))
-              && cloudState.liveNodesContain(shard.getValue().get(
-                  ZkStateReader.NODE_NAME_PROP))) {
-            sawLiveRecovering = true;
-          }
-        }
-      }
-      if (!sawLiveRecovering || cnt == 10) {
-        if (!sawLiveRecovering) {
-          if (verbose) System.out.println("no one is recoverying");
-        } else {
-          if (verbose) System.out
-              .println("gave up waiting for recovery to finish..");
-        }
-        cont = false;
-      } else {
-        Thread.sleep(2000);
-      }
-      cnt++;
-    }
+    super.waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, verbose);
   }
   
   private void brindDownShardIndexSomeDocsAndRecover() throws Exception,

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java?rev=1244231&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SafeStopThread.java Tue Feb 14 21:58:16 2012
@@ -0,0 +1,23 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public interface SafeStopThread {
+  public void stop();
+  public boolean isClosed();
+}

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java?rev=1244231&r1=1244230&r2=1244231&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java Tue Feb 14 21:58:16 2012
@@ -71,6 +71,11 @@ public class ZkCmdExecutor {
           Thread.currentThread().interrupt();
           throw new InterruptedException();
         }
+        if (Thread.currentThread() instanceof SafeStopThread) {
+          if (((SafeStopThread) Thread.currentThread()).isClosed()) {
+            throw new RuntimeException("Interrupted");
+          }
+        }
         retryDelay(i);
       }
     }