You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/21 01:21:28 UTC

[lucene-solr] 01/02: @1264 Review UpdateLog changes.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit c7f23ce21a481bdadab044c8c315a0099b5a1b35
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Jan 20 19:18:18 2021 -0600

    @1264 Review UpdateLog changes.
---
 .../apache/solr/update/DirectUpdateHandler2.java   |   8 +-
 .../src/java/org/apache/solr/update/UpdateLog.java | 201 ++++++++++++---------
 2 files changed, 118 insertions(+), 91 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index af952f2..45a3127 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -590,7 +590,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     boolean error=true;
 
     try {
-      log.debug("start {}", cmd);
+      if (log.isDebugEnabled()) log.debug("start {}", cmd);
       RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
       try {
         SolrIndexWriter.setCommitData(iw.get(), cmd.getVersion());
@@ -599,7 +599,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
         iw.decref();
       }
 
-      log.debug("end_prepareCommit");
+      if (log.isDebugEnabled()) log.debug("end_prepareCommit");
 
       error=false;
     }
@@ -842,7 +842,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
         }
         // we shouldn't close the transaction logs either, but leaving them open
         // means we can't delete them on windows (needed for tests)
-        if (ulog != null) ulog.close(true);
+        if (ulog != null) ulog.close(false);
 
         return;
       }
@@ -896,7 +896,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     }
 
     try {
-      if (ulog != null) ulog.close(true);
+      if (ulog != null) ulog.close(false);
     } catch (Throwable th) {
       log.error("Error closing log files", th);
       if (th instanceof OutOfMemoryError) {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 09754a7..0f51d41 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -369,7 +369,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
    * This must be called when a new log is created, or
    * for an existing log whenever the core or update handler changes.
    */
-  public synchronized void init(UpdateHandler uhandler, SolrCore core) {
+  public void init(UpdateHandler uhandler, SolrCore core) {
     if (dataDir != null) {
       assert ObjectReleaseTracker.release(this);
     }
@@ -377,6 +377,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     // tracking in this init call that can be recalled, and we create in the updatehandler
     // but then the solrcorestate is what closes ...
     // assert ObjectReleaseTracker.track(this);
+    tlogLock.lock();
     try {
       dataDir = core.getUlogDir();
 
@@ -462,6 +463,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         throw e;
       }
       throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    } finally {
+      tlogLock.unlock();
     }
   }
 
@@ -524,34 +527,38 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   /* Takes over ownership of the log, keeping it until no longer needed
      and then decrementing its reference and dropping it.
    */
-  protected synchronized void addOldLog(TransactionLog oldLog, boolean removeOld) {
+  protected void addOldLog(TransactionLog oldLog, boolean removeOld) {
     if (oldLog == null) return;
+    tlogLock.lock();
+    try {
+      numOldRecords += oldLog.numRecords();
 
-    numOldRecords += oldLog.numRecords();
+      int currRecords = numOldRecords;
 
-    int currRecords = numOldRecords;
+      if (oldLog != tlog && tlog != null) {
+        currRecords += tlog.numRecords();
+      }
 
-    if (oldLog != tlog &&  tlog != null) {
-      currRecords += tlog.numRecords();
-    }
+      while (removeOld && logs.size() > 0) {
+        TransactionLog log = logs.peekLast();
+        int nrec = log.numRecords();
+        // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
+        // we already have the limit of 10 log files.
+        if (currRecords - nrec >= numRecordsToKeep || (maxNumLogsToKeep > 0 && logs.size() >= maxNumLogsToKeep)) {
+          currRecords -= nrec;
+          numOldRecords -= nrec;
+          logs.removeLast().decref();  // dereference so it will be deleted when no longer in use
+          continue;
+        }
 
-    while (removeOld && logs.size() > 0) {
-      TransactionLog log = logs.peekLast();
-      int nrec = log.numRecords();
-      // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
-      // we already have the limit of 10 log files.
-      if (currRecords - nrec >= numRecordsToKeep || (maxNumLogsToKeep > 0 && logs.size() >= maxNumLogsToKeep)) {
-        currRecords -= nrec;
-        numOldRecords -= nrec;
-        logs.removeLast().decref();  // dereference so it will be deleted when no longer in use
-        continue;
+        break;
       }
 
-      break;
+      // don't incref... we are taking ownership from the caller.
+      logs.addFirst(oldLog);
+    } finally {
+      tlogLock.unlock();
     }
-
-    // don't incref... we are taking ownership from the caller.
-    logs.addFirst(oldLog);
   }
 
   public String[] getBufferLogList(File directory) {
@@ -720,7 +727,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
 
         if (trace) {
-          // TODO: sync
           LogPtr ptr = new LogPtr(pos, cmd.getVersion());
           int hash = System.identityHashCode(map);
           log.trace("TLOG: added deleteByQuery {} to {} {} map = {}.", cmd.query, tlog, ptr, hash);
@@ -880,6 +886,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         // the current transaction log.  This normally shouldn't happen
         // as DistributedUpdateProcessor will prevent this.  Commits
         // that don't use the processor are possible though.
+        log.info("Won't roll transaction log because not ACTIVE or a replay");
         return;
       }
 
@@ -888,19 +895,20 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
       if (prevTlog != null) {
         globalStrings = prevTlog.getGlobalStrings();
+      }
 
-        // since document additions can happen concurrently with commit, create
-        // a new transaction log first so that we know the old one is definitely
-        // in the index.
-        if (prevTlog != null) {
-          // postCommit for prevTlog is not called, may be the index is corrupted
-          // if we override prevTlog value, the correspond tlog will be leaked, close it first
-          postCommit(cmd);
-        }
-        prevTlog = tlog;
-        tlog = null;
-        id++;
+      // since document additions can happen concurrently with commit, create
+      // a new transaction log first so that we know the old one is definitely
+      // in the index.
+      if (prevTlog != null) {
+        // postCommit for prevTlog is not called, may be the index is corrupted
+        // if we override prevTlog value, the correspond tlog will be leaked, close it first
+        postCommit(cmd);
       }
+      prevTlog = tlog;
+      tlog = null;
+      id++;
+
     } finally {
       tlogLock.unlock();
     }
@@ -1052,35 +1060,40 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
    * @return The entry if found, otherwise null
    */
   @SuppressWarnings({"rawtypes"})
-  private synchronized List getEntryFromTLog(long lookupPointer, long lookupVersion, List<TransactionLog> lookupLogs) {
-    for (TransactionLog lookupLog : lookupLogs) {
-      if (lookupLog != null && lookupLog.getLogSize() > lookupPointer) {
-        lookupLog.incref();
-        try {
-          Object obj = null;
-
+  private List getEntryFromTLog(long lookupPointer, long lookupVersion, List<TransactionLog> lookupLogs) {
+    tlogLock.lock();
+    try {
+      for (TransactionLog lookupLog : lookupLogs) {
+        if (lookupLog != null && lookupLog.getLogSize() > lookupPointer) {
+          lookupLog.incref();
           try {
-            obj = lookupLog.lookup(lookupPointer);
-          } catch (Exception | AssertionError ex) {
-            // This can happen when trying to deserialize the entry at position lookupPointer,
-            // but from a different tlog than the one containing the desired entry.
-            // Just ignore the exception, so as to proceed to the next tlog.
-            log.info("Exception reading the log (this is expected, don't worry)={}, for version={}. This can be ignored"
-                , lookupLog, lookupVersion);
-          }
+            Object obj = null;
 
-          if (obj != null && obj instanceof List) {
-            List tmpEntry = (List) obj;
-            if (tmpEntry.size() >= 2 && 
-                (tmpEntry.get(UpdateLog.VERSION_IDX) instanceof Long) &&
-                (tmpEntry.get(UpdateLog.VERSION_IDX)).equals(lookupVersion)) {
-              return tmpEntry;
+            try {
+              obj = lookupLog.lookup(lookupPointer);
+            } catch (Exception | AssertionError ex) {
+              // This can happen when trying to deserialize the entry at position lookupPointer,
+              // but from a different tlog than the one containing the desired entry.
+              // Just ignore the exception, so as to proceed to the next tlog.
+              log.info("Exception reading the log (this is expected, don't worry)={}, for version={}. This can be ignored"
+                  , lookupLog, lookupVersion);
             }
+
+            if (obj != null && obj instanceof List) {
+              List tmpEntry = (List) obj;
+              if (tmpEntry.size() >= 2 &&
+                  (tmpEntry.get(UpdateLog.VERSION_IDX) instanceof Long) &&
+                  (tmpEntry.get(UpdateLog.VERSION_IDX)).equals(lookupVersion)) {
+                return tmpEntry;
+              }
+            }
+          } finally {
+            lookupLog.decref();
           }
-        } finally {
-          lookupLog.decref();
         }
       }
+    } finally {
+      tlogLock.unlock();
     }
     return null;
   }
@@ -1129,20 +1142,25 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   // synchronization is needed for stronger guarantees (as VersionUpdateProcessor does).
   public Long lookupVersion(BytesRef indexedId) {
     LogPtr entry;
-
-    entry = map.get(indexedId);
-    // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
-    if (entry == null && prevMap != null) {
-      entry = prevMap.get(indexedId);
-      // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
-      // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
-    }
-    if (entry == null && prevMap2 != null) {
-      entry = prevMap2.get(indexedId);
-      // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
-      // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+    tlogLock.lock();
+    try {
+      entry = map.get(indexedId);
+      // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      if (entry == null && prevMap != null) {
+        entry = prevMap.get(indexedId);
+        // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
+        // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      }
+      if (entry == null && prevMap2 != null) {
+        entry = prevMap2.get(indexedId);
+        // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
+        // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      }
+    } finally {
+      tlogLock.unlock();
     }
 
+
     if (entry != null) {
       return entry.version;
     }
@@ -1156,8 +1174,12 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
     // We can't get any version info for deletes from the index, so if the doc
     // wasn't found, check a cache of recent deletes.
-
-    entry = oldDeletes.get(indexedId);
+    tlogLock.lock();
+    try {
+      entry = oldDeletes.get(indexedId);
+    } finally {
+      tlogLock.unlock();
+    }
 
     if (entry != null) {
       return entry.version;
@@ -1173,10 +1195,15 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     if (syncLevel == SyncLevel.NONE) {
       return;
     }
-
-    TransactionLog currLog = tlog;
-    if (currLog == null) return;
-    currLog.incref();
+    TransactionLog currLog;
+    tlogLock.lock();
+    try {
+      currLog = tlog;
+      if (currLog == null) return;
+      currLog.incref();
+    } finally {
+      tlogLock.unlock();
+    }
 
     try {
       currLog.finish(syncLevel);
@@ -1300,16 +1327,20 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   public void commitAndSwitchToNewTlog(CommitUpdateCommand cuc) {
     versionInfo.blockUpdates();
     try {
-      if (tlog == null) {
-        return;
-      }
-      preCommit(cuc);
+      tlogLock.lock();
       try {
-        copyOverOldUpdates(cuc.getVersion());
+        if (tlog == null) {
+          return;
+        }
+        preCommit(cuc);
+        try {
+          copyOverOldUpdates(cuc.getVersion());
+        } finally {
+          postCommit(cuc);
+        }
       } finally {
-        postCommit(cuc);
+        tlogLock.unlock();
       }
-
     } finally {
       versionInfo.unblockUpdates();
     }
@@ -1834,11 +1865,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(ParWork.getRootSharedExecutor());
     LogReplayer replayer = new LogReplayer(Collections.singletonList(bufferTlog), true);
     return cs.submit(() -> {
-      try {
-        replayer.run();
-      } catch (AlreadyClosedException e) {
-
-      }
+      replayer.run();
       dropBufferTlog();
     }, recoveryInfo);
   }
@@ -2350,7 +2377,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   }
 
   public void seedBucketsWithHighestVersion(SolrIndexSearcher newSearcher) {
-    log.debug("Looking up max value of version field to seed version buckets");
+    if (debug) log.debug("Looking up max value of version field to seed version buckets");
     if (versionInfo != null) {
       versionInfo.blockUpdates();
       try {