Posted to commits@lucene.apache.org by er...@apache.org on 2015/11/24 19:30:41 UTC

svn commit: r1716229 - in /lucene/dev/trunk/solr/core/src: java/org/apache/solr/handler/ java/org/apache/solr/update/ test/org/apache/solr/cloud/

Author: erick
Date: Tue Nov 24 18:30:41 2015
New Revision: 1716229

URL: http://svn.apache.org/viewvc?rev=1716229&view=rev
Log:
SOLR-8263: Tlog replication could interfere with the replay of buffered updates
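
Before this change, IndexFetcher#moveTlogFiles reset the CdcrUpdateLog and
moved the fetched tlog files into place without blocking updates, so a tlog
holding buffered updates could be discarded while recovery still expected to
replay it. The fix brackets the whole swap in VersionInfo blockUpdates /
unblockUpdates, carries the buffered tlog file and its replay offset across
the reset, and re-initialises the update log with them. In outline (a sketch
paraphrasing the new moveTlogFiles in the diff below, where cdcrUlog stands
for the ((CdcrUpdateLog) ulog) cast used in the actual code):

    vinfo.blockUpdates();  // no new updates while the tlog directory is swapped
    try {
      // remember the buffered tlog file and the replay start offset
      CdcrUpdateLog.BufferedUpdates buffered = cdcrUlog.resetForRecovery();
      // move the fetched tlog files into the tlog directory
      if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
      // re-init on the same VersionInfo and replay the buffered updates
      cdcrUlog.initForRecovery(buffered.tlog, buffered.offset);
    } finally {
      vinfo.unblockUpdates();
    }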

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java?rev=1716229&r1=1716228&r2=1716229&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java Tue Nov 24 18:30:41 2015
@@ -87,6 +87,7 @@ import org.apache.solr.search.SolrIndexS
 import org.apache.solr.update.CdcrUpdateLog;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.VersionInfo;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.FileUtils;
 import org.apache.solr.util.PropertiesInputStream;
@@ -1052,11 +1053,30 @@ public class IndexFetcher {
   private boolean moveTlogFiles(File tmpTlogDir) {
     UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
 
-    // reset the update log before copying the new tlog directory, it will be reinitialized
-    // during the core reload
-    ((CdcrUpdateLog) ulog).reset();
-    // try to move the temp tlog files to the tlog directory
-    if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
+    VersionInfo vinfo = ulog.getVersionInfo();
+    vinfo.blockUpdates(); // block updates until the new update log is initialised
+    try {
+      // reset the update log before copying the new tlog directory
+      CdcrUpdateLog.BufferedUpdates bufferedUpdates = ((CdcrUpdateLog) ulog).resetForRecovery();
+      // try to move the temp tlog files to the tlog directory
+      if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
+      // reinitialise the update log and copy the buffered updates
+      if (bufferedUpdates.tlog != null) {
+        // map file path to its new backup location
+        File parentDir = FileSystems.getDefault().getPath(solrCore.getUpdateHandler().getUpdateLog().getLogDir()).getParent().toFile();
+        File backupTlogDir = new File(parentDir, tmpTlogDir.getName());
+        bufferedUpdates.tlog = new File(backupTlogDir, bufferedUpdates.tlog.getName());
+      }
+      // init the update log with the new set of tlog files, and copy the buffered updates
+      ((CdcrUpdateLog) ulog).initForRecovery(bufferedUpdates.tlog, bufferedUpdates.offset);
+    }
+    catch (Exception e) {
+      LOG.error("Unable to copy tlog files", e);
+      return false;
+    }
+    finally {
+      vinfo.unblockUpdates();
+    }
     return true;
   }
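
Note on the lock scope above: initForRecovery deliberately reuses the existing
VersionInfo instance rather than creating a fresh one, so the write lock taken
by blockUpdates() stays in force until the finally block releases it (see the
javadoc on initForRecovery in the CdcrUpdateLog diff below).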
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java?rev=1716229&r1=1716228&r2=1716229&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java Tue Nov 24 18:30:41 2015
@@ -20,6 +20,7 @@ package org.apache.solr.update;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
@@ -27,8 +28,15 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 
+import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -239,13 +247,23 @@ public class CdcrUpdateLog extends Updat
   }
 
   /**
-   * expert: Reset the update log before initialisation. This is needed by the IndexFetcher during a
+   * expert: Reset the update log before initialisation. This is called by
+   * {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} during
    * a Recovery operation in order to re-initialise the UpdateLog with a new set of tlog files.
+   * @see #initForRecovery(File, long)
    */
-  public void reset() {
-    synchronized (this) {
+  public BufferedUpdates resetForRecovery() {
+    synchronized (this) { // since we blocked updates in IndexFetcher, this synchronization shouldn't strictly be necessary.
+      // If we are buffering, we need to return the related information to the index fetcher
+      // for properly initialising the new update log - SOLR-8263
+      BufferedUpdates bufferedUpdates = new BufferedUpdates();
+      if (state == State.BUFFERING && tlog != null) {
+        bufferedUpdates.tlog = tlog.tlogFile; // file to keep
+        bufferedUpdates.offset = this.recoveryInfo.positionOfStart;
+      }
+
       // Close readers
-      for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) {
+      for (CdcrLogReader reader : logPointers.keySet()) {
         reader.close();
       }
       logPointers.clear();
@@ -268,13 +286,163 @@ public class CdcrUpdateLog extends Updat
       if (prevMap != null) prevMap.clear();
       if (prevMap2 != null) prevMap2.clear();
 
+      tlogFiles = null;
       numOldRecords = 0;
 
       oldDeletes.clear();
       deleteByQueries.clear();
 
-      // reset lastDataDir for triggering full #init()
-      lastDataDir = null;
+      return bufferedUpdates;
+    }
+  }
+
+  public static class BufferedUpdates {
+    public File tlog;
+    public long offset;
+  }
+
+  /**
+   * <p>
+   *   expert: Initialise the update log with a tlog file containing buffered updates. This is called by
+   *   {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} during a Recovery operation.
+   * </p>
+   *
+   *   This is mainly a copy of the original {@link UpdateLog#init(UpdateHandler, SolrCore)} method, but modified
+   *   to:
+   *   <ul>
+   *     <li>preserve the same {@link VersionInfo} instance so that updates remain blocked, since
+   *     {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} acquired a write lock on this instance.</li>
+   *     <li>copy the buffered updates.</li>
+   *   </ul>
+   *
+   * @see #resetForRecovery()
+   */
+  public void initForRecovery(File bufferedTlog, long offset) {
+    tlogFiles = getLogList(tlogDir);
+    id = getLastLogId() + 1;   // add 1 since we will create a new log for the next update
+
+    if (debug) {
+      log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
+    }
+
+    TransactionLog oldLog = null;
+    for (String oldLogName : tlogFiles) {
+      File f = new File(tlogDir, oldLogName);
+      try {
+        oldLog = newTransactionLog(f, null, true);
+        addOldLog(oldLog, false);  // don't remove old logs on startup since more than one may be uncapped.
+      } catch (Exception e) {
+        SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
+        deleteFile(f);
+      }
+    }
+
+    // Record first two logs (oldest first) at startup for potential tlog recovery.
+    // It's possible that at abnormal close both "tlog" and "prevTlog" were uncapped.
+    for (TransactionLog ll : logs) {
+      newestLogsOnStartup.addFirst(ll);
+      if (newestLogsOnStartup.size() >= 2) break;
+    }
+
+    // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
+    UpdateLog.RecentUpdates startingUpdates = getRecentUpdates();
+    try {
+      startingVersions = startingUpdates.getVersions(numRecordsToKeep);
+      startingOperation = startingUpdates.getLatestOperation();
+
+      // populate recent deletes list (since we can't get that info from the index)
+      for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) {
+        DeleteUpdate du = startingUpdates.deleteList.get(i);
+        oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
+      }
+
+      // populate recent deleteByQuery commands
+      for (int i=startingUpdates.deleteByQueryList.size()-1; i>=0; i--) {
+        Update update = startingUpdates.deleteByQueryList.get(i);
+        List<Object> dbq = (List<Object>) update.log.lookup(update.pointer);
+        long version = (Long) dbq.get(1);
+        String q = (String) dbq.get(2);
+        trackDeleteByQuery(q, version);
+      }
+
+    } finally {
+      startingUpdates.close();
+    }
+
+    // Copy buffered updates
+    if (bufferedTlog != null) {
+      this.copyBufferedUpdates(bufferedTlog, offset);
+    }
+  }
+
+  /**
+   * Read the entries from the given tlog file and replay them as buffered updates.
+   */
+  private void copyBufferedUpdates(File tlogSrc, long offsetSrc) {
+    recoveryInfo = new RecoveryInfo();
+    recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
+    state = State.BUFFERING;
+    operationFlags |= FLAG_GAP;
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString());
+    SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+
+    CdcrTransactionLog src = new CdcrTransactionLog(tlogSrc, null, true);
+    TransactionLog.LogReader tlogReader = src.getReader(offsetSrc);
+    try {
+      int operationAndFlags = 0;
+      for (; ; ) {
+        Object o = tlogReader.next();
+        if (o == null) break; // we reached the end of the tlog
+        // should currently be a List<Oper,Ver,Doc/Id>
+        List entry = (List) o;
+        operationAndFlags = (Integer) entry.get(0);
+        int oper = operationAndFlags & OPERATION_MASK;
+        long version = (Long) entry.get(1);
+
+        switch (oper) {
+          case UpdateLog.ADD: {
+            SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
+            AddUpdateCommand cmd = new AddUpdateCommand(req);
+            cmd.solrDoc = sdoc;
+            cmd.setVersion(version);
+            cmd.setFlags(UpdateCommand.BUFFERING);
+            this.add(cmd);
+            break;
+          }
+          case UpdateLog.DELETE: {
+            byte[] idBytes = (byte[]) entry.get(2);
+            DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+            cmd.setIndexedId(new BytesRef(idBytes));
+            cmd.setVersion(version);
+            cmd.setFlags(UpdateCommand.BUFFERING);
+            this.delete(cmd);
+            break;
+          }
+
+          case UpdateLog.DELETE_BY_QUERY: {
+            String query = (String) entry.get(2);
+            DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+            cmd.query = query;
+            cmd.setVersion(version);
+            cmd.setFlags(UpdateCommand.BUFFERING);
+            this.deleteByQuery(cmd);
+            break;
+          }
+
+          default:
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Invalid Operation! " + oper);
+        }
+
+      }
+    }
+    catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy buffered updates", e);
+    }
+    finally {
+      tlogReader.close();
+      src.close();
     }
   }
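
For reference, the shape of the tlog entries decoded by the switch above is,
schematically (inferred from the reads in copyBufferedUpdates, not taken from
the tlog serialization code):

    // ADD:             [operationAndFlags, version, ..., SolrInputDocument]  (doc is last)
    // DELETE:          [operationAndFlags, version, idBytes]
    // DELETE_BY_QUERY: [operationAndFlags, version, query]
    //
    // oper = operationAndFlags & OPERATION_MASK; each replayed command is
    // flagged UpdateCommand.BUFFERING before going through add(), delete()
    // or deleteByQuery().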
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java?rev=1716229&r1=1716228&r2=1716229&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java Tue Nov 24 18:30:41 2015
@@ -257,7 +257,7 @@ public class BaseCdcrDistributedZkTest e
           Thread.sleep(500);
         }
       }
-      throw new AssertionError("Timeout while trying to assert number of documents on collection: " + collection, lastAssertionError);
+      throw new AssertionError("Timeout while trying to assert number of documents @ " + collection, lastAssertionError);
     } finally {
       client.close();
     }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java?rev=1716229&r1=1716228&r2=1716229&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java Tue Nov 24 18:30:41 2015
@@ -17,17 +17,31 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
 import org.apache.lucene.util.LuceneTestCase.Nightly;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class tests the cdcr extension to the {@link org.apache.solr.handler.ReplicationHandler} and
+ * {@link org.apache.solr.handler.IndexFetcher}.
+ */
 @Nightly
 public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
 
@@ -190,6 +204,84 @@ public class CdcrReplicationHandlerTest
     this.assertUpdateLogsEquals(SOURCE_COLLECTION, 15);
   }
 
+  /**
+   * Test the scenario where the slave is killed while the leader is still receiving updates.
+   * The slave should buffer updates while in recovery, then replay them at the end of the recovery.
+   * If updates were properly buffered and replayed, then the slave should have the same number of documents
+   * as the leader. This checks whether cdcr tlog replication interferes with buffered updates - SOLR-8263.
+   */
+  @Test
+  @ShardsFixed(num = 2)
+  public void testReplicationWithBufferedUpdates() throws Exception {
+    List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+
+    AtomicInteger numDocs = new AtomicInteger(0);
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-test-update-scheduler"));
+    executor.scheduleWithFixedDelay(new UpdateThread(numDocs), 10, 10, TimeUnit.MILLISECONDS);
+
+    // Restart the slave node to trigger Replication strategy
+    this.restartServer(slaves.get(0));
+
+    // shutdown the update thread and wait for its completion
+    executor.shutdown();
+    executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+
+    // check that we have the expected number of documents in the cluster
+    assertNumDocs(numDocs.get(), SOURCE_COLLECTION);
+
+    // check that we have the expected number of documents on the slave
+    assertNumDocs(numDocs.get(), slaves.get(0));
+  }
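+
+  // The test above interleaves indexing and recovery on purpose: the scheduled
+  // UpdateThread keeps adding batches of 10 documents every 10 ms while the
+  // slave restarts, so the slave must buffer updates during tlog replication
+  // and replay them afterwards. assertNumDocs polls for up to 15 seconds
+  // (30 attempts x 500 ms) before failing.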
+
+  private void assertNumDocs(int expectedNumDocs, CloudJettyRunner jetty)
+  throws InterruptedException, IOException, SolrServerException {
+    SolrClient client = createNewSolrServer(jetty.url);
+    try {
+      int cnt = 30; // timeout after 15 seconds
+      AssertionError lastAssertionError = null;
+      while (cnt > 0) {
+        try {
+          assertEquals(expectedNumDocs, client.query(new SolrQuery("*:*")).getResults().getNumFound());
+          return;
+        }
+        catch (AssertionError e) {
+          lastAssertionError = e;
+          cnt--;
+          Thread.sleep(500);
+        }
+      }
+      throw new AssertionError("Timeout while trying to assert number of documents @ " + jetty.url, lastAssertionError);
+    } finally {
+      client.close();
+    }
+  }
+
+  private class UpdateThread implements Runnable {
+
+    private AtomicInteger numDocs;
+
+    private UpdateThread(AtomicInteger numDocs) {
+      this.numDocs = numDocs;
+    }
+
+    @Override
+    public void run() {
+      try {
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int j = numDocs.get(); j < (numDocs.get() + 10); j++) {
+          docs.add(getDoc(id, Integer.toString(j)));
+        }
+        index(SOURCE_COLLECTION, docs);
+        numDocs.getAndAdd(10);
+        log.info("Sent batch of {} updates - numDocs:{}", docs.size(), numDocs);
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+  }
+
   private List<CloudJettyRunner> getShardToSlaveJetty(String collection, String shard) {
     List<CloudJettyRunner> jetties = new ArrayList<>(shardToJetty.get(collection).get(shard));
     CloudJettyRunner leader = shardToLeaderJetty.get(collection).get(shard);