You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/20 13:41:43 UTC

[lucene-solr] branch reference_impl_dev updated (878b2e4 -> 6b43489)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a change to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


    from 878b2e4  @1040 Try to improve test.
     new 93bc02a  @1041 ensureBufferTlog thread safety should work the same as ensureLog.
     new 6b43489  @1042 Cleanup TransactionLog/Updatelog

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/solr/update/TransactionLog.java     | 221 +++++++++------------
 .../src/java/org/apache/solr/update/UpdateLog.java |  16 +-
 .../apache/solr/cloud/MultiThreadedOCPTest.java    |   1 +
 .../collections/TestLocalFSCloudBackupRestore.java |   1 +
 .../org/apache/solr/search/TestSolrCachePerf.java  |   1 +
 5 files changed, 110 insertions(+), 130 deletions(-)


[lucene-solr] 01/02: @1041 ensureBufferTlog thread safety should work the same as ensureLog.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 93bc02afc418668a42bfbd7c03bf3b468b8a7615
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Oct 19 22:48:46 2020 -0500

    @1041 ensureBufferTlog thread safety should work the same as ensureLog.
---
 solr/core/src/java/org/apache/solr/update/UpdateLog.java | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 30b6792..d36aaf8 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1410,9 +1410,19 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
   protected void ensureBufferTlog() {
     if (bufferTlog != null) return;
-    String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime());
-    bufferTlog = newTransactionLog(new File(tlogDir, newLogName), globalStrings, false);
-    bufferTlog.isBuffer = true;
+
+    if (bufferTlog == null) {
+      tlogLock.lock();
+      try {
+        if (bufferTlog == null) {
+          String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime());
+          bufferTlog = newTransactionLog(new File(tlogDir, newLogName), globalStrings, false);
+          bufferTlog.isBuffer = true;
+        }
+      } finally {
+        tlogLock.unlock();
+      }
+    }
   }
 
   // Cleanup old buffer tlogs


[lucene-solr] 02/02: @1042 Cleanup TransactionLog/Updatelog

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 6b43489569d7a94dabb171d1f22153d4eac8e98f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Oct 19 23:22:29 2020 -0500

    @1042 Cleanup TransactionLog/Updatelog
---
 .../org/apache/solr/update/TransactionLog.java     | 221 +++++++++------------
 .../src/java/org/apache/solr/update/UpdateLog.java |   2 -
 .../apache/solr/cloud/MultiThreadedOCPTest.java    |   1 +
 .../collections/TestLocalFSCloudBackupRestore.java |   1 +
 .../org/apache/solr/search/TestSolrCachePerf.java  |   1 +
 5 files changed, 98 insertions(+), 128 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index ed69d85..3a4625b 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -117,10 +117,7 @@ public class TransactionLog implements Closeable {
       }
 
       // no need to synchronize globalStringMap - it's only updated before the first record is written to the log
-      Integer idx;
-      synchronized (globalStringList) {
-        idx = globalStringMap.get(s.toString());
-      }
+      Integer idx = globalStringMap.get(s.toString());
       if (idx == null) {
         // write a normal string
         writeStr(s);
@@ -135,9 +132,7 @@ public class TransactionLog implements Closeable {
       int idx = readSize(fis);
       if (idx != 0) {// idx != 0 is the index of the extern string
         // no need to synchronize globalStringList - it's only updated before the first record is written to the log
-        synchronized (globalStringList) {
-          return globalStringList.get(idx - 1);
-        }
+        return globalStringList.get(idx - 1);
       } else {// idx == 0 means it has a string value
         // this shouldn't happen with this codec subclass.
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
@@ -186,7 +181,7 @@ public class TransactionLog implements Closeable {
       long start = raf.length();
       channel = raf.getChannel();
       os = Channels.newOutputStream(channel);
-      fos = new FastOutputStream(os, new byte[65536 * 2], 0);
+      fos = new FastOutputStream(os, new byte[65536], 0);
       // fos = FastOutputStream.wrap(os);
 
       if (openExisting) {
@@ -283,7 +278,8 @@ public class TransactionLog implements Closeable {
 
   protected void addGlobalStrings(Collection<String> strings) {
     if (strings == null) return;
-    synchronized (globalStringList) {
+    fosLock.lock();
+    try {
       int origSize = globalStringMap.size();
       for (String s : strings) {
         Integer idx = null;
@@ -295,12 +291,17 @@ public class TransactionLog implements Closeable {
         globalStringMap.put(s, globalStringList.size());
       }
       assert globalStringMap.size() == globalStringList.size();
+    } finally {
+      fosLock.unlock();
     }
   }
 
   Collection<String> getGlobalStrings() {
-    synchronized (globalStringList) {
+    fosLock.lock();
+    try {
       return new ArrayList<>(globalStringList);
+    } finally {
+      fosLock.unlock();
     }
   }
 
@@ -708,29 +709,32 @@ public class TransactionLog implements Closeable {
         }
 
         fos.flushBuffer();
+      } finally {
+        fosLock.unlock();
+      }
+      if (pos == 0) {
+        readHeader(fis);
 
-        if (pos == 0) {
-          readHeader(fis);
-
-          // shouldn't currently happen - header and first record are currently written at the same time
-          synchronized (TransactionLog.this) {
-            if (fis.position() >= fos.size()) {
-              return null;
-            }
-            pos = fis.position();
+        // shouldn't currently happen - header and first record are currently written at the same time
+        fosLock.lock();
+        try {
+          if (fis.position() >= fos.size()) {
+            return null;
           }
+          pos = fis.position();
+        } finally {
+          fosLock.unlock();
         }
+      }
 
-        Object o = codec.readVal(fis);
+      Object o = codec.readVal(fis);
 
-        // skip over record size
-        int size = fis.readInt();
-        assert size == fis.position() - pos - 4;
+      // skip over record size
+      int size = fis.readInt();
+      assert size == fis.position() - pos - 4;
+
+      return o;
 
-        return o;
-      } finally {
-        fosLock.unlock();
-      }
     }
 
     public void close() {
@@ -771,39 +775,34 @@ public class TransactionLog implements Closeable {
 
     @Override
     public Object next() throws IOException, InterruptedException {
-      fosLock.lock();
-      try {
-        if (versionToPos == null) {
-          versionToPos = new TreeMap<>();
-          Object o;
-          long pos = startingPos;
-
-          long lastVersion = Long.MIN_VALUE;
-          while ((o = super.next()) != null) {
-            @SuppressWarnings({"rawtypes"}) List entry = (List) o;
-            long version = (Long) entry.get(UpdateLog.VERSION_IDX);
-            version = Math.abs(version);
-            versionToPos.put(version, pos);
-            pos = currentPos();
-
-            if (version < lastVersion) inOrder = false;
-            lastVersion = version;
-          }
-          fis.seek(startingPos);
-
+      if (versionToPos == null) {
+        versionToPos = new TreeMap<>();
+        Object o;
+        long pos = startingPos;
+
+        long lastVersion = Long.MIN_VALUE;
+        while ((o = super.next()) != null) {
+          @SuppressWarnings({"rawtypes"})
+          List entry = (List) o;
+          long version = (Long) entry.get(UpdateLog.VERSION_IDX);
+          version = Math.abs(version);
+          versionToPos.put(version, pos);
+          pos = currentPos();
+
+          if (version < lastVersion) inOrder = false;
+          lastVersion = version;
         }
+        fis.seek(startingPos);
+      }
 
-        if (inOrder) {
-          return super.next();
-        } else {
-          if (iterator == null) iterator = versionToPos.values().iterator();
-          if (!iterator.hasNext()) return null;
-          long pos = iterator.next();
-          if (pos != currentPos()) fis.seek(pos);
-          return super.next();
-        }
-      } finally {
-        fosLock.unlock();
+      if (inOrder) {
+        return super.next();
+      } else {
+        if (iterator == null) iterator = versionToPos.values().iterator();
+        if (!iterator.hasNext()) return null;
+        long pos = iterator.next();
+        if (pos != currentPos()) fis.seek(pos);
+        return super.next();
       }
     }
   }
@@ -869,43 +868,38 @@ public class TransactionLog implements Closeable {
      * @throws IOException If there is a low-level I/O error.
      */
     public Object next() throws IOException {
-      Object o = null;
-      fosLock.lock();
-      try {
-        if (prevPos <= 0) return null;
+      if (prevPos <= 0) return null;
 
-        long endOfThisRecord = prevPos;
+      long endOfThisRecord = prevPos;
 
-        int thisLength = nextLength;
+      int thisLength = nextLength;
 
-        long recordStart = prevPos - thisLength;  // back up to the beginning of the next record
-        prevPos = recordStart - 4;  // back up 4 more to read the length of the next record
+      long recordStart = prevPos - thisLength;  // back up to the beginning of the next record
+      prevPos = recordStart - 4;  // back up 4 more to read the length of the next record
 
-        if (prevPos <= 0) return null;  // this record is the header
+      if (prevPos <= 0) return null;  // this record is the header
 
-        long bufferPos = fis.getBufferPos();
-        if (prevPos >= bufferPos) {
-          // nothing to do... we're within the current buffer
-        } else {
-          // Position buffer so that this record is at the end.
-          // For small records, this will cause subsequent calls to next() to be within the buffer.
-          long seekPos = endOfThisRecord - fis.getBufferSize();
-          seekPos = Math.min(seekPos, prevPos); // seek to the start of the record if it's larger then the block size.
-          seekPos = Math.max(seekPos, 0);
-          fis.seek(seekPos);
-          fis.peek();  // cause buffer to be filled
-        }
+      long bufferPos = fis.getBufferPos();
+      if (prevPos >= bufferPos) {
+        // nothing to do... we're within the current buffer
+      } else {
+        // Position buffer so that this record is at the end.
+        // For small records, this will cause subsequent calls to next() to be within the buffer.
+        long seekPos = endOfThisRecord - fis.getBufferSize();
+        seekPos = Math.min(seekPos, prevPos); // seek to the start of the record if it's larger then the block size.
+        seekPos = Math.max(seekPos, 0);
+        fis.seek(seekPos);
+        fis.peek();  // cause buffer to be filled
+      }
 
-        fis.seek(prevPos);
-        nextLength = fis.readInt();     // this is the length of the *next* record (i.e. closer to the beginning)
+      fis.seek(prevPos);
+      nextLength = fis.readInt();     // this is the length of the *next* record (i.e. closer to the beginning)
 
-        // TODO: optionally skip document data
-        o = codec.readVal(fis);
+      // TODO: optionally skip document data
+      Object o = codec.readVal(fis);
+
+      // assert fis.position() == prevPos + 4 + thisLength;  // this is only true if we read all the data (and we currently skip reading SolrInputDocument
 
-        // assert fis.position() == prevPos + 4 + thisLength;  // this is only true if we read all the data (and we currently skip reading SolrInputDocument
-      } finally {
-        fosLock.unlock();
-      }
       return o;
     }
 
@@ -944,60 +938,35 @@ public class TransactionLog implements Closeable {
     @Override
     public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
       ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
-      fosLock.lock();
-      try {
-        int ret = ch.read(bb, readFromStream);
-        return ret;
-      } finally {
-        fosLock.unlock();
-      }
+      int ret = ch.read(bb, readFromStream);
+      return ret;
     }
 
     public void seek(long position) throws IOException {
-      fosLock.lock();
-      try {
-        if (position <= readFromStream && position >= getBufferPos()) {
-          // seek within buffer
-          pos = (int) (position - getBufferPos());
-        } else {
-          // long currSize = ch.size();   // not needed - underlying read should handle (unless read never done)
-          // if (position > currSize) throw new EOFException("Read past EOF: seeking to " + position + " on file of size " + currSize + " file=" + ch);
-          readFromStream = position;
-          end = pos = 0;
-        }
-        assert position() == position;
-      } finally {
-        fosLock.unlock();
+      if (position <= readFromStream && position >= getBufferPos()) {
+        // seek within buffer
+        pos = (int) (position - getBufferPos());
+      } else {
+        // long currSize = ch.size();   // not needed - underlying read should handle (unless read never done)
+        // if (position > currSize) throw new EOFException("Read past EOF: seeking to " + position + " on file of size " + currSize + " file=" + ch);
+        readFromStream = position;
+        end = pos = 0;
       }
+      assert position() == position;
     }
 
   /** where is the start of the buffer relative to the whole file */
     public long getBufferPos() {
-      fosLock.lock();
-      try {
-        return readFromStream - end;
-      } finally {
-        fosLock.unlock();
-      }
+      return readFromStream - end;
     }
 
     public int getBufferSize() {
-      fosLock.lock();
-      try {
-        return buf.length;
-      } finally {
-        fosLock.unlock();
-      }
+      return buf.length;
     }
 
     @Override
     public void close() throws IOException {
-      fosLock.lock();
-      try {
-        ch.close();
-      } finally {
-        fosLock.unlock();
-      }
+      ch.close();
     }
 
     @Override
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index d36aaf8..4fa3952 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1409,8 +1409,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   }
 
   protected void ensureBufferTlog() {
-    if (bufferTlog != null) return;
-
     if (bufferTlog == null) {
       tlogLock.lock();
       try {
diff --git a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
index 2be0f80..fa8b8f1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
@@ -49,6 +49,7 @@ import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 /**
  * Tests the Multi threaded Collections API.
  */
+@LuceneTestCase.Nightly // can be very slow
 public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
 
   private static final int REQUEST_STATUS_TIMEOUT = 5;
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
index f78a1c1..5efe7b8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
@@ -42,6 +42,7 @@ import org.junit.Test;
  * Solr backup/restore still requires a "shared" file-system. Its just that in this case such file-system would be
  * exposed via local file-system API.
  */
+@Ignore // nocommit can hang
 public class TestLocalFSCloudBackupRestore extends AbstractCloudBackupRestoreTestCase {
   private static String backupLocation;
 
diff --git a/solr/core/src/test/org/apache/solr/search/TestSolrCachePerf.java b/solr/core/src/test/org/apache/solr/search/TestSolrCachePerf.java
index b30f585..81e6986 100644
--- a/solr/core/src/test/org/apache/solr/search/TestSolrCachePerf.java
+++ b/solr/core/src/test/org/apache/solr/search/TestSolrCachePerf.java
@@ -35,6 +35,7 @@ import org.junit.Test;
 /**
  *
  */
+// TODO: this test is flakey, fails too easily
 public class TestSolrCachePerf extends SolrTestCaseJ4 {
 
   private static final Class<? extends SolrCache>[] IMPLS = new Class[] {