You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/21 01:21:28 UTC
[lucene-solr] 01/02: @1264 Review UpdateLog changes.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit c7f23ce21a481bdadab044c8c315a0099b5a1b35
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Jan 20 19:18:18 2021 -0600
@1264 Review UpdateLog changes.
---
.../apache/solr/update/DirectUpdateHandler2.java | 8 +-
.../src/java/org/apache/solr/update/UpdateLog.java | 201 ++++++++++++---------
2 files changed, 118 insertions(+), 91 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index af952f2..45a3127 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -590,7 +590,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
boolean error=true;
try {
- log.debug("start {}", cmd);
+ if (log.isDebugEnabled()) log.debug("start {}", cmd);
RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
try {
SolrIndexWriter.setCommitData(iw.get(), cmd.getVersion());
@@ -599,7 +599,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
iw.decref();
}
- log.debug("end_prepareCommit");
+ if (log.isDebugEnabled()) log.debug("end_prepareCommit");
error=false;
}
@@ -842,7 +842,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
}
// we shouldn't close the transaction logs either, but leaving them open
// means we can't delete them on windows (needed for tests)
- if (ulog != null) ulog.close(true);
+ if (ulog != null) ulog.close(false);
return;
}
@@ -896,7 +896,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
}
try {
- if (ulog != null) ulog.close(true);
+ if (ulog != null) ulog.close(false);
} catch (Throwable th) {
log.error("Error closing log files", th);
if (th instanceof OutOfMemoryError) {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 09754a7..0f51d41 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -369,7 +369,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
* This must be called when a new log is created, or
* for an existing log whenever the core or update handler changes.
*/
- public synchronized void init(UpdateHandler uhandler, SolrCore core) {
+ public void init(UpdateHandler uhandler, SolrCore core) {
if (dataDir != null) {
assert ObjectReleaseTracker.release(this);
}
@@ -377,6 +377,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
// tracking in this init call that can be recalled, and we create in the updatehandler
// but then the solrcorestate is what closes ...
// assert ObjectReleaseTracker.track(this);
+ tlogLock.lock();
try {
dataDir = core.getUlogDir();
@@ -462,6 +463,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
throw e;
}
throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ } finally {
+ tlogLock.unlock();
}
}
@@ -524,34 +527,38 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
/* Takes over ownership of the log, keeping it until no longer needed
and then decrementing its reference and dropping it.
*/
- protected synchronized void addOldLog(TransactionLog oldLog, boolean removeOld) {
+ protected void addOldLog(TransactionLog oldLog, boolean removeOld) {
if (oldLog == null) return;
+ tlogLock.lock();
+ try {
+ numOldRecords += oldLog.numRecords();
- numOldRecords += oldLog.numRecords();
+ int currRecords = numOldRecords;
- int currRecords = numOldRecords;
+ if (oldLog != tlog && tlog != null) {
+ currRecords += tlog.numRecords();
+ }
- if (oldLog != tlog && tlog != null) {
- currRecords += tlog.numRecords();
- }
+ while (removeOld && logs.size() > 0) {
+ TransactionLog log = logs.peekLast();
+ int nrec = log.numRecords();
+ // remove the oldest log if we don't need it to keep at least numRecordsToKeep, or if
+ // we already have the configured limit of log files (maxNumLogsToKeep).
+ if (currRecords - nrec >= numRecordsToKeep || (maxNumLogsToKeep > 0 && logs.size() >= maxNumLogsToKeep)) {
+ currRecords -= nrec;
+ numOldRecords -= nrec;
+ logs.removeLast().decref(); // dereference so it will be deleted when no longer in use
+ continue;
+ }
- while (removeOld && logs.size() > 0) {
- TransactionLog log = logs.peekLast();
- int nrec = log.numRecords();
- // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
- // we already have the limit of 10 log files.
- if (currRecords - nrec >= numRecordsToKeep || (maxNumLogsToKeep > 0 && logs.size() >= maxNumLogsToKeep)) {
- currRecords -= nrec;
- numOldRecords -= nrec;
- logs.removeLast().decref(); // dereference so it will be deleted when no longer in use
- continue;
+ break;
}
- break;
+ // don't incref... we are taking ownership from the caller.
+ logs.addFirst(oldLog);
+ } finally {
+ tlogLock.unlock();
}
-
- // don't incref... we are taking ownership from the caller.
- logs.addFirst(oldLog);
}
public String[] getBufferLogList(File directory) {
@@ -720,7 +727,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
if (trace) {
- // TODO: sync
LogPtr ptr = new LogPtr(pos, cmd.getVersion());
int hash = System.identityHashCode(map);
log.trace("TLOG: added deleteByQuery {} to {} {} map = {}.", cmd.query, tlog, ptr, hash);
@@ -880,6 +886,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
// the current transaction log. This normally shouldn't happen
// as DistributedUpdateProcessor will prevent this. Commits
// that don't use the processor are possible though.
+ log.info("Won't roll transaction log because not ACTIVE or a replay");
return;
}
@@ -888,19 +895,20 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
if (prevTlog != null) {
globalStrings = prevTlog.getGlobalStrings();
+ }
- // since document additions can happen concurrently with commit, create
- // a new transaction log first so that we know the old one is definitely
- // in the index.
- if (prevTlog != null) {
- // postCommit for prevTlog is not called, may be the index is corrupted
- // if we override prevTlog value, the correspond tlog will be leaked, close it first
- postCommit(cmd);
- }
- prevTlog = tlog;
- tlog = null;
- id++;
+ // since document additions can happen concurrently with commit, create
+ // a new transaction log first so that we know the old one is definitely
+ // in the index.
+ if (prevTlog != null) {
+ // postCommit for prevTlog is not called, maybe the index is corrupted
+ // if we override the prevTlog value, the corresponding tlog will be leaked, so close it first
+ postCommit(cmd);
}
+ prevTlog = tlog;
+ tlog = null;
+ id++;
+
} finally {
tlogLock.unlock();
}
@@ -1052,35 +1060,40 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
* @return The entry if found, otherwise null
*/
@SuppressWarnings({"rawtypes"})
- private synchronized List getEntryFromTLog(long lookupPointer, long lookupVersion, List<TransactionLog> lookupLogs) {
- for (TransactionLog lookupLog : lookupLogs) {
- if (lookupLog != null && lookupLog.getLogSize() > lookupPointer) {
- lookupLog.incref();
- try {
- Object obj = null;
-
+ private List getEntryFromTLog(long lookupPointer, long lookupVersion, List<TransactionLog> lookupLogs) {
+ tlogLock.lock();
+ try {
+ for (TransactionLog lookupLog : lookupLogs) {
+ if (lookupLog != null && lookupLog.getLogSize() > lookupPointer) {
+ lookupLog.incref();
try {
- obj = lookupLog.lookup(lookupPointer);
- } catch (Exception | AssertionError ex) {
- // This can happen when trying to deserialize the entry at position lookupPointer,
- // but from a different tlog than the one containing the desired entry.
- // Just ignore the exception, so as to proceed to the next tlog.
- log.info("Exception reading the log (this is expected, don't worry)={}, for version={}. This can be ignored"
- , lookupLog, lookupVersion);
- }
+ Object obj = null;
- if (obj != null && obj instanceof List) {
- List tmpEntry = (List) obj;
- if (tmpEntry.size() >= 2 &&
- (tmpEntry.get(UpdateLog.VERSION_IDX) instanceof Long) &&
- (tmpEntry.get(UpdateLog.VERSION_IDX)).equals(lookupVersion)) {
- return tmpEntry;
+ try {
+ obj = lookupLog.lookup(lookupPointer);
+ } catch (Exception | AssertionError ex) {
+ // This can happen when trying to deserialize the entry at position lookupPointer,
+ // but from a different tlog than the one containing the desired entry.
+ // Just ignore the exception, so as to proceed to the next tlog.
+ log.info("Exception reading the log (this is expected, don't worry)={}, for version={}. This can be ignored"
+ , lookupLog, lookupVersion);
}
+
+ if (obj != null && obj instanceof List) {
+ List tmpEntry = (List) obj;
+ if (tmpEntry.size() >= 2 &&
+ (tmpEntry.get(UpdateLog.VERSION_IDX) instanceof Long) &&
+ (tmpEntry.get(UpdateLog.VERSION_IDX)).equals(lookupVersion)) {
+ return tmpEntry;
+ }
+ }
+ } finally {
+ lookupLog.decref();
}
- } finally {
- lookupLog.decref();
}
}
+ } finally {
+ tlogLock.unlock();
}
return null;
}
@@ -1129,20 +1142,25 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
// synchronization is needed for stronger guarantees (as VersionUpdateProcessor does).
public Long lookupVersion(BytesRef indexedId) {
LogPtr entry;
-
- entry = map.get(indexedId);
- // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
- if (entry == null && prevMap != null) {
- entry = prevMap.get(indexedId);
- // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
- // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
- }
- if (entry == null && prevMap2 != null) {
- entry = prevMap2.get(indexedId);
- // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
- // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ tlogLock.lock();
+ try {
+ entry = map.get(indexedId);
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ if (entry == null && prevMap != null) {
+ entry = prevMap.get(indexedId);
+ // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ }
+ if (entry == null && prevMap2 != null) {
+ entry = prevMap2.get(indexedId);
+ // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ }
+ } finally {
+ tlogLock.unlock();
}
+
if (entry != null) {
return entry.version;
}
@@ -1156,8 +1174,12 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
// We can't get any version info for deletes from the index, so if the doc
// wasn't found, check a cache of recent deletes.
-
- entry = oldDeletes.get(indexedId);
+ tlogLock.lock();
+ try {
+ entry = oldDeletes.get(indexedId);
+ } finally {
+ tlogLock.unlock();
+ }
if (entry != null) {
return entry.version;
@@ -1173,10 +1195,15 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
if (syncLevel == SyncLevel.NONE) {
return;
}
-
- TransactionLog currLog = tlog;
- if (currLog == null) return;
- currLog.incref();
+ TransactionLog currLog;
+ tlogLock.lock();
+ try {
+ currLog = tlog;
+ if (currLog == null) return;
+ currLog.incref();
+ } finally {
+ tlogLock.unlock();
+ }
try {
currLog.finish(syncLevel);
@@ -1300,16 +1327,20 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
public void commitAndSwitchToNewTlog(CommitUpdateCommand cuc) {
versionInfo.blockUpdates();
try {
- if (tlog == null) {
- return;
- }
- preCommit(cuc);
+ tlogLock.lock();
try {
- copyOverOldUpdates(cuc.getVersion());
+ if (tlog == null) {
+ return;
+ }
+ preCommit(cuc);
+ try {
+ copyOverOldUpdates(cuc.getVersion());
+ } finally {
+ postCommit(cuc);
+ }
} finally {
- postCommit(cuc);
+ tlogLock.unlock();
}
-
} finally {
versionInfo.unblockUpdates();
}
@@ -1834,11 +1865,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(ParWork.getRootSharedExecutor());
LogReplayer replayer = new LogReplayer(Collections.singletonList(bufferTlog), true);
return cs.submit(() -> {
- try {
- replayer.run();
- } catch (AlreadyClosedException e) {
-
- }
+ replayer.run();
dropBufferTlog();
}, recoveryInfo);
}
@@ -2350,7 +2377,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
public void seedBucketsWithHighestVersion(SolrIndexSearcher newSearcher) {
- log.debug("Looking up max value of version field to seed version buckets");
+ if (debug) log.debug("Looking up max value of version field to seed version buckets");
if (versionInfo != null) {
versionInfo.blockUpdates();
try {