You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/20 13:41:45 UTC
[lucene-solr] 02/02: @1042 Cleanup TransactionLog/Updatelog
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 6b43489569d7a94dabb171d1f22153d4eac8e98f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Oct 19 23:22:29 2020 -0500
@1042 Cleanup TransactionLog/Updatelog
---
.../org/apache/solr/update/TransactionLog.java | 221 +++++++++------------
.../src/java/org/apache/solr/update/UpdateLog.java | 2 -
.../apache/solr/cloud/MultiThreadedOCPTest.java | 1 +
.../collections/TestLocalFSCloudBackupRestore.java | 1 +
.../org/apache/solr/search/TestSolrCachePerf.java | 1 +
5 files changed, 98 insertions(+), 128 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index ed69d85..3a4625b 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -117,10 +117,7 @@ public class TransactionLog implements Closeable {
}
// no need to synchronize globalStringMap - it's only updated before the first record is written to the log
- Integer idx;
- synchronized (globalStringList) {
- idx = globalStringMap.get(s.toString());
- }
+ Integer idx = globalStringMap.get(s.toString());
if (idx == null) {
// write a normal string
writeStr(s);
@@ -135,9 +132,7 @@ public class TransactionLog implements Closeable {
int idx = readSize(fis);
if (idx != 0) {// idx != 0 is the index of the extern string
// no need to synchronize globalStringList - it's only updated before the first record is written to the log
- synchronized (globalStringList) {
- return globalStringList.get(idx - 1);
- }
+ return globalStringList.get(idx - 1);
} else {// idx == 0 means it has a string value
// this shouldn't happen with this codec subclass.
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
@@ -186,7 +181,7 @@ public class TransactionLog implements Closeable {
long start = raf.length();
channel = raf.getChannel();
os = Channels.newOutputStream(channel);
- fos = new FastOutputStream(os, new byte[65536 * 2], 0);
+ fos = new FastOutputStream(os, new byte[65536], 0);
// fos = FastOutputStream.wrap(os);
if (openExisting) {
@@ -283,7 +278,8 @@ public class TransactionLog implements Closeable {
protected void addGlobalStrings(Collection<String> strings) {
if (strings == null) return;
- synchronized (globalStringList) {
+ fosLock.lock();
+ try {
int origSize = globalStringMap.size();
for (String s : strings) {
Integer idx = null;
@@ -295,12 +291,17 @@ public class TransactionLog implements Closeable {
globalStringMap.put(s, globalStringList.size());
}
assert globalStringMap.size() == globalStringList.size();
+ } finally {
+ fosLock.unlock();
}
}
Collection<String> getGlobalStrings() {
- synchronized (globalStringList) {
+ fosLock.lock();
+ try {
return new ArrayList<>(globalStringList);
+ } finally {
+ fosLock.unlock();
}
}
@@ -708,29 +709,32 @@ public class TransactionLog implements Closeable {
}
fos.flushBuffer();
+ } finally {
+ fosLock.unlock();
+ }
+ if (pos == 0) {
+ readHeader(fis);
- if (pos == 0) {
- readHeader(fis);
-
- // shouldn't currently happen - header and first record are currently written at the same time
- synchronized (TransactionLog.this) {
- if (fis.position() >= fos.size()) {
- return null;
- }
- pos = fis.position();
+ // shouldn't currently happen - header and first record are currently written at the same time
+ fosLock.lock();
+ try {
+ if (fis.position() >= fos.size()) {
+ return null;
}
+ pos = fis.position();
+ } finally {
+ fosLock.unlock();
}
+ }
- Object o = codec.readVal(fis);
+ Object o = codec.readVal(fis);
- // skip over record size
- int size = fis.readInt();
- assert size == fis.position() - pos - 4;
+ // skip over record size
+ int size = fis.readInt();
+ assert size == fis.position() - pos - 4;
+
+ return o;
- return o;
- } finally {
- fosLock.unlock();
- }
}
public void close() {
@@ -771,39 +775,34 @@ public class TransactionLog implements Closeable {
@Override
public Object next() throws IOException, InterruptedException {
- fosLock.lock();
- try {
- if (versionToPos == null) {
- versionToPos = new TreeMap<>();
- Object o;
- long pos = startingPos;
-
- long lastVersion = Long.MIN_VALUE;
- while ((o = super.next()) != null) {
- @SuppressWarnings({"rawtypes"}) List entry = (List) o;
- long version = (Long) entry.get(UpdateLog.VERSION_IDX);
- version = Math.abs(version);
- versionToPos.put(version, pos);
- pos = currentPos();
-
- if (version < lastVersion) inOrder = false;
- lastVersion = version;
- }
- fis.seek(startingPos);
-
+ if (versionToPos == null) {
+ versionToPos = new TreeMap<>();
+ Object o;
+ long pos = startingPos;
+
+ long lastVersion = Long.MIN_VALUE;
+ while ((o = super.next()) != null) {
+ @SuppressWarnings({"rawtypes"})
+ List entry = (List) o;
+ long version = (Long) entry.get(UpdateLog.VERSION_IDX);
+ version = Math.abs(version);
+ versionToPos.put(version, pos);
+ pos = currentPos();
+
+ if (version < lastVersion) inOrder = false;
+ lastVersion = version;
}
+ fis.seek(startingPos);
+ }
- if (inOrder) {
- return super.next();
- } else {
- if (iterator == null) iterator = versionToPos.values().iterator();
- if (!iterator.hasNext()) return null;
- long pos = iterator.next();
- if (pos != currentPos()) fis.seek(pos);
- return super.next();
- }
- } finally {
- fosLock.unlock();
+ if (inOrder) {
+ return super.next();
+ } else {
+ if (iterator == null) iterator = versionToPos.values().iterator();
+ if (!iterator.hasNext()) return null;
+ long pos = iterator.next();
+ if (pos != currentPos()) fis.seek(pos);
+ return super.next();
}
}
}
@@ -869,43 +868,38 @@ public class TransactionLog implements Closeable {
* @throws IOException If there is a low-level I/O error.
*/
public Object next() throws IOException {
- Object o = null;
- fosLock.lock();
- try {
- if (prevPos <= 0) return null;
+ if (prevPos <= 0) return null;
- long endOfThisRecord = prevPos;
+ long endOfThisRecord = prevPos;
- int thisLength = nextLength;
+ int thisLength = nextLength;
- long recordStart = prevPos - thisLength; // back up to the beginning of the next record
- prevPos = recordStart - 4; // back up 4 more to read the length of the next record
+ long recordStart = prevPos - thisLength; // back up to the beginning of the next record
+ prevPos = recordStart - 4; // back up 4 more to read the length of the next record
- if (prevPos <= 0) return null; // this record is the header
+ if (prevPos <= 0) return null; // this record is the header
- long bufferPos = fis.getBufferPos();
- if (prevPos >= bufferPos) {
- // nothing to do... we're within the current buffer
- } else {
- // Position buffer so that this record is at the end.
- // For small records, this will cause subsequent calls to next() to be within the buffer.
- long seekPos = endOfThisRecord - fis.getBufferSize();
- seekPos = Math.min(seekPos, prevPos); // seek to the start of the record if it's larger then the block size.
- seekPos = Math.max(seekPos, 0);
- fis.seek(seekPos);
- fis.peek(); // cause buffer to be filled
- }
+ long bufferPos = fis.getBufferPos();
+ if (prevPos >= bufferPos) {
+ // nothing to do... we're within the current buffer
+ } else {
+ // Position buffer so that this record is at the end.
+ // For small records, this will cause subsequent calls to next() to be within the buffer.
+ long seekPos = endOfThisRecord - fis.getBufferSize();
+ seekPos = Math.min(seekPos, prevPos); // seek to the start of the record if it's larger then the block size.
+ seekPos = Math.max(seekPos, 0);
+ fis.seek(seekPos);
+ fis.peek(); // cause buffer to be filled
+ }
- fis.seek(prevPos);
- nextLength = fis.readInt(); // this is the length of the *next* record (i.e. closer to the beginning)
+ fis.seek(prevPos);
+ nextLength = fis.readInt(); // this is the length of the *next* record (i.e. closer to the beginning)
- // TODO: optionally skip document data
- o = codec.readVal(fis);
+ // TODO: optionally skip document data
+ Object o = codec.readVal(fis);
+
+ // assert fis.position() == prevPos + 4 + thisLength; // this is only true if we read all the data (and we currently skip reading SolrInputDocument
- // assert fis.position() == prevPos + 4 + thisLength; // this is only true if we read all the data (and we currently skip reading SolrInputDocument
- } finally {
- fosLock.unlock();
- }
return o;
}
@@ -944,60 +938,35 @@ public class TransactionLog implements Closeable {
@Override
public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
- fosLock.lock();
- try {
- int ret = ch.read(bb, readFromStream);
- return ret;
- } finally {
- fosLock.unlock();
- }
+ int ret = ch.read(bb, readFromStream);
+ return ret;
}
public void seek(long position) throws IOException {
- fosLock.lock();
- try {
- if (position <= readFromStream && position >= getBufferPos()) {
- // seek within buffer
- pos = (int) (position - getBufferPos());
- } else {
- // long currSize = ch.size(); // not needed - underlying read should handle (unless read never done)
- // if (position > currSize) throw new EOFException("Read past EOF: seeking to " + position + " on file of size " + currSize + " file=" + ch);
- readFromStream = position;
- end = pos = 0;
- }
- assert position() == position;
- } finally {
- fosLock.unlock();
+ if (position <= readFromStream && position >= getBufferPos()) {
+ // seek within buffer
+ pos = (int) (position - getBufferPos());
+ } else {
+ // long currSize = ch.size(); // not needed - underlying read should handle (unless read never done)
+ // if (position > currSize) throw new EOFException("Read past EOF: seeking to " + position + " on file of size " + currSize + " file=" + ch);
+ readFromStream = position;
+ end = pos = 0;
}
+ assert position() == position;
}
/** where is the start of the buffer relative to the whole file */
public long getBufferPos() {
- fosLock.lock();
- try {
- return readFromStream - end;
- } finally {
- fosLock.unlock();
- }
+ return readFromStream - end;
}
public int getBufferSize() {
- fosLock.lock();
- try {
- return buf.length;
- } finally {
- fosLock.unlock();
- }
+ return buf.length;
}
@Override
public void close() throws IOException {
- fosLock.lock();
- try {
- ch.close();
- } finally {
- fosLock.unlock();
- }
+ ch.close();
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index d36aaf8..4fa3952 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1409,8 +1409,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
protected void ensureBufferTlog() {
- if (bufferTlog != null) return;
-
if (bufferTlog == null) {
tlogLock.lock();
try {
diff --git a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
index 2be0f80..fa8b8f1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
@@ -49,6 +49,7 @@ import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
/**
* Tests the Multi threaded Collections API.
*/
+@LuceneTestCase.Nightly // can be very slow
public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
private static final int REQUEST_STATUS_TIMEOUT = 5;
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
index f78a1c1..5efe7b8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
@@ -42,6 +42,7 @@ import org.junit.Test;
* Solr backup/restore still requires a "shared" file-system. Its just that in this case such file-system would be
* exposed via local file-system API.
*/
+@Ignore // nocommit can hang
public class TestLocalFSCloudBackupRestore extends AbstractCloudBackupRestoreTestCase {
private static String backupLocation;
diff --git a/solr/core/src/test/org/apache/solr/search/TestSolrCachePerf.java b/solr/core/src/test/org/apache/solr/search/TestSolrCachePerf.java
index b30f585..81e6986 100644
--- a/solr/core/src/test/org/apache/solr/search/TestSolrCachePerf.java
+++ b/solr/core/src/test/org/apache/solr/search/TestSolrCachePerf.java
@@ -35,6 +35,7 @@ import org.junit.Test;
/**
*
*/
+// TODO: this test is flakey, fails too easily
public class TestSolrCachePerf extends SolrTestCaseJ4 {
private static final Class<? extends SolrCache>[] IMPLS = new Class[] {