You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by an...@apache.org on 2015/03/07 12:40:38 UTC

svn commit: r1664825 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/update/ core/src/test/org/apache/solr/search/ test-framework/src/java/org/apache/solr/

Author: andyetitmoves
Date: Sat Mar  7 11:40:37 2015
New Revision: 1664825

URL: http://svn.apache.org/r1664825
Log:
SOLR-6359: Allow customization of the number of records and logs kept by UpdateLog

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java
    lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/JSONTestUtil.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1664825&r1=1664824&r2=1664825&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Sat Mar  7 11:40:37 2015
@@ -141,6 +141,9 @@ New Features
 
 * SOLR-7073: Support adding a jar to a collections classpath (Noble Paul)
 
+* SOLR-6359: Allow number of logs and records kept by UpdateLog to be configured
+  (Ramkumar Aiyengar)
+
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1664825&r1=1664824&r2=1664825&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Sat Mar  7 11:40:37 2015
@@ -259,7 +259,7 @@ public class RecoveryStrategy extends Th
     UpdateLog.RecentUpdates recentUpdates = null;
     try {
       recentUpdates = ulog.getRecentUpdates();
-      recentVersions = recentUpdates.getVersions(ulog.numRecordsToKeep);
+      recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
     } catch (Exception e) {
       SolrException.log(log, "Corrupt tlog - ignoring. core=" + coreName, e);
       recentVersions = new ArrayList<>(0);
@@ -382,7 +382,7 @@ public class RecoveryStrategy extends Th
           // System.out.println("Attempting to PeerSync from " + leaderUrl
           // + " i am:" + zkController.getNodeName());
           PeerSync peerSync = new PeerSync(core,
-              Collections.singletonList(leaderUrl), ulog.numRecordsToKeep, false, false);
+              Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
           peerSync.setStartingVersions(recentVersions);
           boolean syncSuccess = peerSync.sync();
           if (syncSuccess) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1664825&r1=1664824&r2=1664825&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Sat Mar  7 11:40:37 2015
@@ -163,7 +163,7 @@ public class SyncStrategy {
     // if we can't reach a replica for sync, we still consider the overall sync a success
     // TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach
     // to recover once more?
-    PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().numRecordsToKeep, true, true, peerSyncOnlyWithActive);
+    PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive);
     return peerSync.sync();
   }
   

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java?rev=1664825&r1=1664824&r2=1664825&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java Sat Mar  7 11:40:37 2015
@@ -84,7 +84,12 @@ public class HdfsUpdateLog extends Updat
     
     defaultSyncLevel = SyncLevel.getSyncLevel((String) info.initArgs
         .get("syncLevel"));
-    
+
+    numRecordsToKeep = objToInt(info.initArgs.get("numRecordsToKeep"), 100);
+    maxNumLogsToKeep = objToInt(info.initArgs.get("maxNumLogsToKeep"), 10);
+
+    log.info("Initializing HdfsUpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={}",
+        dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep);
   }
 
   private Configuration getConf() {
@@ -217,7 +222,7 @@ public class HdfsUpdateLog extends Updat
     // non-complete tlogs.
     HdfsUpdateLog.RecentUpdates startingUpdates = getRecentUpdates();
     try {
-      startingVersions = startingUpdates.getVersions(numRecordsToKeep);
+      startingVersions = startingUpdates.getVersions(getNumRecordsToKeep());
       startingOperation = startingUpdates.getLatestOperation();
       
       // populate recent deletes list (since we can't get that info from the

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1664825&r1=1664824&r2=1664825&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Sat Mar  7 11:40:37 2015
@@ -130,7 +130,7 @@ public class UpdateLog implements Plugin
 
   protected TransactionLog tlog;
   protected TransactionLog prevTlog;
-  protected Deque<TransactionLog> logs = new LinkedList<>();  // list of recent logs, newest first
+  protected final Deque<TransactionLog> logs = new LinkedList<>();  // list of recent logs, newest first
   protected LinkedList<TransactionLog> newestLogsOnStartup = new LinkedList<>();
   protected int numOldRecords;  // number of records in the recent logs
 
@@ -142,7 +142,8 @@ public class UpdateLog implements Plugin
 
   protected final int numDeletesToKeep = 1000;
   protected final int numDeletesByQueryToKeep = 100;
-  public final int numRecordsToKeep = 100;
+  protected int numRecordsToKeep;
+  protected int maxNumLogsToKeep;
 
   // keep track of deletes only... this is not updated on an add
   protected LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
@@ -215,10 +216,31 @@ public class UpdateLog implements Plugin
     return versionInfo;
   }
 
+  public int getNumRecordsToKeep() {
+    return numRecordsToKeep;
+  }
+
+  public int getMaxNumLogsToKeep() {
+    return maxNumLogsToKeep;
+  }
+
+  protected static int objToInt(Object obj, int def) {
+    if (obj != null) {
+      return Integer.parseInt(obj.toString());
+    }
+    else return def;
+  }
+
   @Override
   public void init(PluginInfo info) {
     dataDir = (String)info.initArgs.get("dir");
     defaultSyncLevel = SyncLevel.getSyncLevel((String)info.initArgs.get("syncLevel"));
+
+    numRecordsToKeep = objToInt(info.initArgs.get("numRecordsToKeep"), 100);
+    maxNumLogsToKeep = objToInt(info.initArgs.get("maxNumLogsToKeep"), 10);
+
+    log.info("Initializing UpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={}",
+        dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep);
   }
 
   /* Note, when this is called, uhandler is not completely constructed.
@@ -335,7 +357,7 @@ public class UpdateLog implements Plugin
       int nrec = log.numRecords();
       // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
       // we already have the configured limit of log files (maxNumLogsToKeep, when > 0).
-      if (currRecords - nrec >= numRecordsToKeep || logs.size() >= 10) {
+      if (currRecords - nrec >= numRecordsToKeep || (maxNumLogsToKeep > 0 && logs.size() >= maxNumLogsToKeep)) {
         currRecords -= nrec;
         numOldRecords -= nrec;
         logs.removeLast().decref();  // dereference so it will be deleted when no longer in use

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1664825&r1=1664824&r2=1664825&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java Sat Mar  7 11:40:37 2015
@@ -19,7 +19,6 @@ package org.apache.solr.search;
 
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
-import org.apache.solr.common.SolrException;
 import org.noggit.ObjectBuilder;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.request.SolrQueryRequest;
@@ -28,7 +27,6 @@ import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateHandler;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
@@ -46,16 +44,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.update.DirectUpdateHandler2;
-import org.apache.solr.update.UpdateHandler;
-import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.noggit.ObjectBuilder;
 
 public class TestRecovery extends SolrTestCaseJ4 {
 
@@ -770,46 +759,63 @@ public class TestRecovery extends SolrTe
 
       createCore();
 
-      int start = 0;
-      int maxReq = 50;
+      int numIndexed = 0;
+      int maxReq = 200;
 
       LinkedList<Long> versions = new LinkedList<>();
-      addDocs(10, start, versions); start+=10;
-      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
-      assertU(commit());
-      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
-
-      addDocs(10, start, versions);  start+=10;
-      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
-      assertU(commit());
-      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
 
-      assertEquals(2, ulog.getLogList(logDir).length);
+      int docsPerBatch = 3;
+      // we don't expect to reach numRecordsToKeep as yet, so the bottleneck is still number of logs to keep
+      int expectedToRetain = ulog.getMaxNumLogsToKeep() * docsPerBatch;
+      int versExpected;
+
+      for (int i = 1; i <= ulog.getMaxNumLogsToKeep() + 2; i ++) {
+        addDocs(docsPerBatch, numIndexed, versions); numIndexed += docsPerBatch;
+        versExpected = Math.min(numIndexed, expectedToRetain + docsPerBatch); // not yet committed, so one more tlog could slip in
+        assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected)));
+        assertU(commit());
+        versExpected = Math.min(numIndexed, expectedToRetain);
+        assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected)));
+        assertEquals(Math.min(i, ulog.getMaxNumLogsToKeep()), ulog.getLogList(logDir).length);
+      }
 
-      addDocs(105, start, versions);  start+=105;
-      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
-      assertU(commit());
-      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+      docsPerBatch = ulog.getNumRecordsToKeep() + 20;
+      // about to commit a lot of docs, so numRecordsToKeep becomes the bottleneck
+      expectedToRetain = ulog.getNumRecordsToKeep();
+
+      addDocs(docsPerBatch, numIndexed, versions);  numIndexed+=docsPerBatch;
+      versExpected = Math.min(numIndexed, expectedToRetain);
+      assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected)));
+      assertU(commit());
+      expectedToRetain = expectedToRetain - 1; // we lose a log entry due to the commit record
+      versExpected = Math.min(numIndexed, expectedToRetain);
+      assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected)));
 
-      // previous two logs should be gone now
+      // previous logs should be gone now
       assertEquals(1, ulog.getLogList(logDir).length);
 
-      addDocs(1, start, versions);  start+=1;
+      addDocs(1, numIndexed, versions);  numIndexed+=1;
       h.close();
       createCore();      // trigger recovery, make sure that tlog reference handling is correct
 
       // test we can get versions while replay is happening
-      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+      assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, expectedToRetain)));
 
       logReplay.release(1000);
       assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
 
-      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+      expectedToRetain = expectedToRetain - 1; // we lose a log entry due to the commit record made by recovery
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,expectedToRetain)));
+
+      docsPerBatch = ulog.getNumRecordsToKeep() + 20;
+      // about to commit a lot of docs, so numRecordsToKeep becomes the bottleneck
+      expectedToRetain = ulog.getNumRecordsToKeep();
 
-      addDocs(105, start, versions);  start+=105;
-      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+      addDocs(docsPerBatch, numIndexed, versions);  numIndexed+=docsPerBatch;
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,expectedToRetain)));
       assertU(commit());
-      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+      expectedToRetain = expectedToRetain - 1; // we lose a log entry due to the commit record
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,expectedToRetain)));
 
       // previous logs should be gone now
       assertEquals(1, ulog.getLogList(logDir).length);
@@ -817,7 +823,7 @@ public class TestRecovery extends SolrTe
       //
       // test that a corrupt tlog file doesn't stop us from coming up, or seeing versions before that tlog file.
       //
-      addDocs(1, start, new LinkedList<Long>()); // don't add this to the versions list because we are going to lose it...
+      addDocs(1, numIndexed, new LinkedList<Long>()); // don't add this to the versions list because we are going to lose it...
       h.close();
       files = ulog.getLogList(logDir);
       Arrays.sort(files);
@@ -828,7 +834,7 @@ public class TestRecovery extends SolrTe
       ignoreException("Failure to open existing");
       createCore();
       // we should still be able to get the list of versions (not including the trashed log file)
-      assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+      assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, expectedToRetain)));
       resetExceptionIgnores();
 
     } finally {

Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/JSONTestUtil.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/JSONTestUtil.java?rev=1664825&r1=1664824&r2=1664825&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/JSONTestUtil.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/JSONTestUtil.java Sat Mar  7 11:40:37 2015
@@ -243,7 +243,7 @@ class CollectionTester {
 
       if (a >= expectedList.size() || b >=v.size()) {
         popPath();
-        setErr("List size mismatch");
+        setErr("List size mismatch (expected: " + expectedList.size() + ", got: " + v.size() + ")");
         return false;
       }