You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2014/09/24 16:47:38 UTC

svn commit: r1627340 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/handler/ core/src/test-files/solr/collection1/conf/ core/src/test/org/apache/solr/handler/ solrj/src/java/org/apache/solr/common/params/

Author: noble
Date: Wed Sep 24 14:47:37 2014
New Revision: 1627340

URL: http://svn.apache.org/r1627340
Log:
SOLR-6485

Added:
    lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-master-throttled.xml   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1627340&r1=1627339&r2=1627340&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Sep 24 14:47:37 2014
@@ -135,6 +135,9 @@ New Features
   of arbitrary functions, ie: stats.field={!func}product(price,popularity)
   (hossman)
 
+* SOLR-6485: ReplicationHandler should have an option to throttle the speed of
+  replication (Varun Thacker, Noble Paul)
+
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1627340&r1=1627339&r2=1627340&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Wed Sep 24 14:47:37 2014
@@ -53,9 +53,10 @@ import org.apache.lucene.index.IndexWrit
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RateLimiter;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
-import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -64,8 +65,8 @@ import org.apache.solr.common.util.Named
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.core.CloseHook;
-import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrDeletionPolicy;
@@ -1066,7 +1067,7 @@ public class ReplicationHandler extends 
             core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getGeneration());
           }
           if(oldCommitPoint != null){
-            core.getDeletionPolicy().releaseCommitPoint(oldCommitPoint.getGeneration());
+            core.getDeletionPolicy().releaseCommitPointAndExtendReserve(oldCommitPoint.getGeneration());
           }
           ***/
         }
@@ -1094,6 +1095,9 @@ public class ReplicationHandler extends 
     };
   }
 
+  /**This class is used to read and send files in the lucene index
+   *
+   */
   private class DirectoryFileStream {
     protected SolrParams params;
 
@@ -1102,40 +1106,83 @@ public class ReplicationHandler extends 
     protected Long indexGen;
     protected IndexDeletionPolicyWrapper delPolicy;
 
+    protected String fileName;
+    protected String cfileName;
+    protected String sOffset;
+    protected String sLen;
+    protected String compress;
+    protected boolean useChecksum;
+
+    protected long offset = -1;
+    protected int len = -1;
+
+    protected Checksum checksum;
+
+    private RateLimiter rateLimiter;
+
+    byte[] buf;
+
     public DirectoryFileStream(SolrParams solrParams) {
       params = solrParams;
       delPolicy = core.getDeletionPolicy();
+
+      fileName = params.get(FILE);
+      cfileName = params.get(CONF_FILE_SHORT);
+      sOffset = params.get(OFFSET);
+      sLen = params.get(LEN);
+      compress = params.get(COMPRESSION);
+      useChecksum = params.getBool(CHECKSUM, false);
+      indexGen = params.getLong(GENERATION, null);
+      if (useChecksum) {
+        checksum = new Adler32();
+      }
+      //No throttle if MAX_WRITE_PER_SECOND is not specified
+      double maxWriteMBPerSec = params.getDouble(MAX_WRITE_PER_SECOND, Double.MAX_VALUE);
+      rateLimiter = new RateLimiter.SimpleRateLimiter(maxWriteMBPerSec);
     }
 
-    public void write(OutputStream out) throws IOException {
-      String fileName = params.get(FILE);
-      String cfileName = params.get(CONF_FILE_SHORT);
-      String sOffset = params.get(OFFSET);
-      String sLen = params.get(LEN);
-      String compress = params.get(COMPRESSION);
-      String sChecksum = params.get(CHECKSUM);
-      String sGen = params.get(GENERATION);
-      if (sGen != null) indexGen = Long.parseLong(sGen);
+    protected void initWrite() throws IOException {
+      if (sOffset != null) offset = Long.parseLong(sOffset);
+      if (sLen != null) len = Integer.parseInt(sLen);
+      if (fileName == null && cfileName == null) {
+        // no filename do nothing
+        writeNothingAndFlush();
+      }
+      buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
+
+      //reserve commit point till write is complete
+      if(indexGen != null) {
+        delPolicy.saveCommitPoint(indexGen);
+      }
+    }
+
+    protected void createOutputStream(OutputStream out) {
       if (Boolean.parseBoolean(compress)) {
         fos = new FastOutputStream(new DeflaterOutputStream(out));
       } else {
         fos = new FastOutputStream(out);
       }
+    }
+
+    protected void releaseCommitPointAndExtendReserve() {
+      if(indexGen != null) {
+        //release the commit point as the write is complete
+        delPolicy.releaseCommitPoint(indexGen);
+
+        //Reserve the commit point for another 10s for the next file to be fetched.
+        //We need to keep extending the commit reservation between requests so that the replica can fetch
+        //all the files correctly.
+        delPolicy.setReserveDuration(indexGen, reserveCommitDuration);
+      }
+
+    }
+    public void write(OutputStream out) throws IOException {
+      createOutputStream(out);
 
-      int packetsWritten = 0;
       IndexInput in = null;
       try {
-        long offset = -1;
-        int len = -1;
-        // check if checksum is requested
-        boolean useChecksum = Boolean.parseBoolean(sChecksum);
-        if (sOffset != null) offset = Long.parseLong(sOffset);
-        if (sLen != null) len = Integer.parseInt(sLen);
-        if (fileName == null && cfileName == null) {
-          // no filename do nothing
-          writeNothing();
-        }
-        
+        initWrite();
+
         RefCounted<SolrIndexSearcher> sref = core.getSearcher();
         Directory dir;
         try {
@@ -1147,17 +1194,15 @@ public class ReplicationHandler extends 
         in = dir.openInput(fileName, IOContext.READONCE);
         // if offset is mentioned move the pointer to that point
         if (offset != -1) in.seek(offset);
-        byte[] buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
-        Checksum checksum = null;
-        if (useChecksum) checksum = new Adler32();
         
         long filelen = dir.fileLength(fileName);
+        long maxBytesBeforePause = 0;
+
         while (true) {
           offset = offset == -1 ? 0 : offset;
           int read = (int) Math.min(buf.length, filelen - offset);
           in.readBytes(buf, 0, read);
-          
-          fos.writeInt((int) read);
+          fos.writeInt(read);
           if (useChecksum) {
             checksum.reset();
             checksum.update(buf, 0, read);
@@ -1165,13 +1210,15 @@ public class ReplicationHandler extends 
           }
           fos.write(buf, 0, read);
           fos.flush();
-          if (indexGen != null && (packetsWritten % 5 == 0)) {
-            // after every 5 packets reserve the commitpoint for some time
-            delPolicy.setReserveDuration(indexGen, reserveCommitDuration);
+
+          //Pause if necessary
+          maxBytesBeforePause += read;
+          if (maxBytesBeforePause >= rateLimiter.getMinPauseCheckBytes()) {
+            rateLimiter.pause(maxBytesBeforePause);
+            maxBytesBeforePause = 0;
           }
-          packetsWritten++;
           if (read != buf.length) {
-            writeNothing();
+            writeNothingAndFlush();
             fos.close();
             break;
           }
@@ -1184,6 +1231,7 @@ public class ReplicationHandler extends 
         if (in != null) {
           in.close();
         }
+        releaseCommitPointAndExtendReserve();
       }
     }
 
@@ -1191,12 +1239,14 @@ public class ReplicationHandler extends 
     /**
      * Used to write a marker for EOF
      */
-    protected void writeNothing() throws IOException {
+    protected void writeNothingAndFlush() throws IOException {
       fos.writeInt(0);
       fos.flush();
     }
   }
 
+  /**This is used to write files in the conf directory.
+   */
   private class LocalFsFileStream extends DirectoryFileStream {
 
     public LocalFsFileStream(SolrParams solrParams) {
@@ -1205,39 +1255,13 @@ public class ReplicationHandler extends 
 
     @Override
     public void write(OutputStream out) throws IOException {
-      String fileName = params.get(FILE);
-      String cfileName = params.get(CONF_FILE_SHORT);
-      String sOffset = params.get(OFFSET);
-      String sLen = params.get(LEN);
-      String compress = params.get(COMPRESSION);
-      String sChecksum = params.get(CHECKSUM);
-      String sGen = params.get(GENERATION);
-      if (sGen != null) indexGen = Long.parseLong(sGen);
-      if (Boolean.parseBoolean(compress)) {
-        fos = new FastOutputStream(new DeflaterOutputStream(out));
-      } else {
-        fos = new FastOutputStream(out);
-      }
+      createOutputStream(out);
       FileInputStream inputStream = null;
-      int packetsWritten = 0;
       try {
-        long offset = -1;
-        int len = -1;
-        //check if checksum is requested
-        boolean useChecksum = Boolean.parseBoolean(sChecksum);
-        if (sOffset != null)
-          offset = Long.parseLong(sOffset);
-        if (sLen != null)
-          len = Integer.parseInt(sLen);
-        if (fileName == null && cfileName == null) {
-          //no filename do nothing
-          writeNothing();
-        }
-
-        File file = null;
+        initWrite();
   
         //if if is a conf file read from config diectory
-        file = new File(core.getResourceLoader().getConfigDir(), cfileName);
+        File file = new File(core.getResourceLoader().getConfigDir(), cfileName);
 
         if (file.exists() && file.canRead()) {
           inputStream = new FileInputStream(file);
@@ -1245,17 +1269,13 @@ public class ReplicationHandler extends 
           //if offset is mentioned move the pointer to that point
           if (offset != -1)
             channel.position(offset);
-          byte[] buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
-          Checksum checksum = null;
-          if (useChecksum)
-            checksum = new Adler32();
           ByteBuffer bb = ByteBuffer.wrap(buf);
 
           while (true) {
             bb.clear();
             long bytesRead = channel.read(bb);
             if (bytesRead <= 0) {
-              writeNothing();
+              writeNothingAndFlush();
               fos.close();
               break;
             }
@@ -1267,19 +1287,15 @@ public class ReplicationHandler extends 
             }
             fos.write(buf, 0, (int) bytesRead);
             fos.flush();
-            if (indexGen != null && (packetsWritten % 5 == 0)) {
-              //after every 5 packets reserve the commitpoint for some time
-              delPolicy.setReserveDuration(indexGen, reserveCommitDuration);
-            }
-            packetsWritten++;
           }
         } else {
-          writeNothing();
+          writeNothingAndFlush();
         }
       } catch (IOException e) {
         LOG.warn("Exception while writing response for params: " + params, e);
       } finally {
         IOUtils.closeQuietly(inputStream);
+        releaseCommitPointAndExtendReserve();
       }
     }
   } 
@@ -1328,6 +1344,8 @@ public class ReplicationHandler extends 
 
   public static final String SIZE = "size";
 
+  public static final String MAX_WRITE_PER_SECOND = "maxWriteMBPerSec";
+
   public static final String CONF_FILE_SHORT = "cf";
 
   public static final String CHECKSUM = "checksum";

Added: lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-master-throttled.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-master-throttled.xml?rev=1627340&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-master-throttled.xml (added)
+++ lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-master-throttled.xml Wed Sep 24 14:47:37 2014
@@ -0,0 +1,68 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+  <luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
+  <dataDir>${solr.data.dir:}</dataDir>
+  <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+
+  <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+  </updateHandler>
+
+  <requestHandler name="standard" class="solr.StandardRequestHandler">
+    <bool name="httpCaching">true</bool>
+  </requestHandler>
+
+  <requestHandler name="/replication" class="solr.ReplicationHandler">
+    <lst name="defaults">
+      <str name="maxWriteMBPerSec">0.1</str>
+    </lst>
+  </requestHandler>
+
+  <!-- test query parameter defaults -->
+  <requestHandler name="defaults" class="solr.StandardRequestHandler">
+    <lst name="defaults">
+      <int name="rows">4</int>
+      <bool name="hl">true</bool>
+      <str name="hl.fl">text,name,subject,title,whitetok</str>
+    </lst>
+  </requestHandler>
+
+  <!-- test query parameter defaults -->
+  <requestHandler name="lazy" class="solr.StandardRequestHandler" startup="lazy">
+    <lst name="defaults">
+      <int name="rows">4</int>
+      <bool name="hl">true</bool>
+      <str name="hl.fl">text,name,subject,title,whitetok</str>
+    </lst>
+  </requestHandler>
+
+  <requestHandler name="/update" class="solr.UpdateRequestHandler"  />
+
+  <!-- enable streaming for testing... -->
+  <requestDispatcher handleSelect="true">
+    <requestParsers enableRemoteStreaming="true" multipartUploadLimitInKB="2048"/>
+    <httpCaching lastModifiedFrom="openTime" etagSeed="Solr" never304="false">
+      <cacheControl>max-age=30, public</cacheControl>
+    </httpCaching>
+  </requestDispatcher>
+
+</config>

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java?rev=1627340&r1=1627339&r2=1627340&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java Wed Sep 24 14:47:37 2014
@@ -28,12 +28,16 @@ import java.io.Writer;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.lucene.util.TestUtil;
@@ -485,13 +489,9 @@ public class TestReplicationHandler exte
   private void invokeReplicationCommand(int pJettyPort, String pCommand) throws IOException
   {
     String masterUrl = buildUrl(pJettyPort) + "/replication?command=" + pCommand;
-    try {
-      URL u = new URL(masterUrl);
-      InputStream stream = u.openStream();
-      stream.close();
-    } catch (IOException e) {
-      //e.printStackTrace();
-    }    
+    URL u = new URL(masterUrl);
+    InputStream stream = u.openStream();
+    stream.close();
   }
   
   @Test
@@ -639,14 +639,6 @@ public class TestReplicationHandler exte
     
     // todo: make SolrJ easier to pass arbitrary params to
     // TODO: precommit WILL screw with the rest of this test
-    String masterUrl = buildUrl(masterJetty.getLocalPort()) + "/update?prepareCommit=true";
-    URL url = new URL(masterUrl);
-//    InputStream stream = url.openStream();
-//    try {
-//      stream.close();
-//    } catch (IOException e) {
-//      //e.printStackTrace();
-//    }
 
     masterClient.commit();
 
@@ -655,15 +647,11 @@ public class TestReplicationHandler exte
     assertEquals(nDocs, masterQueryResult.getNumFound());
 
     // snappull
-    masterUrl = buildUrl(slaveJetty.getLocalPort()) + "/replication?command=fetchindex&masterUrl=";
+    String masterUrl = buildUrl(slaveJetty.getLocalPort()) + "/replication?command=fetchindex&masterUrl=";
     masterUrl += buildUrl(masterJetty.getLocalPort()) + "/replication";
-    url = new URL(masterUrl);
+    URL url = new URL(masterUrl);
     InputStream stream = url.openStream();
-    try {
-      stream.close();
-    } catch (IOException e) {
-      //e.printStackTrace();
-    }
+    stream.close();
     
     //get docs from slave and check if number is equal to master
     NamedList slaveQueryRsp = rQuery(nDocs, "*:*", slaveClient);
@@ -1005,7 +993,7 @@ public class TestReplicationHandler exte
     pullFromTo(slaveJetty, masterJetty);
   }
   
-  private void pullFromTo(JettySolrRunner from, JettySolrRunner to) throws MalformedURLException, IOException {
+  private void pullFromTo(JettySolrRunner from, JettySolrRunner to) throws IOException {
     String masterUrl;
     URL url;
     InputStream stream;
@@ -1014,11 +1002,7 @@ public class TestReplicationHandler exte
         + buildUrl(from.getLocalPort()) + "/replication";
     url = new URL(masterUrl);
     stream = url.openStream();
-    try {
-      stream.close();
-    } catch (IOException e) {
-      // e.printStackTrace();
-    }
+    stream.close();
   }
 
   @Test
@@ -1295,6 +1279,110 @@ public class TestReplicationHandler exte
     checkForSingleIndex(masterJetty);
     checkForSingleIndex(slaveJetty);
   }
+
+  @Test
+  public void testRateLimitedReplication() throws Exception {
+
+    //clean index
+    masterClient.deleteByQuery("*:*");
+    slaveClient.deleteByQuery("*:*");
+    masterClient.commit();
+    slaveClient.commit();
+
+    masterJetty.stop();
+    slaveJetty.stop();
+
+    //Start master with the new solrconfig
+    master.copyConfigFile(CONF_DIR + "solrconfig-master-throttled.xml", "solrconfig.xml");
+    useFactory(null);
+    masterJetty = createJetty(master);
+    masterClient.shutdown();
+    masterClient = createNewSolrServer(masterJetty.getLocalPort());
+
+    //index docs
+    final int totalDocs = TestUtil.nextInt(random(), 50, 100);
+    for (int i = 0; i < totalDocs; i++)
+      index(masterClient, "id", i, "name", TestUtil.randomSimpleString(random(), 1000 , 5000));
+
+    masterClient.commit();
+
+    //Check Index Size
+    String dataDir = master.getDataDir();
+    masterClient.shutdown();
+    masterJetty.stop();
+
+    Directory dir = FSDirectory.open(Paths.get(dataDir, "index"));
+    String[] files = dir.listAll();
+    long totalBytes = 0;
+    for(String file : files) {
+      totalBytes += dir.fileLength(file);
+    }
+
+    float approximateTimeInSeconds = Math.round( totalBytes/1024/1024/0.1 ); // maxWriteMBPerSec=0.1 in solrconfig
+
+    //Start again and replicate the data
+    useFactory(null);
+    masterJetty = createJetty(master);
+    masterClient = createNewSolrServer(masterJetty.getLocalPort());
+
+    //start slave
+    slave.setTestPort(masterJetty.getLocalPort());
+    slave.copyConfigFile(CONF_DIR + "solrconfig-slave1.xml", "solrconfig.xml");
+    slaveJetty = createJetty(slave);
+    slaveClient.shutdown();
+    slaveClient = createNewSolrServer(slaveJetty.getLocalPort());
+
+    long startTime = System.nanoTime();
+
+    pullFromMasterToSlave();
+
+    //Add a few more docs in the master. Just to make sure that we are replicating the correct index point
+    //These extra docs should not get replicated
+    new Thread(new AddExtraDocs(masterClient, totalDocs)).start();
+
+    //Wait and make sure that it actually replicated correctly.
+    NamedList slaveQueryRsp = rQuery(totalDocs, "*:*", slaveClient);
+    SolrDocumentList slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
+    assertEquals(totalDocs, slaveQueryResult.getNumFound());
+
+    long timeTaken = System.nanoTime() - startTime;
+
+    long timeTakenInSeconds = TimeUnit.SECONDS.convert(timeTaken, TimeUnit.NANOSECONDS);
+
+    //Let's make sure it took more than approximateTimeInSeconds to make sure that it was throttled
+    boolean isElapsed = false;
+    if(timeTakenInSeconds - approximateTimeInSeconds > 0) {
+      isElapsed = true;
+    }
+    assertTrue(isElapsed);
+  }
+
+  private class AddExtraDocs implements Runnable {
+
+    SolrServer masterClient;
+    int startId;
+    public AddExtraDocs(SolrServer masterClient, int startId) {
+      this.masterClient = masterClient;
+      this.startId = startId;
+    }
+
+    @Override
+    public void run() {
+      final int totalDocs = TestUtil.nextInt(random(), 1, 10);
+      for (int i = 0; i < totalDocs; i++) {
+        try {
+          index(masterClient, "id", i + startId, "name", TestUtil.randomSimpleString(random(), 1000 , 5000));
+        } catch (Exception e) {
+          //Do nothing. Wasn't able to add doc.
+        }
+      }
+      try {
+        masterClient.commit();
+      } catch (Exception e) {
+        //Do nothing. No extra doc got committed.
+      }
+    }
+  }
   
   /**
    * character copy of file using UTF-8. If port is non-null, will be substituted any time "TEST_PORT" is found.

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java?rev=1627340&r1=1627339&r2=1627340&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java Wed Sep 24 14:47:37 2014
@@ -121,6 +121,17 @@ public abstract class SolrParams impleme
     }
   }
 
+  /** Returns the Long value of the param, or def if not set */
+  public Long getLong(String param, Long def) {
+    String val = get(param);
+    try {
+      return val== null ? def : Long.parseLong(val);
+    }
+    catch( Exception ex ) {
+      throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, ex.getMessage(), ex );
+    }
+  }
+
   /** Returns the int value of the param, or def if not set */
   public int getInt(String param, int def) {
     String val = get(param);