You are viewing a plain text version of this content. The canonical link for it is here (the link target was not preserved in this plain-text copy).
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/21 01:21:27 UTC

[lucene-solr] branch reference_impl_dev updated (39ff7e7 -> 4038d96)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a change to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


    from 39ff7e7  @1263 Only cut off bulk/streaming updates when closed for the leaders.
     new c7f23ce  @1264 Review UpdateLog changes.
     new 4038d96  @1265 Some updates and a hack to help benching.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 lucene/benchmark/build.gradle                      |   3 +-
 .../apache/solr/update/DirectUpdateHandler2.java   |   8 +-
 .../src/java/org/apache/solr/update/UpdateLog.java | 201 ++++++++++++---------
 solr/reference-branch/bench/solr-benchmark.jar     | Bin 37846 -> 37946 bytes
 .../{prod => docker/solr-image}/start-solr.sh      |   0
 5 files changed, 120 insertions(+), 92 deletions(-)
 copy solr/reference-branch/{prod => docker/solr-image}/start-solr.sh (100%)


[lucene-solr] 01/02: @1264 Review UpdateLog changes.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit c7f23ce21a481bdadab044c8c315a0099b5a1b35
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Jan 20 19:18:18 2021 -0600

    @1264 Review UpdateLog changes.
---
 .../apache/solr/update/DirectUpdateHandler2.java   |   8 +-
 .../src/java/org/apache/solr/update/UpdateLog.java | 201 ++++++++++++---------
 2 files changed, 118 insertions(+), 91 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index af952f2..45a3127 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -590,7 +590,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     boolean error=true;
 
     try {
-      log.debug("start {}", cmd);
+      if (log.isDebugEnabled()) log.debug("start {}", cmd);
       RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
       try {
         SolrIndexWriter.setCommitData(iw.get(), cmd.getVersion());
@@ -599,7 +599,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
         iw.decref();
       }
 
-      log.debug("end_prepareCommit");
+      if (log.isDebugEnabled()) log.debug("end_prepareCommit");
 
       error=false;
     }
@@ -842,7 +842,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
         }
         // we shouldn't close the transaction logs either, but leaving them open
         // means we can't delete them on windows (needed for tests)
-        if (ulog != null) ulog.close(true);
+        if (ulog != null) ulog.close(false);
 
         return;
       }
@@ -896,7 +896,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     }
 
     try {
-      if (ulog != null) ulog.close(true);
+      if (ulog != null) ulog.close(false);
     } catch (Throwable th) {
       log.error("Error closing log files", th);
       if (th instanceof OutOfMemoryError) {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 09754a7..0f51d41 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -369,7 +369,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
    * This must be called when a new log is created, or
    * for an existing log whenever the core or update handler changes.
    */
-  public synchronized void init(UpdateHandler uhandler, SolrCore core) {
+  public void init(UpdateHandler uhandler, SolrCore core) {
     if (dataDir != null) {
       assert ObjectReleaseTracker.release(this);
     }
@@ -377,6 +377,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     // tracking in this init call that can be recalled, and we create in the updatehandler
     // but then the solrcorestate is what closes ...
     // assert ObjectReleaseTracker.track(this);
+    tlogLock.lock();
     try {
       dataDir = core.getUlogDir();
 
@@ -462,6 +463,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         throw e;
       }
       throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    } finally {
+      tlogLock.unlock();
     }
   }
 
@@ -524,34 +527,38 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   /* Takes over ownership of the log, keeping it until no longer needed
      and then decrementing its reference and dropping it.
    */
-  protected synchronized void addOldLog(TransactionLog oldLog, boolean removeOld) {
+  protected void addOldLog(TransactionLog oldLog, boolean removeOld) {
     if (oldLog == null) return;
+    tlogLock.lock();
+    try {
+      numOldRecords += oldLog.numRecords();
 
-    numOldRecords += oldLog.numRecords();
+      int currRecords = numOldRecords;
 
-    int currRecords = numOldRecords;
+      if (oldLog != tlog && tlog != null) {
+        currRecords += tlog.numRecords();
+      }
 
-    if (oldLog != tlog &&  tlog != null) {
-      currRecords += tlog.numRecords();
-    }
+      while (removeOld && logs.size() > 0) {
+        TransactionLog log = logs.peekLast();
+        int nrec = log.numRecords();
+        // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
+        // we already have the limit of 10 log files.
+        if (currRecords - nrec >= numRecordsToKeep || (maxNumLogsToKeep > 0 && logs.size() >= maxNumLogsToKeep)) {
+          currRecords -= nrec;
+          numOldRecords -= nrec;
+          logs.removeLast().decref();  // dereference so it will be deleted when no longer in use
+          continue;
+        }
 
-    while (removeOld && logs.size() > 0) {
-      TransactionLog log = logs.peekLast();
-      int nrec = log.numRecords();
-      // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
-      // we already have the limit of 10 log files.
-      if (currRecords - nrec >= numRecordsToKeep || (maxNumLogsToKeep > 0 && logs.size() >= maxNumLogsToKeep)) {
-        currRecords -= nrec;
-        numOldRecords -= nrec;
-        logs.removeLast().decref();  // dereference so it will be deleted when no longer in use
-        continue;
+        break;
       }
 
-      break;
+      // don't incref... we are taking ownership from the caller.
+      logs.addFirst(oldLog);
+    } finally {
+      tlogLock.unlock();
     }
-
-    // don't incref... we are taking ownership from the caller.
-    logs.addFirst(oldLog);
   }
 
   public String[] getBufferLogList(File directory) {
@@ -720,7 +727,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
 
         if (trace) {
-          // TODO: sync
           LogPtr ptr = new LogPtr(pos, cmd.getVersion());
           int hash = System.identityHashCode(map);
           log.trace("TLOG: added deleteByQuery {} to {} {} map = {}.", cmd.query, tlog, ptr, hash);
@@ -880,6 +886,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         // the current transaction log.  This normally shouldn't happen
         // as DistributedUpdateProcessor will prevent this.  Commits
         // that don't use the processor are possible though.
+        log.info("Won't roll transaction log because not ACTIVE or a replay");
         return;
       }
 
@@ -888,19 +895,20 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
       if (prevTlog != null) {
         globalStrings = prevTlog.getGlobalStrings();
+      }
 
-        // since document additions can happen concurrently with commit, create
-        // a new transaction log first so that we know the old one is definitely
-        // in the index.
-        if (prevTlog != null) {
-          // postCommit for prevTlog is not called, may be the index is corrupted
-          // if we override prevTlog value, the correspond tlog will be leaked, close it first
-          postCommit(cmd);
-        }
-        prevTlog = tlog;
-        tlog = null;
-        id++;
+      // since document additions can happen concurrently with commit, create
+      // a new transaction log first so that we know the old one is definitely
+      // in the index.
+      if (prevTlog != null) {
+        // postCommit for prevTlog is not called, may be the index is corrupted
+        // if we override prevTlog value, the correspond tlog will be leaked, close it first
+        postCommit(cmd);
       }
+      prevTlog = tlog;
+      tlog = null;
+      id++;
+
     } finally {
       tlogLock.unlock();
     }
@@ -1052,35 +1060,40 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
    * @return The entry if found, otherwise null
    */
   @SuppressWarnings({"rawtypes"})
-  private synchronized List getEntryFromTLog(long lookupPointer, long lookupVersion, List<TransactionLog> lookupLogs) {
-    for (TransactionLog lookupLog : lookupLogs) {
-      if (lookupLog != null && lookupLog.getLogSize() > lookupPointer) {
-        lookupLog.incref();
-        try {
-          Object obj = null;
-
+  private List getEntryFromTLog(long lookupPointer, long lookupVersion, List<TransactionLog> lookupLogs) {
+    tlogLock.lock();
+    try {
+      for (TransactionLog lookupLog : lookupLogs) {
+        if (lookupLog != null && lookupLog.getLogSize() > lookupPointer) {
+          lookupLog.incref();
           try {
-            obj = lookupLog.lookup(lookupPointer);
-          } catch (Exception | AssertionError ex) {
-            // This can happen when trying to deserialize the entry at position lookupPointer,
-            // but from a different tlog than the one containing the desired entry.
-            // Just ignore the exception, so as to proceed to the next tlog.
-            log.info("Exception reading the log (this is expected, don't worry)={}, for version={}. This can be ignored"
-                , lookupLog, lookupVersion);
-          }
+            Object obj = null;
 
-          if (obj != null && obj instanceof List) {
-            List tmpEntry = (List) obj;
-            if (tmpEntry.size() >= 2 && 
-                (tmpEntry.get(UpdateLog.VERSION_IDX) instanceof Long) &&
-                (tmpEntry.get(UpdateLog.VERSION_IDX)).equals(lookupVersion)) {
-              return tmpEntry;
+            try {
+              obj = lookupLog.lookup(lookupPointer);
+            } catch (Exception | AssertionError ex) {
+              // This can happen when trying to deserialize the entry at position lookupPointer,
+              // but from a different tlog than the one containing the desired entry.
+              // Just ignore the exception, so as to proceed to the next tlog.
+              log.info("Exception reading the log (this is expected, don't worry)={}, for version={}. This can be ignored"
+                  , lookupLog, lookupVersion);
             }
+
+            if (obj != null && obj instanceof List) {
+              List tmpEntry = (List) obj;
+              if (tmpEntry.size() >= 2 &&
+                  (tmpEntry.get(UpdateLog.VERSION_IDX) instanceof Long) &&
+                  (tmpEntry.get(UpdateLog.VERSION_IDX)).equals(lookupVersion)) {
+                return tmpEntry;
+              }
+            }
+          } finally {
+            lookupLog.decref();
           }
-        } finally {
-          lookupLog.decref();
         }
       }
+    } finally {
+      tlogLock.unlock();
     }
     return null;
   }
@@ -1129,20 +1142,25 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   // synchronization is needed for stronger guarantees (as VersionUpdateProcessor does).
   public Long lookupVersion(BytesRef indexedId) {
     LogPtr entry;
-
-    entry = map.get(indexedId);
-    // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
-    if (entry == null && prevMap != null) {
-      entry = prevMap.get(indexedId);
-      // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
-      // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
-    }
-    if (entry == null && prevMap2 != null) {
-      entry = prevMap2.get(indexedId);
-      // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
-      // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+    tlogLock.lock();
+    try {
+      entry = map.get(indexedId);
+      // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      if (entry == null && prevMap != null) {
+        entry = prevMap.get(indexedId);
+        // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
+        // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      }
+      if (entry == null && prevMap2 != null) {
+        entry = prevMap2.get(indexedId);
+        // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
+        // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      }
+    } finally {
+      tlogLock.unlock();
     }
 
+
     if (entry != null) {
       return entry.version;
     }
@@ -1156,8 +1174,12 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
     // We can't get any version info for deletes from the index, so if the doc
     // wasn't found, check a cache of recent deletes.
-
-    entry = oldDeletes.get(indexedId);
+    tlogLock.lock();
+    try {
+      entry = oldDeletes.get(indexedId);
+    } finally {
+      tlogLock.unlock();
+    }
 
     if (entry != null) {
       return entry.version;
@@ -1173,10 +1195,15 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     if (syncLevel == SyncLevel.NONE) {
       return;
     }
-
-    TransactionLog currLog = tlog;
-    if (currLog == null) return;
-    currLog.incref();
+    TransactionLog currLog;
+    tlogLock.lock();
+    try {
+      currLog = tlog;
+      if (currLog == null) return;
+      currLog.incref();
+    } finally {
+      tlogLock.unlock();
+    }
 
     try {
       currLog.finish(syncLevel);
@@ -1300,16 +1327,20 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   public void commitAndSwitchToNewTlog(CommitUpdateCommand cuc) {
     versionInfo.blockUpdates();
     try {
-      if (tlog == null) {
-        return;
-      }
-      preCommit(cuc);
+      tlogLock.lock();
       try {
-        copyOverOldUpdates(cuc.getVersion());
+        if (tlog == null) {
+          return;
+        }
+        preCommit(cuc);
+        try {
+          copyOverOldUpdates(cuc.getVersion());
+        } finally {
+          postCommit(cuc);
+        }
       } finally {
-        postCommit(cuc);
+        tlogLock.unlock();
       }
-
     } finally {
       versionInfo.unblockUpdates();
     }
@@ -1834,11 +1865,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(ParWork.getRootSharedExecutor());
     LogReplayer replayer = new LogReplayer(Collections.singletonList(bufferTlog), true);
     return cs.submit(() -> {
-      try {
-        replayer.run();
-      } catch (AlreadyClosedException e) {
-
-      }
+      replayer.run();
       dropBufferTlog();
     }, recoveryInfo);
   }
@@ -2350,7 +2377,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   }
 
   public void seedBucketsWithHighestVersion(SolrIndexSearcher newSearcher) {
-    log.debug("Looking up max value of version field to seed version buckets");
+    if (debug) log.debug("Looking up max value of version field to seed version buckets");
     if (versionInfo != null) {
       versionInfo.blockUpdates();
       try {


[lucene-solr] 02/02: @1265 Some updates and a hack to help benching.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 4038d96f974c8d32c7610c8cfc4c59b86a6f2a9d
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Jan 20 19:19:17 2021 -0600

    @1265 Some updates and a hack to help benching.
---
 lucene/benchmark/build.gradle                       |   3 ++-
 solr/reference-branch/bench/solr-benchmark.jar      | Bin 37846 -> 37946 bytes
 .../docker/solr-image/start-solr.sh                 |  10 ++++++++++
 3 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/lucene/benchmark/build.gradle b/lucene/benchmark/build.gradle
index 793fbd6..742ac5a 100644
--- a/lucene/benchmark/build.gradle
+++ b/lucene/benchmark/build.gradle
@@ -48,7 +48,8 @@ def workDir = file("work")
 task run(type: JavaExec) {
   description "Run a perf test (optional: -PtaskAlg=conf/your-algorithm-file -PmaxHeapSize=1G)"
   main 'org.apache.lucene.benchmark.byTask.Benchmark'
-  classpath sourceSets.main.runtimeClasspath
+  classpath sourceSets.main.runtimeClasspath + project.rootProject.files("solr/solrj/build/classes/java/main") +
+          project.rootProject.files("solr/solrj/build/resources/main") + project.rootProject.files("solr/solrj/build/lib/*")
   // allow these to be specified on the CLI via -PtaskAlg=  for example
   args = [propertyOrDefault('taskAlg', 'conf/micro-standard.alg')]
 
diff --git a/solr/reference-branch/bench/solr-benchmark.jar b/solr/reference-branch/bench/solr-benchmark.jar
index 17c2d17..e1ca7c3 100644
Binary files a/solr/reference-branch/bench/solr-benchmark.jar and b/solr/reference-branch/bench/solr-benchmark.jar differ
diff --git a/solr/reference-branch/docker/solr-image/start-solr.sh b/solr/reference-branch/docker/solr-image/start-solr.sh
new file mode 100644
index 0000000..36a291c
--- /dev/null
+++ b/solr/reference-branch/docker/solr-image/start-solr.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+set -x
+
+hostip=$(ip route show | awk '/default/ {print $3}')
+echo "host: $hostip"
+
+
+bash /opt/solr/bin/solr start -c -m "${SOLR_HEAP:-1g}" -z "${ZK_ADDRESS:-zookeeper:2181}" -p ${SOLR_PORT:-9998} -force -f
+