You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/08/09 21:16:11 UTC

svn commit: r1371391 - in /lucene/dev/branches/branch_4x: ./ dev-tools/ lucene/ lucene/analysis/ lucene/analysis/icu/src/java/org/apache/lucene/collation/ lucene/backwards/ lucene/benchmark/ lucene/core/ lucene/demo/ lucene/facet/ lucene/grouping/ luce...

Author: yonik
Date: Thu Aug  9 19:16:09 2012
New Revision: 1371391

URL: http://svn.apache.org/viewvc?rev=1371391&view=rev
Log:
SOLR-3715: remove sync around tlog serialization

Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/dev-tools/   (props changed)
    lucene/dev/branches/branch_4x/lucene/   (props changed)
    lucene/dev/branches/branch_4x/lucene/BUILD.txt   (props changed)
    lucene/dev/branches/branch_4x/lucene/CHANGES.txt   (props changed)
    lucene/dev/branches/branch_4x/lucene/JRE_VERSION_MIGRATION.txt   (props changed)
    lucene/dev/branches/branch_4x/lucene/LICENSE.txt   (props changed)
    lucene/dev/branches/branch_4x/lucene/MIGRATE.txt   (props changed)
    lucene/dev/branches/branch_4x/lucene/NOTICE.txt   (props changed)
    lucene/dev/branches/branch_4x/lucene/README.txt   (props changed)
    lucene/dev/branches/branch_4x/lucene/analysis/   (props changed)
    lucene/dev/branches/branch_4x/lucene/analysis/icu/src/java/org/apache/lucene/collation/ICUCollationKeyFilterFactory.java   (props changed)
    lucene/dev/branches/branch_4x/lucene/backwards/   (props changed)
    lucene/dev/branches/branch_4x/lucene/benchmark/   (props changed)
    lucene/dev/branches/branch_4x/lucene/build.xml   (props changed)
    lucene/dev/branches/branch_4x/lucene/common-build.xml   (props changed)
    lucene/dev/branches/branch_4x/lucene/core/   (props changed)
    lucene/dev/branches/branch_4x/lucene/demo/   (props changed)
    lucene/dev/branches/branch_4x/lucene/facet/   (props changed)
    lucene/dev/branches/branch_4x/lucene/grouping/   (props changed)
    lucene/dev/branches/branch_4x/lucene/highlighter/   (props changed)
    lucene/dev/branches/branch_4x/lucene/ivy-settings.xml   (props changed)
    lucene/dev/branches/branch_4x/lucene/join/   (props changed)
    lucene/dev/branches/branch_4x/lucene/licenses/   (props changed)
    lucene/dev/branches/branch_4x/lucene/memory/   (props changed)
    lucene/dev/branches/branch_4x/lucene/misc/   (props changed)
    lucene/dev/branches/branch_4x/lucene/module-build.xml   (props changed)
    lucene/dev/branches/branch_4x/lucene/queries/   (props changed)
    lucene/dev/branches/branch_4x/lucene/queryparser/   (props changed)
    lucene/dev/branches/branch_4x/lucene/sandbox/   (props changed)
    lucene/dev/branches/branch_4x/lucene/site/   (props changed)
    lucene/dev/branches/branch_4x/lucene/spatial/   (props changed)
    lucene/dev/branches/branch_4x/lucene/suggest/   (props changed)
    lucene/dev/branches/branch_4x/lucene/test-framework/   (props changed)
    lucene/dev/branches/branch_4x/lucene/tools/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/solr/LICENSE.txt   (props changed)
    lucene/dev/branches/branch_4x/solr/NOTICE.txt   (props changed)
    lucene/dev/branches/branch_4x/solr/README.txt   (props changed)
    lucene/dev/branches/branch_4x/solr/build.xml   (props changed)
    lucene/dev/branches/branch_4x/solr/cloud-dev/   (props changed)
    lucene/dev/branches/branch_4x/solr/common-build.xml   (props changed)
    lucene/dev/branches/branch_4x/solr/contrib/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/TransactionLog.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/util/FastWriter.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/search/TestSolrJ.java
    lucene/dev/branches/branch_4x/solr/dev-tools/   (props changed)
    lucene/dev/branches/branch_4x/solr/example/   (props changed)
    lucene/dev/branches/branch_4x/solr/lib/   (props changed)
    lucene/dev/branches/branch_4x/solr/licenses/   (props changed)
    lucene/dev/branches/branch_4x/solr/licenses/httpclient-LICENSE-ASL.txt   (props changed)
    lucene/dev/branches/branch_4x/solr/licenses/httpclient-NOTICE.txt   (props changed)
    lucene/dev/branches/branch_4x/solr/licenses/httpcore-LICENSE-ASL.txt   (props changed)
    lucene/dev/branches/branch_4x/solr/licenses/httpcore-NOTICE.txt   (props changed)
    lucene/dev/branches/branch_4x/solr/licenses/httpmime-LICENSE-ASL.txt   (props changed)
    lucene/dev/branches/branch_4x/solr/licenses/httpmime-NOTICE.txt   (props changed)
    lucene/dev/branches/branch_4x/solr/scripts/   (props changed)
    lucene/dev/branches/branch_4x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java
    lucene/dev/branches/branch_4x/solr/test-framework/   (props changed)
    lucene/dev/branches/branch_4x/solr/testlogging.properties   (props changed)
    lucene/dev/branches/branch_4x/solr/webapp/   (props changed)

Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1371391&r1=1371390&r2=1371391&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Thu Aug  9 19:16:09 2012
@@ -27,6 +27,13 @@ Upgrading from Solr 4.0.0-BETA
 
 In order to better support distributed search mode, the TermVectorComponent's response format has been changed so that if the schema defines a uniqueKeyField, then that field value is used as the "key" for each document in its response section, instead of the internal lucene doc id.  Users w/o a uniqueKeyField will continue to see the same response format.  See SOLR-3229 for more details.
 
+Optimizations
+----------------------
+
+* SOLR-3715: improve concurrency of the transaction log by removing
+  synchronization around log record serialization. (yonik)
+
+
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1371391&r1=1371390&r2=1371391&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/TransactionLog.java Thu Aug  9 19:16:09 2012
@@ -34,9 +34,11 @@ import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -149,7 +151,8 @@ public class TransactionLog {
       long start = raf.length();
       channel = raf.getChannel();
       os = Channels.newOutputStream(channel);
-      fos = FastOutputStream.wrap(os);
+      fos = new FastOutputStream(os, new byte[65536], 0);
+      // fos = FastOutputStream.wrap(os);
 
       if (openExisting) {
         if (start > 0) {
@@ -300,93 +303,119 @@ public class TransactionLog {
     numRecords++;
   }
 
+  private void checkWriteHeader(LogCodec codec, SolrInputDocument optional) throws IOException {
+
+    // Unsynchronized access.  We can get away with an unsynchronized access here
+    // since we will never get a false non-zero when the position is in fact 0.
+    // rollback() is the only function that can reset to zero, and it blocks updates.
+    if (fos.size() != 0) return;
+
+    synchronized (this) {
+      if (fos.size() != 0) return;  // check again while synchronized
+      if (optional != null) {
+        addGlobalStrings(optional.getFieldNames());
+      }
+      writeLogHeader(codec);
+    }
+  }
+
+  int lastAddSize;
 
   public long write(AddUpdateCommand cmd, int flags) {
     LogCodec codec = new LogCodec();
-    long pos = 0;
-    synchronized (this) {
-      try {
-        pos = fos.size();   // if we had flushed, this should be equal to channel.position()
-        SolrInputDocument sdoc = cmd.getSolrInputDocument();
+    SolrInputDocument sdoc = cmd.getSolrInputDocument();
 
-        if (pos == 0) { // TODO: needs to be changed if we start writing a header first
-          addGlobalStrings(sdoc.getFieldNames());
-          writeLogHeader(codec);
-          pos = fos.size();
-        }
+    try {
+      checkWriteHeader(codec, sdoc);
+
+      // adaptive buffer sizing
+      int bufSize = lastAddSize;    // unsynchronized access of lastAddSize should be fine
+      bufSize = Math.min(1024*1024, bufSize+(bufSize>>3)+256);
+
+      MemOutputStream out = new MemOutputStream(new byte[bufSize]);
+      codec.init(out);
+      codec.writeTag(JavaBinCodec.ARR, 3);
+      codec.writeInt(UpdateLog.ADD | flags);  // should just take one byte
+      codec.writeLong(cmd.getVersion());
+      codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+      lastAddSize = (int)out.size();
+
+      synchronized (this) {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        assert pos != 0;
 
         /***
-        System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
+         System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
          if (pos != fos.size()) {
-          throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
-        }
+         throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
+         }
          ***/
 
-        codec.init(fos);
-        codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.ADD | flags);  // should just take one byte
-        codec.writeLong(cmd.getVersion());
-        codec.writeSolrInputDocument(cmd.getSolrInputDocument());
-
+        out.writeAll(fos);
         endRecord(pos);
         // fos.flushBuffer();  // flush later
         return pos;
-      } catch (IOException e) {
-        // TODO: reset our file pointer back to "pos", the start of this record.
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
       }
+
+    } catch (IOException e) {
+      // TODO: reset our file pointer back to "pos", the start of this record.
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
     }
   }
 
   public long writeDelete(DeleteUpdateCommand cmd, int flags) {
     LogCodec codec = new LogCodec();
-    synchronized (this) {
-      try {
-        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
-        if (pos == 0) {
-          writeLogHeader(codec);
-          pos = fos.size();
-        }
-        codec.init(fos);
-        codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.DELETE | flags);  // should just take one byte
-        codec.writeLong(cmd.getVersion());
-        BytesRef br = cmd.getIndexedId();
-        codec.writeByteArray(br.bytes, br.offset, br.length);
 
+    try {
+      checkWriteHeader(codec, null);
+
+      BytesRef br = cmd.getIndexedId();
+
+      MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
+      codec.init(out);
+      codec.writeTag(JavaBinCodec.ARR, 3);
+      codec.writeInt(UpdateLog.DELETE | flags);  // should just take one byte
+      codec.writeLong(cmd.getVersion());
+      codec.writeByteArray(br.bytes, br.offset, br.length);
+
+      synchronized (this) {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        assert pos != 0;
+        out.writeAll(fos);
         endRecord(pos);
         // fos.flushBuffer();  // flush later
-
         return pos;
-      } catch (IOException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
+
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
     }
+
   }
 
   public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
     LogCodec codec = new LogCodec();
-    synchronized (this) {
-      try {
-        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
-        if (pos == 0) {
-          writeLogHeader(codec);
-          pos = fos.size();
-        }
-        codec.init(fos);
-        codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags);  // should just take one byte
-        codec.writeLong(cmd.getVersion());
-        codec.writeStr(cmd.query);
+    try {
+      checkWriteHeader(codec, null);
 
+      MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
+      codec.init(out);
+      codec.writeTag(JavaBinCodec.ARR, 3);
+      codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags);  // should just take one byte
+      codec.writeLong(cmd.getVersion());
+      codec.writeStr(cmd.query);
+
+      synchronized (this) {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        out.writeAll(fos);
         endRecord(pos);
         // fos.flushBuffer();  // flush later
-
         return pos;
+      }
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
-    }
+
   }
 
 
@@ -745,3 +774,32 @@ class ChannelFastInputStream extends Fas
   }
 }
 
+
+class MemOutputStream extends FastOutputStream {
+  public List<byte[]> buffers = new LinkedList<byte[]>();
+  public MemOutputStream(byte[] tempBuffer) {
+    super(null, tempBuffer, 0);
+  }
+
+  @Override
+  public void flush(byte[] arr, int offset, int len) throws IOException {
+    if (arr == buf && offset==0 && len==buf.length) {
+      buffers.add(buf);  // steal the buffer
+      buf = new byte[8192];
+    } else if (len > 0) {
+      byte[] newBuf = new byte[len];
+      System.arraycopy(arr, offset, newBuf, 0, len);
+      buffers.add(newBuf);
+    }
+  }
+
+  public void writeAll(FastOutputStream fos) throws IOException {
+    for (byte[] buffer : buffers) {
+      fos.write(buffer);
+    }
+    if (pos > 0) {
+      fos.write(buf, 0, pos);
+    }
+  }
+}
+

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/util/FastWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/util/FastWriter.java?rev=1371391&r1=1371390&r2=1371391&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/util/FastWriter.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/util/FastWriter.java Thu Aug  9 19:16:09 2012
@@ -52,7 +52,7 @@ public class FastWriter extends Writer {
 
   public void write(char c) throws IOException {
     if (pos >= buf.length) {
-      sink.write(buf,0,pos);
+      flush(buf,0,pos);
       pos=0;
     }
     buf[pos++] = c;
@@ -61,7 +61,7 @@ public class FastWriter extends Writer {
   @Override
   public FastWriter append(char c) throws IOException {
     if (pos >= buf.length) {
-      sink.write(buf,0,pos);
+      flush(buf,0,pos);
       pos=0;
     }
     buf[pos++] = c;
@@ -77,14 +77,14 @@ public class FastWriter extends Writer {
     } else if (len<BUFSIZE) {
       // if the data to write is small enough, buffer it.
       System.arraycopy(cbuf, off, buf, pos, space);
-      sink.write(buf, 0, buf.length);
+      flush(buf, 0, buf.length);
       pos = len-space;
       System.arraycopy(cbuf, off+space, buf, 0, pos);
     } else {
-      sink.write(buf,0,pos);  // flush
+      flush(buf,0,pos);  // flush
       pos=0;
       // don't buffer, just write to sink
-      sink.write(cbuf, off, len);
+      flush(cbuf, off, len);
     }
   }
 
@@ -97,32 +97,40 @@ public class FastWriter extends Writer {
     } else if (len<BUFSIZE) {
       // if the data to write is small enough, buffer it.
       str.getChars(off, off+space, buf, pos);
-      sink.write(buf, 0, buf.length);
+      flush(buf, 0, buf.length);
       str.getChars(off+space, off+len, buf, 0);
       pos = len-space;
     } else {
-      sink.write(buf,0,pos);  // flush
+      flush(buf,0,pos);  // flush
       pos=0;
       // don't buffer, just write to sink
-      sink.write(str, off, len);
+      flush(str, off, len);
     }
   }
 
   @Override
   public void flush() throws IOException {
-    sink.write(buf,0,pos);
+    flush(buf, 0, pos);
     pos=0;
-    sink.flush();
+    if (sink != null) sink.flush();
+  }
+
+  public void flush(char[] buf, int offset, int len) throws IOException {
+    sink.write(buf, offset, len);
+  }
+
+  public void flush(String str, int offset, int len) throws IOException {
+    sink.write(str, offset, len);
   }
 
   @Override
   public void close() throws IOException {
     flush();
-    sink.close();
+    if (sink != null) sink.close();
   }
 
   public void flushBuffer() throws IOException {
-    sink.write(buf, 0, pos);
+    flush(buf, 0, pos);
     pos=0;
   }
 }

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/search/TestSolrJ.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/search/TestSolrJ.java?rev=1371391&r1=1371390&r2=1371391&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/search/TestSolrJ.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/search/TestSolrJ.java Thu Aug  9 19:16:09 2012
@@ -18,24 +18,149 @@
 package org.apache.solr.search;
 
 
-import org.apache.lucene.util.OpenBitSet;
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
-import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.request.SolrQueryRequest;
-import org.junit.BeforeClass;
-import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
+import java.util.Random;
+
 
 public class TestSolrJ extends SolrTestCaseJ4 {
 
-  public void testSolrJ() {
+  public void testSolrJ() throws Exception {
+                          // docs, producers, connections, sleep_time
+    //  main(new String[] {"1000000","4", "1", "0"});
+
     // doCommitPerf();
   }
 
+  public static SolrServer server;
+  public static String idField = "id";
+  public static Exception ex;
+
+  public static void main(String[] args) throws Exception {
+    // String addr = "http://odin.local:80/solr";
+    // String addr = "http://odin.local:8983/solr";
+    String addr = "http://localhost:8983/solr";
+
+    int i = 0;
+    final int nDocs = Integer.parseInt(args[i++]);
+    final int nProducers = Integer.parseInt(args[i++]);
+    final int nConnections = Integer.parseInt(args[i++]);
+    final int maxSleep = Integer.parseInt(args[i++]);
+
+    ConcurrentUpdateSolrServer sserver = null;
+
+    // server = sserver = new ConcurrentUpdateSolrServer(addr,32,8);
+    server = sserver = new ConcurrentUpdateSolrServer(addr,64,nConnections);
+
+    server.deleteByQuery("*:*");
+    server.commit();
+
+    long start = System.currentTimeMillis();
+
+    final int docsPerThread = nDocs / nProducers;
+
+    Thread[] threads = new Thread[nProducers];
+
+    for (int threadNum = 0; threadNum<nProducers; threadNum++) {
+      final int base = threadNum * docsPerThread;
+
+      threads[threadNum] = new Thread("add-thread"+i) {
+        public void run(){
+          try {
+            indexDocs(base, docsPerThread, maxSleep);
+          } catch (Exception e) {
+            System.out.println("###############################CAUGHT EXCEPTION");
+            e.printStackTrace();
+            ex = e;
+          }
+        }
+      };
+      threads[threadNum].start();
+    }
+
+    // optional: wait for commit?
+
+    for (int threadNum = 0; threadNum<nProducers; threadNum++) {
+      threads[threadNum].join();
+    }
+
+    if (sserver != null) {
+      sserver.blockUntilFinished();
+    }
+
+    long end = System.currentTimeMillis();
+    System.out.println("time="+(end-start) + " throughput="+(nDocs*1000/(end-start)) + " Exception="+ex);
+
+    // should server threads be marked as daemon?
+    // need a server.close()!!!
+  }
+
+  public static SolrInputDocument getDocument(int docnum) {
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.setField(idField, docnum );
+    doc.setField("cat", Integer.toString(docnum&0x0f) );
+    doc.setField("name", "my name is " + Integer.toString(docnum&0xff) );
+    doc.setField("foo_t", "now is the time for all good men to come to the aid of their country" );
+    doc.setField("foo_i", Integer.toString(docnum&0x0f) );
+    doc.setField("foo_s", Integer.toString(docnum&0xff) );
+    doc.setField("foo_b", Boolean.toString( (docnum&0x01) == 1) );
+    doc.setField("parent_s", Integer.toString(docnum-1) );
+    doc.setField("price", Integer.toString(docnum >> 4));
+
+    int golden = (int)2654435761L;
+    int h = docnum * golden;
+    int n = (h & 0xff) + 1;
+    List lst = new ArrayList(n);
+    for (int i=0; i<n; i++) {
+      h = (h+i) * golden;
+      lst.add(h & 0xfff);
+    }
+
+    doc.setField("num_is", lst);
+    return doc;
+  }
+
+  public static void indexDocs(int base, int count, int maxSleep) throws IOException, SolrServerException {
+    Random r = new Random();
+
+    for (int i=base; i<count+base; i++) {
+      if ((i & 0xfffff) == 0) {
+        System.out.print("\n% " + new Date()+ "\t" + i + "\t");
+        System.out.flush();
+      }
+
+      if ((i & 0xffff) == 0) {
+        System.out.print(".");
+        System.out.flush();
+      }
+
+      SolrInputDocument doc = getDocument(i);
+      server.add(doc);
+
+      if (maxSleep > 0) {
+        int sleep = r.nextInt(maxSleep);
+        try {
+          Thread.sleep(sleep);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+
+    }
+  }
+
+
   public void doCommitPerf() throws Exception {
     HttpSolrServer client = new HttpSolrServer("http://localhost:8983/solr");
 
@@ -55,4 +180,7 @@ public class TestSolrJ extends SolrTestC
     System.out.println("TIME: " + (end-start));
   }
 
+
+
+
 }

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java?rev=1371391&r1=1371390&r2=1371391&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java Thu Aug  9 19:16:09 2012
@@ -23,10 +23,10 @@ import java.io.*;
  *  Internal Solr use only, subject to change.
  */
 public class FastOutputStream extends OutputStream implements DataOutput {
-  private final OutputStream out;
-  private final byte[] buf;
-  private long written;  // how many bytes written to the underlying stream
-  private int pos;
+  protected final OutputStream out;
+  protected byte[] buf;
+  protected long written;  // how many bytes written to the underlying stream
+  protected int pos;
 
   public FastOutputStream(OutputStream w) {
   // use default BUFSIZE of BufferedOutputStream so if we wrap that
@@ -57,7 +57,7 @@ public class FastOutputStream extends Ou
 
   public void write(byte b) throws IOException {
     if (pos >= buf.length) {
-      out.write(buf);
+      flush(buf, 0, buf.length);
       written += pos;
       pos=0;
     }
@@ -73,18 +73,18 @@ public class FastOutputStream extends Ou
     } else if (len<buf.length) {
       // if the data to write is small enough, buffer it.
       System.arraycopy(arr, off, buf, pos, space);
-      out.write(buf);
+      flush(buf, 0, buf.length);
       written += buf.length;
       pos = len-space;
       System.arraycopy(arr, off+space, buf, 0, pos);
     } else {
       if (pos>0) {
-        out.write(buf,0,pos);  // flush
+        flush(buf,0,pos);  // flush
         written += pos;
         pos=0;
       }
       // don't buffer, just write to sink
-      out.write(arr, off, len);
+      flush(arr, off, len);
       written += len;            
     }
   }
@@ -168,13 +168,13 @@ public class FastOutputStream extends Ou
   @Override
   public void flush() throws IOException {
     flushBuffer();
-    out.flush();
+    if (out != null) out.flush();
   }
 
   @Override
   public void close() throws IOException {
     flushBuffer();
-    out.close();
+    if (out != null) out.close();
   }
 
   /** Only flushes the buffer of the FastOutputStream, not that of the
@@ -182,12 +182,17 @@ public class FastOutputStream extends Ou
    */
   public void flushBuffer() throws IOException {
     if (pos > 0) {
-      out.write(buf, 0, pos);
+      flush(buf, 0, pos);
       written += pos;
       pos=0;
     }
   }
 
+  /** All writes to the sink will go through this method */
+  public void flush(byte[] buf, int offset, int len) throws IOException {
+    out.write(buf, offset, len);
+  }
+
   public long size() {
     return written + pos;
   }