You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/08/09 20:53:20 UTC
svn commit: r1371379 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/util/
core/src/test/org/apache/solr/search/
solrj/src/java/org/apache/solr/common/util/
Author: yonik
Date: Thu Aug 9 18:53:19 2012
New Revision: 1371379
URL: http://svn.apache.org/viewvc?rev=1371379&view=rev
Log:
SOLR-3715: remove sync around tlog serialization
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/FastWriter.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestSolrJ.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1371379&r1=1371378&r2=1371379&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Aug 9 18:53:19 2012
@@ -31,6 +31,13 @@ Upgrading from Solr 4.0.0-BETA
In order to better support distributed search mode, the TermVectorComponent's response format has been changed so that if the schema defines a uniqueKeyField, then that field value is used as the "key" for each document in its response section, instead of the internal lucene doc id. Users w/o a uniqueKeyField will continue to see the same response format. See SOLR-3229 for more details.
+Optimizations
+----------------------
+
+* SOLR-3715: improve concurrency of the transaction log by removing
+ synchronization around log record serialization. (yonik)
+
+
Bug Fixes
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1371379&r1=1371378&r2=1371379&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java Thu Aug 9 18:53:19 2012
@@ -34,9 +34,11 @@ import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -149,7 +151,8 @@ public class TransactionLog {
long start = raf.length();
channel = raf.getChannel();
os = Channels.newOutputStream(channel);
- fos = FastOutputStream.wrap(os);
+ fos = new FastOutputStream(os, new byte[65536], 0);
+ // fos = FastOutputStream.wrap(os);
if (openExisting) {
if (start > 0) {
@@ -300,93 +303,119 @@ public class TransactionLog {
numRecords++;
}
+ private void checkWriteHeader(LogCodec codec, SolrInputDocument optional) throws IOException {
+
+ // Unsynchronized access. We can get away with an unsynchronized access here
+ // since we will never get a false non-zero when the position is in fact 0.
+ // rollback() is the only function that can reset to zero, and it blocks updates.
+ if (fos.size() != 0) return;
+
+ synchronized (this) {
+ if (fos.size() != 0) return; // check again while synchronized
+ if (optional != null) {
+ addGlobalStrings(optional.getFieldNames());
+ }
+ writeLogHeader(codec);
+ }
+ }
+
+ int lastAddSize;
public long write(AddUpdateCommand cmd, int flags) {
LogCodec codec = new LogCodec();
- long pos = 0;
- synchronized (this) {
- try {
- pos = fos.size(); // if we had flushed, this should be equal to channel.position()
- SolrInputDocument sdoc = cmd.getSolrInputDocument();
+ SolrInputDocument sdoc = cmd.getSolrInputDocument();
- if (pos == 0) { // TODO: needs to be changed if we start writing a header first
- addGlobalStrings(sdoc.getFieldNames());
- writeLogHeader(codec);
- pos = fos.size();
- }
+ try {
+ checkWriteHeader(codec, sdoc);
+
+ // adaptive buffer sizing
+ int bufSize = lastAddSize; // unsynchronized access of lastAddSize should be fine
+ bufSize = Math.min(1024*1024, bufSize+(bufSize>>3)+256);
+
+ MemOutputStream out = new MemOutputStream(new byte[bufSize]);
+ codec.init(out);
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+ lastAddSize = (int)out.size();
+
+ synchronized (this) {
+ long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+ assert pos != 0;
/***
- System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
+ System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
if (pos != fos.size()) {
- throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
- }
+ throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
+ }
***/
- codec.init(fos);
- codec.writeTag(JavaBinCodec.ARR, 3);
- codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
- codec.writeLong(cmd.getVersion());
- codec.writeSolrInputDocument(cmd.getSolrInputDocument());
-
+ out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
- } catch (IOException e) {
- // TODO: reset our file pointer back to "pos", the start of this record.
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
}
+
+ } catch (IOException e) {
+ // TODO: reset our file pointer back to "pos", the start of this record.
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
}
}
public long writeDelete(DeleteUpdateCommand cmd, int flags) {
LogCodec codec = new LogCodec();
- synchronized (this) {
- try {
- long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
- if (pos == 0) {
- writeLogHeader(codec);
- pos = fos.size();
- }
- codec.init(fos);
- codec.writeTag(JavaBinCodec.ARR, 3);
- codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte
- codec.writeLong(cmd.getVersion());
- BytesRef br = cmd.getIndexedId();
- codec.writeByteArray(br.bytes, br.offset, br.length);
+ try {
+ checkWriteHeader(codec, null);
+
+ BytesRef br = cmd.getIndexedId();
+
+ MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
+ codec.init(out);
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeByteArray(br.bytes, br.offset, br.length);
+
+ synchronized (this) {
+ long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+ assert pos != 0;
+ out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
-
return pos;
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
+
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
+
}
public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
LogCodec codec = new LogCodec();
- synchronized (this) {
- try {
- long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
- if (pos == 0) {
- writeLogHeader(codec);
- pos = fos.size();
- }
- codec.init(fos);
- codec.writeTag(JavaBinCodec.ARR, 3);
- codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte
- codec.writeLong(cmd.getVersion());
- codec.writeStr(cmd.query);
+ try {
+ checkWriteHeader(codec, null);
+ MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
+ codec.init(out);
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeStr(cmd.query);
+
+ synchronized (this) {
+ long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+ out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
-
return pos;
+ }
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
- }
+
}
@@ -745,3 +774,32 @@ class ChannelFastInputStream extends Fas
}
}
+
+class MemOutputStream extends FastOutputStream {
+ public List<byte[]> buffers = new LinkedList<byte[]>();
+ public MemOutputStream(byte[] tempBuffer) {
+ super(null, tempBuffer, 0);
+ }
+
+ @Override
+ public void flush(byte[] arr, int offset, int len) throws IOException {
+ if (arr == buf && offset==0 && len==buf.length) {
+ buffers.add(buf); // steal the buffer
+ buf = new byte[8192];
+ } else if (len > 0) {
+ byte[] newBuf = new byte[len];
+ System.arraycopy(arr, offset, newBuf, 0, len);
+ buffers.add(newBuf);
+ }
+ }
+
+ public void writeAll(FastOutputStream fos) throws IOException {
+ for (byte[] buffer : buffers) {
+ fos.write(buffer);
+ }
+ if (pos > 0) {
+ fos.write(buf, 0, pos);
+ }
+ }
+}
+
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/FastWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/FastWriter.java?rev=1371379&r1=1371378&r2=1371379&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/FastWriter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/FastWriter.java Thu Aug 9 18:53:19 2012
@@ -52,7 +52,7 @@ public class FastWriter extends Writer {
public void write(char c) throws IOException {
if (pos >= buf.length) {
- sink.write(buf,0,pos);
+ flush(buf,0,pos);
pos=0;
}
buf[pos++] = c;
@@ -61,7 +61,7 @@ public class FastWriter extends Writer {
@Override
public FastWriter append(char c) throws IOException {
if (pos >= buf.length) {
- sink.write(buf,0,pos);
+ flush(buf,0,pos);
pos=0;
}
buf[pos++] = c;
@@ -77,14 +77,14 @@ public class FastWriter extends Writer {
} else if (len<BUFSIZE) {
// if the data to write is small enough, buffer it.
System.arraycopy(cbuf, off, buf, pos, space);
- sink.write(buf, 0, buf.length);
+ flush(buf, 0, buf.length);
pos = len-space;
System.arraycopy(cbuf, off+space, buf, 0, pos);
} else {
- sink.write(buf,0,pos); // flush
+ flush(buf,0,pos); // flush
pos=0;
// don't buffer, just write to sink
- sink.write(cbuf, off, len);
+ flush(cbuf, off, len);
}
}
@@ -97,32 +97,40 @@ public class FastWriter extends Writer {
} else if (len<BUFSIZE) {
// if the data to write is small enough, buffer it.
str.getChars(off, off+space, buf, pos);
- sink.write(buf, 0, buf.length);
+ flush(buf, 0, buf.length);
str.getChars(off+space, off+len, buf, 0);
pos = len-space;
} else {
- sink.write(buf,0,pos); // flush
+ flush(buf,0,pos); // flush
pos=0;
// don't buffer, just write to sink
- sink.write(str, off, len);
+ flush(str, off, len);
}
}
@Override
public void flush() throws IOException {
- sink.write(buf,0,pos);
+ flush(buf, 0, pos);
pos=0;
- sink.flush();
+ if (sink != null) sink.flush();
+ }
+
+ public void flush(char[] buf, int offset, int len) throws IOException {
+ sink.write(buf, offset, len);
+ }
+
+ public void flush(String str, int offset, int len) throws IOException {
+ sink.write(str, offset, len);
}
@Override
public void close() throws IOException {
flush();
- sink.close();
+ if (sink != null) sink.close();
}
public void flushBuffer() throws IOException {
- sink.write(buf, 0, pos);
+ flush(buf, 0, pos);
pos=0;
}
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestSolrJ.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestSolrJ.java?rev=1371379&r1=1371378&r2=1371379&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestSolrJ.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestSolrJ.java Thu Aug 9 18:53:19 2012
@@ -18,24 +18,149 @@
package org.apache.solr.search;
-import org.apache.lucene.util.OpenBitSet;
import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
-import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.request.SolrQueryRequest;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
+import java.util.Random;
+
public class TestSolrJ extends SolrTestCaseJ4 {
- public void testSolrJ() {
+ public void testSolrJ() throws Exception {
+ // docs, producers, connections, sleep_time
+ // main(new String[] {"1000000","4", "1", "0"});
+
// doCommitPerf();
}
+ public static SolrServer server;
+ public static String idField = "id";
+ public static Exception ex;
+
+ public static void main(String[] args) throws Exception {
+ // String addr = "http://odin.local:80/solr";
+ // String addr = "http://odin.local:8983/solr";
+ String addr = "http://localhost:8983/solr";
+
+ int i = 0;
+ final int nDocs = Integer.parseInt(args[i++]);
+ final int nProducers = Integer.parseInt(args[i++]);
+ final int nConnections = Integer.parseInt(args[i++]);
+ final int maxSleep = Integer.parseInt(args[i++]);
+
+ ConcurrentUpdateSolrServer sserver = null;
+
+ // server = sserver = new ConcurrentUpdateSolrServer(addr,32,8);
+ server = sserver = new ConcurrentUpdateSolrServer(addr,64,nConnections);
+
+ server.deleteByQuery("*:*");
+ server.commit();
+
+ long start = System.currentTimeMillis();
+
+ final int docsPerThread = nDocs / nProducers;
+
+ Thread[] threads = new Thread[nProducers];
+
+ for (int threadNum = 0; threadNum<nProducers; threadNum++) {
+ final int base = threadNum * docsPerThread;
+
+ threads[threadNum] = new Thread("add-thread"+i) {
+ public void run(){
+ try {
+ indexDocs(base, docsPerThread, maxSleep);
+ } catch (Exception e) {
+ System.out.println("###############################CAUGHT EXCEPTION");
+ e.printStackTrace();
+ ex = e;
+ }
+ }
+ };
+ threads[threadNum].start();
+ }
+
+ // optional: wait for commit?
+
+ for (int threadNum = 0; threadNum<nProducers; threadNum++) {
+ threads[threadNum].join();
+ }
+
+ if (sserver != null) {
+ sserver.blockUntilFinished();
+ }
+
+ long end = System.currentTimeMillis();
+ System.out.println("time="+(end-start) + " throughput="+(nDocs*1000/(end-start)) + " Exception="+ex);
+
+ // should server threads be marked as daemon?
+ // need a server.close()!!!
+ }
+
+ public static SolrInputDocument getDocument(int docnum) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.setField(idField, docnum );
+ doc.setField("cat", Integer.toString(docnum&0x0f) );
+ doc.setField("name", "my name is " + Integer.toString(docnum&0xff) );
+ doc.setField("foo_t", "now is the time for all good men to come to the aid of their country" );
+ doc.setField("foo_i", Integer.toString(docnum&0x0f) );
+ doc.setField("foo_s", Integer.toString(docnum&0xff) );
+ doc.setField("foo_b", Boolean.toString( (docnum&0x01) == 1) );
+ doc.setField("parent_s", Integer.toString(docnum-1) );
+ doc.setField("price", Integer.toString(docnum >> 4));
+
+ int golden = (int)2654435761L;
+ int h = docnum * golden;
+ int n = (h & 0xff) + 1;
+ List lst = new ArrayList(n);
+ for (int i=0; i<n; i++) {
+ h = (h+i) * golden;
+ lst.add(h & 0xfff);
+ }
+
+ doc.setField("num_is", lst);
+ return doc;
+ }
+
+ public static void indexDocs(int base, int count, int maxSleep) throws IOException, SolrServerException {
+ Random r = new Random();
+
+ for (int i=base; i<count+base; i++) {
+ if ((i & 0xfffff) == 0) {
+ System.out.print("\n% " + new Date()+ "\t" + i + "\t");
+ System.out.flush();
+ }
+
+ if ((i & 0xffff) == 0) {
+ System.out.print(".");
+ System.out.flush();
+ }
+
+ SolrInputDocument doc = getDocument(i);
+ server.add(doc);
+
+ if (maxSleep > 0) {
+ int sleep = r.nextInt(maxSleep);
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+ }
+
+
public void doCommitPerf() throws Exception {
HttpSolrServer client = new HttpSolrServer("http://localhost:8983/solr");
@@ -55,4 +180,7 @@ public class TestSolrJ extends SolrTestC
System.out.println("TIME: " + (end-start));
}
+
+
+
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java?rev=1371379&r1=1371378&r2=1371379&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java Thu Aug 9 18:53:19 2012
@@ -23,10 +23,10 @@ import java.io.*;
* Internal Solr use only, subject to change.
*/
public class FastOutputStream extends OutputStream implements DataOutput {
- private final OutputStream out;
- private final byte[] buf;
- private long written; // how many bytes written to the underlying stream
- private int pos;
+ protected final OutputStream out;
+ protected byte[] buf;
+ protected long written; // how many bytes written to the underlying stream
+ protected int pos;
public FastOutputStream(OutputStream w) {
// use default BUFSIZE of BufferedOutputStream so if we wrap that
@@ -57,7 +57,7 @@ public class FastOutputStream extends Ou
public void write(byte b) throws IOException {
if (pos >= buf.length) {
- out.write(buf);
+ flush(buf, 0, buf.length);
written += pos;
pos=0;
}
@@ -73,18 +73,18 @@ public class FastOutputStream extends Ou
} else if (len<buf.length) {
// if the data to write is small enough, buffer it.
System.arraycopy(arr, off, buf, pos, space);
- out.write(buf);
+ flush(buf, 0, buf.length);
written += buf.length;
pos = len-space;
System.arraycopy(arr, off+space, buf, 0, pos);
} else {
if (pos>0) {
- out.write(buf,0,pos); // flush
+ flush(buf,0,pos); // flush
written += pos;
pos=0;
}
// don't buffer, just write to sink
- out.write(arr, off, len);
+ flush(arr, off, len);
written += len;
}
}
@@ -168,13 +168,13 @@ public class FastOutputStream extends Ou
@Override
public void flush() throws IOException {
flushBuffer();
- out.flush();
+ if (out != null) out.flush();
}
@Override
public void close() throws IOException {
flushBuffer();
- out.close();
+ if (out != null) out.close();
}
/** Only flushes the buffer of the FastOutputStream, not that of the
@@ -182,12 +182,17 @@ public class FastOutputStream extends Ou
*/
public void flushBuffer() throws IOException {
if (pos > 0) {
- out.write(buf, 0, pos);
+ flush(buf, 0, pos);
written += pos;
pos=0;
}
}
+ /** All writes to the sink will go through this method */
+ public void flush(byte[] buf, int offset, int len) throws IOException {
+ out.write(buf, offset, len);
+ }
+
public long size() {
return written + pos;
}