You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2015/03/24 04:19:38 UTC

svn commit: r1668779 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/handler/admin/ core/src/java/org/apache/solr/handler/component/ core/src/java/org/apache/solr...

Author: markrmiller
Date: Tue Mar 24 03:19:37 2015
New Revision: 1668779

URL: http://svn.apache.org/r1668779
Log:
SOLR-7134: Replication can still cause index corruption.

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
    lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Tue Mar 24 03:19:37 2015
@@ -282,6 +282,8 @@ Bug Fixes
 * SOLR-7286: Using HDFS's FileSystem.newInstance does not guarantee a new instance.
   (Mark Miller)
 
+* SOLR-7134: Replication can still cause index corruption. (Mark Miller, shalin, Mike Drob)
+
 Optimizations
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Tue Mar 24 03:19:37 2015
@@ -305,7 +305,7 @@ final class ShardLeaderElectionContext e
             searchHolder.decref();
           }
         } catch (Exception e) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+          log.error("Error in solrcloud_debug block", e);
         }
       }
       if (!success) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Tue Mar 24 03:19:37 2015
@@ -160,8 +160,7 @@ public class RecoveryStrategy extends Th
     boolean success = replicationHandler.doFetch(solrParams, false);
     
     if (!success) {
-      throw new SolrException(ErrorCode.SERVER_ERROR,
-          "Replication for recovery failed.");
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
     }
     
     // solrcloud_debug
@@ -179,7 +178,7 @@ public class RecoveryStrategy extends Th
               + " from "
               + leaderUrl
               + " gen:"
-              + core.getDeletionPolicy().getLatestCommit().getGeneration()
+              + core.getDeletionPolicy().getLatestCommit() != null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration()
               + " data:" + core.getDataDir()
               + " index:" + core.getIndexDir()
               + " newIndex:" + core.getNewIndexDir()
@@ -189,7 +188,7 @@ public class RecoveryStrategy extends Th
           searchHolder.decref();
         }
       } catch (Exception e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+        log.debug("Error in solrcloud_debug block", e);
       }
     }
 
@@ -409,7 +408,7 @@ public class RecoveryStrategy extends Th
                   searchHolder.decref();
                 }
               } catch (Exception e) {
-                throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+                log.debug("Error in solrcloud_debug block", e);
               }
             }
 
@@ -557,7 +556,7 @@ public class RecoveryStrategy extends Th
           searchHolder.decref();
         }
       } catch (Exception e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+        log.debug("Error in solrcloud_debug block", e);
       }
     }
     

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java Tue Mar 24 03:19:37 2015
@@ -245,25 +245,37 @@ public class IndexFetcher {
     }
   }
 
-  private boolean successfulInstall = false;
-
+  boolean fetchLatestIndex(final SolrCore core, boolean forceReplication) throws IOException, InterruptedException {
+    return fetchLatestIndex(core, forceReplication, false);
+  }
+  
   /**
    * This command downloads all the necessary files from master to install a index commit point. Only changed files are
    * downloaded. It also downloads the conf files (if they are modified).
    *
    * @param core the SolrCore
    * @param forceReplication force a replication in all cases 
+   * @param forceCoreReload force a core reload in all cases
    * @return true on success, false if slave is already in sync
    * @throws IOException if an exception occurs
    */
-  boolean fetchLatestIndex(final SolrCore core, boolean forceReplication) throws IOException, InterruptedException {
-    successfulInstall = false;
+   boolean fetchLatestIndex(final SolrCore core, boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException {
+    boolean cleanupDone = false;
+    boolean successfulInstall = false;
     replicationStartTime = System.currentTimeMillis();
     Directory tmpIndexDir = null;
     String tmpIndex = null;
     Directory indexDir = null;
     String indexDirPath = null;
     boolean deleteTmpIdxDir = true;
+    
+    if (!core.getSolrCoreState().getLastReplicateIndexSuccess()) {
+      // if the last replication was not a success, we force a full replication
+      // when we are a bit more confident we may want to try a partial replication
+      // if the error is connection related or something, but we have to be careful
+      forceReplication = true;
+    }
+    
     try {
       //get the current 'replicateable' index version in the master
       NamedList response = null;
@@ -404,6 +416,7 @@ public class IndexFetcher {
               + " secs");
           Collection<Map<String,Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
           if (!modifiedConfFiles.isEmpty()) {
+            reloadCore = true;
             downloadConfFiles(confFilesToDownload, latestGeneration);
             if (isFullCopyNeeded) {
               successfulInstall = modifyIndexProps(tmpIdxDirName);
@@ -426,7 +439,6 @@ public class IndexFetcher {
               logReplicationTimeAndConfFiles(modifiedConfFiles,
                   successfulInstall);// write to a file time of replication and
                                      // conf files.
-              reloadCore = true;
             }
           } else {
             terminateAndWaitFsyncService();
@@ -448,7 +460,8 @@ public class IndexFetcher {
         }
         
         // we must reload the core after we open the IW back up
-        if (reloadCore) {
+       if (successfulInstall && (reloadCore || forceCoreReload)) {
+          LOG.info("Reloading SolrCore {}", core.getName());
           reloadCore();
         }
 
@@ -469,6 +482,16 @@ public class IndexFetcher {
           openNewSearcherAndUpdateCommitPoint();
         }
         
+        if (!isFullCopyNeeded && !forceReplication && !successfulInstall) {
+          cleanup(core, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall);
+          cleanupDone = true;
+          // we try with a full copy of the index
+          LOG.warn(
+              "Replication attempt was not successful - trying a full index replication reloadCore={}",
+              reloadCore);
+          successfulInstall = fetchLatestIndex(core, true, reloadCore);
+        }
+        
         replicationStartTime = 0;
         return successfulInstall;
       } catch (ReplicationHandlerException e) {
@@ -482,41 +505,51 @@ public class IndexFetcher {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
       }
     } finally {
-      try {
-        if (!successfulInstall) {
-          try {
-            logReplicationTimeAndConfFiles(null, successfulInstall);
-          } catch(Exception e) {
-            LOG.error("caught", e);
-          }
-        }
-        filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
-        replicationStartTime = 0;
-        dirFileFetcher = null;
-        localFileFetcher = null;
-        if (fsyncService != null && !fsyncService.isShutdown()) fsyncService
-            .shutdownNow();
-        fsyncService = null;
-        stop = false;
-        fsyncException = null;
-      } finally {
-        if (deleteTmpIdxDir && tmpIndexDir != null) {
-          try {
-            core.getDirectoryFactory().doneWithDirectory(tmpIndexDir);
-            core.getDirectoryFactory().remove(tmpIndexDir);
-          } catch (IOException e) {
-            SolrException.log(LOG, "Error removing directory " + tmpIndexDir, e);
-          }
-        }
+      if (!cleanupDone) {
+        cleanup(core, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall);
+      }
+    }
+  }
         
-        if (tmpIndexDir != null) {
-          core.getDirectoryFactory().release(tmpIndexDir);
+  private void cleanup(final SolrCore core, Directory tmpIndexDir,
+      Directory indexDir, boolean deleteTmpIdxDir, boolean successfulInstall) throws IOException {
+    try {
+      if (!successfulInstall) {
+        try {
+          logReplicationTimeAndConfFiles(null, successfulInstall);
+        } catch (Exception e) {
+          LOG.error("caught", e);
         }
-        
-        if (indexDir != null) {
-          core.getDirectoryFactory().release(indexDir);
+      }
+      
+      core.getUpdateHandler().getSolrCoreState().setLastReplicateIndexSuccess(successfulInstall);
+      
+      filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
+      replicationStartTime = 0;
+      dirFileFetcher = null;
+      localFileFetcher = null;
+      if (fsyncService != null && !fsyncService.isShutdown()) fsyncService
+          .shutdownNow();
+      fsyncService = null;
+      stop = false;
+      fsyncException = null;
+    } finally {
+      if (deleteTmpIdxDir && tmpIndexDir != null) {
+        try {
+          core.getDirectoryFactory().doneWithDirectory(tmpIndexDir);
+          core.getDirectoryFactory().remove(tmpIndexDir);
+        } catch (IOException e) {
+          SolrException.log(LOG, "Error removing directory " + tmpIndexDir, e);
         }
       }
+      
+      if (tmpIndexDir != null) {
+        core.getDirectoryFactory().release(tmpIndexDir);
+      }
+      
+      if (indexDir != null) {
+        core.getDirectoryFactory().release(indexDir);
+      }
     }
   }
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Tue Mar 24 03:19:37 2015
@@ -832,7 +832,7 @@ public class CoreAdminHandler extends Re
               searchHolder.decref();
             }
           } catch (Exception e) {
-            throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+            log.debug("Error in solrcloud_debug block", e);
           }
         }
         if (!success) {
@@ -1011,7 +1011,7 @@ public class CoreAdminHandler extends Re
               searchHolder.decref();
             }
           } catch (Exception e) {
-            throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+            log.debug("Error in solrcloud_debug block", e);
           }
         }
       }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java Tue Mar 24 03:19:37 2015
@@ -29,6 +29,7 @@ import org.apache.lucene.index.Indexable
 import org.apache.lucene.index.StorableField;
 import org.apache.lucene.index.StoredDocument;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
 import org.apache.solr.client.solrj.SolrResponse;
@@ -98,6 +99,25 @@ public class RealTimeGetComponent extend
 
     val = params.get("getUpdates");
     if (val != null) {
+      // solrcloud_debug
+      if (log.isDebugEnabled()) {
+        try {
+          RefCounted<SolrIndexSearcher> searchHolder = req.getCore()
+              .getNewestSearcher(false);
+          SolrIndexSearcher searcher = searchHolder.get();
+          try {
+            log.debug(req.getCore().getCoreDescriptor()
+                .getCoreContainer().getZkController().getNodeName()
+                + " min count to sync to (from most recent searcher view) "
+                + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+          } finally {
+            searchHolder.decref();
+          }
+        } catch (Exception e) {
+          log.debug("Error in solrcloud_debug block", e);
+        }
+      }
+      
       processGetUpdates(rb);
       return;
     }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Tue Mar 24 03:19:37 2015
@@ -53,6 +53,7 @@ public final class DefaultSolrCoreState
 
   private volatile boolean recoveryRunning;
   private RecoveryStrategy recoveryStrat;
+  private volatile boolean lastReplicationSuccess = true;
 
   private RefCounted<IndexWriter> refCntWriter;
 
@@ -377,5 +378,14 @@ public final class DefaultSolrCoreState
     return leaderThrottle;
   }
   
+  @Override
+  public boolean getLastReplicateIndexSuccess() {
+    return lastReplicationSuccess;
+  }
+
+  @Override
+  public void setLastReplicateIndexSuccess(boolean success) {
+    this.lastReplicationSuccess = success;
+  }
   
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java Tue Mar 24 03:19:37 2015
@@ -495,7 +495,7 @@ public class PeerSync  {
             cmd.setVersion(version);
             cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
             if (debug) {
-              log.debug(msg() + "add " + cmd);
+              log.debug(msg() + "add " + cmd + " id " + sdoc.getField("id"));
             }
             proc.processAdd(cmd);
             break;
@@ -508,7 +508,7 @@ public class PeerSync  {
             cmd.setVersion(version);
             cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
             if (debug) {
-              log.debug(msg() + "delete " + cmd);
+              log.debug(msg() + "delete " + cmd + " " + new BytesRef(idBytes).utf8ToString());
             }
             proc.processDelete(cmd);
             break;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCoreState.java Tue Mar 24 03:19:37 2015
@@ -146,4 +146,7 @@ public abstract class SolrCoreState {
    */
   public abstract ActionThrottle getLeaderThrottle();
 
+  public abstract boolean getLastReplicateIndexSuccess();
+
+  public abstract void setLastReplicateIndexSuccess(boolean success);
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Tue Mar 24 03:19:37 2015
@@ -1081,6 +1081,7 @@ public class DistributedUpdateProcessor
               Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
               if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
                 // This update is a repeat, or was reordered.  We need to drop this update.
+                log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
                 return true;
               }
 
@@ -1568,6 +1569,7 @@ public class DistributedUpdateProcessor
               Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
               if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
                 // This update is a repeat, or was reordered.  We need to drop this update.
+                log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
                 return true;
               }
             }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java Tue Mar 24 03:19:37 2015
@@ -112,8 +112,21 @@ public class ChaosMonkeySafeLeaderTest e
     
     List<StopableIndexingThread> threads = new ArrayList<>();
     int threadCount = 2;
+    int batchSize = 1;
+    if (random().nextBoolean()) {
+      batchSize = random().nextInt(98) + 2;
+    }
+    
+    boolean pauseBetweenUpdates = TEST_NIGHTLY ? random().nextBoolean() : true;
+    int maxUpdates = -1;
+    if (!pauseBetweenUpdates) {
+      maxUpdates = 1000 + random().nextInt(1000);
+    } else {
+      maxUpdates = 15000;
+    }
+    
     for (int i = 0; i < threadCount; i++) {
-      StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true);
+      StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true, maxUpdates, batchSize, pauseBetweenUpdates); // random().nextInt(999) + 1
       threads.add(indexThread);
       indexThread.start();
     }
@@ -158,7 +171,7 @@ public class ChaosMonkeySafeLeaderTest e
 
     waitForThingsToLevelOut(180000);
 
-    checkShardConsistency(true, true);
+    checkShardConsistency(batchSize == 1, true);
     
     if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");
     

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java Tue Mar 24 03:19:37 2015
@@ -72,10 +72,10 @@ public class RecoveryZkTest extends Abst
       maxDoc = maxDocNightlyList[random().nextInt(maxDocList.length - 1)];
     }
     
-    indexThread = new StopableIndexingThread(controlClient, cloudClient, "1", true, maxDoc);
+    indexThread = new StopableIndexingThread(controlClient, cloudClient, "1", true, maxDoc, 1, true);
     indexThread.start();
     
-    indexThread2 = new StopableIndexingThread(controlClient, cloudClient, "2", true, maxDoc);
+    indexThread2 = new StopableIndexingThread(controlClient, cloudClient, "2", true, maxDoc, 1, true);
     
     indexThread2.start();
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java Tue Mar 24 03:19:37 2015
@@ -18,6 +18,8 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -37,6 +39,7 @@ import org.apache.solr.common.util.IOUti
  */
 
 public class HdfsTestUtil {
+  private static Logger log = LoggerFactory.getLogger(HdfsTestUtil.class);
   
   private static Locale savedLocale;
   
@@ -131,7 +134,15 @@ public class HdfsTestUtil {
       if (timer != null) {
         timer.cancel();
       }
-      dfsCluster.shutdown();
+      try {
+        dfsCluster.shutdown();
+      } catch (Error e) {
+        // Added in SOLR-7134
+        // Rarely, this can fail with either a NullPointerException
+        // or a class-not-found error. The latter may be fixable
+        // by adding test dependencies.
+        log.warn("Exception shutting down dfsCluster", e);
+      }
     }
     
     // TODO: we HACK around HADOOP-9643

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java Tue Mar 24 03:19:37 2015
@@ -105,7 +105,7 @@ public class HdfsWriteToMultipleCollecti
       CloudSolrClient client = new CloudSolrClient(zkServer.getZkAddress());
       client.setDefaultCollection(ACOLLECTION + i);
       cloudClients.add(client);
-      StopableIndexingThread indexThread = new StopableIndexingThread(null, client, "1", true, docCount);
+      StopableIndexingThread indexThread = new StopableIndexingThread(null, client, "1", true, docCount, 1, true);
       threads.add(indexThread);
       indexThread.start();
     }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java Tue Mar 24 03:19:37 2015
@@ -301,8 +301,9 @@ public class TestReplicationHandler exte
                     details.get("slave"));
       // SOLR-2677: assert not false negatives
       Object timesFailed = ((NamedList)details.get("slave")).get(IndexFetcher.TIMES_FAILED);
-      assertEquals("slave has fetch error count",
-                   null, timesFailed);
+      // SOLR-7134: we can have a failure because some mock index files have no checksum, so they
+      // are always downloaded and may not be movable into the existing index
+      assertTrue("slave has fetch error count: " + (String)timesFailed, timesFailed == null || ((String) timesFailed).equals("1"));
 
       if (3 != i) {
         // index & fetch

Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java?rev=1668779&r1=1668778&r2=1668779&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java Tue Mar 24 03:19:37 2015
@@ -42,18 +42,23 @@ public class StopableIndexingThread exte
   private SolrClient cloudClient;
   private int numDeletes;
   private int numAdds;
-
+  private List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
+  private int batchSize;
+  private boolean pauseBetweenUpdates;
+  
   public StopableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes) {
-    this(controlClient, cloudClient, id, doDeletes, -1);
+    this(controlClient, cloudClient, id, doDeletes, -1, 1, true);
   }
   
-  public StopableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes, int numCycles) {
+  public StopableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes, int numCycles, int batchSize, boolean pauseBetweenUpdates) {
     super("StopableIndexingThread");
     this.controlClient = controlClient;
     this.cloudClient = cloudClient;
     this.id = id;
     this.doDeletes = doDeletes;
     this.numCycles = numCycles;
+    this.batchSize = batchSize;
+    this.pauseBetweenUpdates = pauseBetweenUpdates;
     setDaemon(true);
   }
   
@@ -100,8 +105,17 @@ public class StopableIndexingThread exte
       
       try {
         numAdds++;
-        indexr("id", id, i1, 50, t1,
+        SolrInputDocument doc = new SolrInputDocument();
+        addFields(doc, "id", id, i1, 50, t1,
             "to come to the aid of their country.");
+        addFields(doc, "rnd_b", true);
+        
+        docs.add(doc);
+        
+        if (docs.size() >= batchSize)  {
+          indexDocs(docs);
+          docs.clear();
+        }
       } catch (Exception e) {
         addFailed = true;
         System.err.println("REQUEST FAILED for id=" + id);
@@ -117,10 +131,12 @@ public class StopableIndexingThread exte
         deletes.add(id);
       }
       
-      try {
-        Thread.currentThread().sleep(AbstractFullDistribZkTestBase.random().nextInt(100));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
+      if (docs.size() > 0 && pauseBetweenUpdates) {
+        try {
+          Thread.currentThread().sleep(AbstractFullDistribZkTestBase.random().nextInt(500) + 50);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
       }
     }
     
@@ -151,26 +167,18 @@ public class StopableIndexingThread exte
     }
   }
   
-  protected void indexr(Object... fields) throws Exception {
-    SolrInputDocument doc = new SolrInputDocument();
-    addFields(doc, fields);
-    addFields(doc, "rnd_b", true);
-    indexDoc(doc);
-  }
-  
-  protected void indexDoc(SolrInputDocument doc) throws IOException,
+  protected void indexDocs(List<SolrInputDocument> docs) throws IOException,
       SolrServerException {
     
     if (controlClient != null) {
       UpdateRequest req = new UpdateRequest();
-      req.add(doc);
+      req.add(docs);
       req.setParam("CONTROL", "TRUE");
       req.process(controlClient);
     }
-
     
     UpdateRequest ureq = new UpdateRequest();
-    ureq.add(doc);
+    ureq.add(docs);
     ureq.process(cloudClient);
   }