You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/11/24 19:30:41 UTC
svn commit: r1716229 - in /lucene/dev/trunk/solr/core/src:
java/org/apache/solr/handler/ java/org/apache/solr/update/
test/org/apache/solr/cloud/
Author: erick
Date: Tue Nov 24 18:30:41 2015
New Revision: 1716229
URL: http://svn.apache.org/viewvc?rev=1716229&view=rev
Log:
SOLR-8263: Tlog replication could interfere with the replay of buffered updates
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java?rev=1716229&r1=1716228&r2=1716229&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java Tue Nov 24 18:30:41 2015
@@ -87,6 +87,7 @@ import org.apache.solr.search.SolrIndexS
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.VersionInfo;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.PropertiesInputStream;
@@ -1052,11 +1053,30 @@ public class IndexFetcher {
private boolean moveTlogFiles(File tmpTlogDir) {
UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
- // reset the update log before copying the new tlog directory, it will be reinitialized
- // during the core reload
- ((CdcrUpdateLog) ulog).reset();
- // try to move the temp tlog files to the tlog directory
- if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
+ VersionInfo vinfo = ulog.getVersionInfo();
+ vinfo.blockUpdates(); // block updates until the new update log is initialised
+ try {
+ // reset the update log before copying the new tlog directory
+ CdcrUpdateLog.BufferedUpdates bufferedUpdates = ((CdcrUpdateLog) ulog).resetForRecovery();
+ // try to move the temp tlog files to the tlog directory
+ if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
+ // reinitialise the update log and copy the buffered updates
+ if (bufferedUpdates.tlog != null) {
+ // map file path to its new backup location
+ File parentDir = FileSystems.getDefault().getPath(solrCore.getUpdateHandler().getUpdateLog().getLogDir()).getParent().toFile();
+ File backupTlogDir = new File(parentDir, tmpTlogDir.getName());
+ bufferedUpdates.tlog = new File(backupTlogDir, bufferedUpdates.tlog.getName());
+ }
+ // init the update log with the new set of tlog files, and copy the buffered updates
+ ((CdcrUpdateLog) ulog).initForRecovery(bufferedUpdates.tlog, bufferedUpdates.offset);
+ }
+ catch (Exception e) {
+ LOG.error("Unable to copy tlog files", e);
+ return false;
+ }
+ finally {
+ vinfo.unblockUpdates();
+ }
return true;
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java?rev=1716229&r1=1716228&r2=1716229&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java Tue Nov 24 18:30:41 2015
@@ -20,6 +20,7 @@ package org.apache.solr.update;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
@@ -27,8 +28,15 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -239,13 +247,23 @@ public class CdcrUpdateLog extends Updat
}
/**
- * expert: Reset the update log before initialisation. This is needed by the IndexFetcher during a
+ * expert: Reset the update log before initialisation. This is called by
+ * {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} during
* a Recovery operation in order to re-initialise the UpdateLog with a new set of tlog files.
+ * @see #initForRecovery(File, long)
*/
- public void reset() {
- synchronized (this) {
+ public BufferedUpdates resetForRecovery() {
+ synchronized (this) { // since we blocked updates in IndexFetcher, this synchronization shouldn't strictly be necessary.
+ // If we are buffering, we need to return the related information to the index fetcher
+ // for properly initialising the new update log - SOLR-8263
+ BufferedUpdates bufferedUpdates = new BufferedUpdates();
+ if (state == State.BUFFERING && tlog != null) {
+ bufferedUpdates.tlog = tlog.tlogFile; // file to keep
+ bufferedUpdates.offset = this.recoveryInfo.positionOfStart;
+ }
+
// Close readers
- for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) {
+ for (CdcrLogReader reader : logPointers.keySet()) {
reader.close();
}
logPointers.clear();
@@ -268,13 +286,163 @@ public class CdcrUpdateLog extends Updat
if (prevMap != null) prevMap.clear();
if (prevMap2 != null) prevMap2.clear();
+ tlogFiles = null;
numOldRecords = 0;
oldDeletes.clear();
deleteByQueries.clear();
- // reset lastDataDir for triggering full #init()
- lastDataDir = null;
+ return bufferedUpdates;
+ }
+ }
+
+ public static class BufferedUpdates {
+ public File tlog;
+ public long offset;
+ }
+
+ /**
+ * <p>
+ * expert: Initialise the update log with a tlog file containing buffered updates. This is called by
+ * {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} during a Recovery operation.
+ * </p>
+ *
+ * This is mainly a copy of the original {@link UpdateLog#init(UpdateHandler, SolrCore)} method, but modified
+ * to:
+ * <ul>
+ * <li>preserve the same {@link VersionInfo} instance in order to not "unblock" updates, since the
+ * {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} acquired a write lock from this instance.</li>
+ * <li>copy the buffered updates.</li>
+ * </ul>
+ *
+ * @see #resetForRecovery()
+ */
+ public void initForRecovery(File bufferedTlog, long offset) {
+ tlogFiles = getLogList(tlogDir);
+ id = getLastLogId() + 1; // add 1 since we will create a new log for the next update
+
+ if (debug) {
+ log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
+ }
+
+ TransactionLog oldLog = null;
+ for (String oldLogName : tlogFiles) {
+ File f = new File(tlogDir, oldLogName);
+ try {
+ oldLog = newTransactionLog(f, null, true);
+ addOldLog(oldLog, false); // don't remove old logs on startup since more than one may be uncapped.
+ } catch (Exception e) {
+ SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
+ deleteFile(f);
+ }
+ }
+
+ // Record first two logs (oldest first) at startup for potential tlog recovery.
+ // It's possible that at abnormal close both "tlog" and "prevTlog" were uncapped.
+ for (TransactionLog ll : logs) {
+ newestLogsOnStartup.addFirst(ll);
+ if (newestLogsOnStartup.size() >= 2) break;
+ }
+
+ // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
+ UpdateLog.RecentUpdates startingUpdates = getRecentUpdates();
+ try {
+ startingVersions = startingUpdates.getVersions(numRecordsToKeep);
+ startingOperation = startingUpdates.getLatestOperation();
+
+ // populate recent deletes list (since we can't get that info from the index)
+ for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) {
+ DeleteUpdate du = startingUpdates.deleteList.get(i);
+ oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
+ }
+
+ // populate recent deleteByQuery commands
+ for (int i=startingUpdates.deleteByQueryList.size()-1; i>=0; i--) {
+ Update update = startingUpdates.deleteByQueryList.get(i);
+ List<Object> dbq = (List<Object>) update.log.lookup(update.pointer);
+ long version = (Long) dbq.get(1);
+ String q = (String) dbq.get(2);
+ trackDeleteByQuery(q, version);
+ }
+
+ } finally {
+ startingUpdates.close();
+ }
+
+ // Copy buffered updates
+ if (bufferedTlog != null) {
+ this.copyBufferedUpdates(bufferedTlog, offset);
+ }
+ }
+
+ /**
+ * Read the entries from the given tlog file and replay them as buffered updates.
+ */
+ private void copyBufferedUpdates(File tlogSrc, long offsetSrc) {
+ recoveryInfo = new RecoveryInfo();
+ recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
+ state = State.BUFFERING;
+ operationFlags |= FLAG_GAP;
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString());
+ SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+
+ CdcrTransactionLog src = new CdcrTransactionLog(tlogSrc, null, true);
+ TransactionLog.LogReader tlogReader = src.getReader(offsetSrc);
+ try {
+ int operationAndFlags = 0;
+ for (; ; ) {
+ Object o = tlogReader.next();
+ if (o == null) break; // we reached the end of the tlog
+ // should currently be a List<Oper,Ver,Doc/Id>
+ List entry = (List) o;
+ operationAndFlags = (Integer) entry.get(0);
+ int oper = operationAndFlags & OPERATION_MASK;
+ long version = (Long) entry.get(1);
+
+ switch (oper) {
+ case UpdateLog.ADD: {
+ SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ cmd.solrDoc = sdoc;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.BUFFERING);
+ this.add(cmd);
+ break;
+ }
+ case UpdateLog.DELETE: {
+ byte[] idBytes = (byte[]) entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.BUFFERING);
+ this.delete(cmd);
+ break;
+ }
+
+ case UpdateLog.DELETE_BY_QUERY: {
+ String query = (String) entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.query = query;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.BUFFERING);
+ this.deleteByQuery(cmd);
+ break;
+ }
+
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Invalid Operation! " + oper);
+ }
+
+ }
+ }
+ catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy buffered updates", e);
+ }
+ finally {
+ tlogReader.close();
+ src.close();
}
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java?rev=1716229&r1=1716228&r2=1716229&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java Tue Nov 24 18:30:41 2015
@@ -257,7 +257,7 @@ public class BaseCdcrDistributedZkTest e
Thread.sleep(500);
}
}
- throw new AssertionError("Timeout while trying to assert number of documents on collection: " + collection, lastAssertionError);
+ throw new AssertionError("Timeout while trying to assert number of documents @ " + collection, lastAssertionError);
} finally {
client.close();
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java?rev=1716229&r1=1716228&r2=1716229&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java Tue Nov 24 18:30:41 2015
@@ -17,17 +17,31 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.apache.lucene.util.LuceneTestCase.Nightly;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class is testing the cdcr extension to the {@link org.apache.solr.handler.ReplicationHandler} and
+ * {@link org.apache.solr.handler.IndexFetcher}.
+ */
@Nightly
public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
@@ -190,6 +204,84 @@ public class CdcrReplicationHandlerTest
this.assertUpdateLogsEquals(SOURCE_COLLECTION, 15);
}
+ /**
+ * Test the scenario where the slave is killed while the leader is still receiving updates.
+ * The slave should buffer updates while in recovery, then replay them at the end of the recovery.
+ * If updates were properly buffered and replayed, then the slave should have the same number of documents
+ * as the leader. This checks if cdcr tlog replication interferes with buffered updates - SOLR-8263.
+ */
+ @Test
+ @ShardsFixed(num = 2)
+ public void testReplicationWithBufferedUpdates() throws Exception {
+ List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+
+ AtomicInteger numDocs = new AtomicInteger(0);
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-test-update-scheduler"));
+ executor.scheduleWithFixedDelay(new UpdateThread(numDocs), 10, 10, TimeUnit.MILLISECONDS);
+
+ // Restart the slave node to trigger Replication strategy
+ this.restartServer(slaves.get(0));
+
+ // shutdown the update thread and wait for its completion
+ executor.shutdown();
+ executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+
+ // check that we have the expected number of documents in the cluster
+ assertNumDocs(numDocs.get(), SOURCE_COLLECTION);
+
+ // check that we have the expected number of documents on the slave
+ assertNumDocs(numDocs.get(), slaves.get(0));
+ }
+
+ private void assertNumDocs(int expectedNumDocs, CloudJettyRunner jetty)
+ throws InterruptedException, IOException, SolrServerException {
+ SolrClient client = createNewSolrServer(jetty.url);
+ try {
+ int cnt = 30; // timeout after 15 seconds
+ AssertionError lastAssertionError = null;
+ while (cnt > 0) {
+ try {
+ assertEquals(expectedNumDocs, client.query(new SolrQuery("*:*")).getResults().getNumFound());
+ return;
+ }
+ catch (AssertionError e) {
+ lastAssertionError = e;
+ cnt--;
+ Thread.sleep(500);
+ }
+ }
+ throw new AssertionError("Timeout while trying to assert number of documents @ " + jetty.url, lastAssertionError);
+ } finally {
+ client.close();
+ }
+ }
+
+ private class UpdateThread implements Runnable {
+
+ private AtomicInteger numDocs;
+
+ private UpdateThread(AtomicInteger numDocs) {
+ this.numDocs = numDocs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (int j = numDocs.get(); j < (numDocs.get() + 10); j++) {
+ docs.add(getDoc(id, Integer.toString(j)));
+ }
+ index(SOURCE_COLLECTION, docs);
+ numDocs.getAndAdd(10);
+ log.info("Sent batch of {} updates - numDocs:{}", docs.size(), numDocs);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
private List<CloudJettyRunner> getShardToSlaveJetty(String collection, String shard) {
List<CloudJettyRunner> jetties = new ArrayList<>(shardToJetty.get(collection).get(shard));
CloudJettyRunner leader = shardToLeaderJetty.get(collection).get(shard);