You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/05/22 20:58:29 UTC
svn commit: r1681186 [5/5] - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/
core/src/java/org/apache/solr/util/ core/src/test-files/solr/collect...
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java Fri May 22 18:58:29 2015
@@ -173,7 +173,7 @@ public class TestSolrConfigHandler exten
}
- public static void reqhandlertests(RestTestHarness writeHarness, String testServerBaseUrl, CloudSolrClient cloudSolrServer) throws Exception {
+ public static void reqhandlertests(RestTestHarness writeHarness, String testServerBaseUrl, CloudSolrClient cloudSolrClient) throws Exception {
String payload = "{\n" +
"'create-requesthandler' : { 'name' : '/x', 'class': 'org.apache.solr.handler.DumpRequestHandler' , 'startup' : 'lazy'}\n" +
"}";
@@ -182,7 +182,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config/overlay?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("overlay", "requestHandler", "/x", "startup"),
"lazy",
10);
@@ -195,7 +195,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config/overlay?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("overlay", "requestHandler", "/x", "a"),
"b",
10);
@@ -203,7 +203,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/x?wt=json&getdefaults=true&json.nl=map",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("getdefaults", "def_a"),
"def A val",
10);
@@ -217,7 +217,7 @@ public class TestSolrConfigHandler exten
int maxTimeoutSeconds = 10;
while (TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutSeconds) {
String uri = "/config/overlay?wt=json";
- Map m = testServerBaseUrl == null ? getRespMap(uri, writeHarness) : TestSolrConfigHandlerConcurrent.getAsMap(testServerBaseUrl + uri, cloudSolrServer);
+ Map m = testServerBaseUrl == null ? getRespMap(uri, writeHarness) : TestSolrConfigHandlerConcurrent.getAsMap(testServerBaseUrl + uri, cloudSolrClient);
if (null == ConfigOverlay.getObjectByPath(m, true, Arrays.asList("overlay", "requestHandler", "/x", "a"))) {
success = true;
break;
@@ -234,7 +234,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "queryConverter", "qc", "class"),
"org.apache.solr.spelling.SpellingQueryConverter",
10);
@@ -245,7 +245,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "queryConverter", "qc", "class"),
"org.apache.solr.spelling.SuggestQueryConverter",
10);
@@ -257,7 +257,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "queryConverter", "qc"),
null,
10);
@@ -269,7 +269,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "searchComponent", "tc", "class"),
"org.apache.solr.handler.component.TermsComponent",
10);
@@ -280,7 +280,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "searchComponent", "tc", "class"),
"org.apache.solr.handler.component.TermVectorComponent",
10);
@@ -292,7 +292,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "searchComponent", "tc"),
null,
10);
@@ -304,7 +304,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "valueSourceParser", "cu", "class"),
"org.apache.solr.core.CountUsageValueSourceParser",
10);
@@ -318,7 +318,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "valueSourceParser", "cu", "class"),
"org.apache.solr.search.function.NvlValueSourceParser",
10);
@@ -330,7 +330,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "valueSourceParser", "cu"),
null,
10);
@@ -344,7 +344,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "transformer", "mytrans", "class"),
"org.apache.solr.response.transform.ValueAugmenterFactory",
10);
@@ -356,7 +356,7 @@ public class TestSolrConfigHandler exten
testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "transformer", "mytrans", "value"),
"6",
10);
@@ -369,7 +369,7 @@ public class TestSolrConfigHandler exten
Map map = testForResponseElement(writeHarness,
testServerBaseUrl,
"/config?wt=json",
- cloudSolrServer,
+ cloudSolrClient,
Arrays.asList("config", "transformer", "mytrans"),
null,
10);
@@ -383,7 +383,7 @@ public class TestSolrConfigHandler exten
public static Map testForResponseElement(RestTestHarness harness,
String testServerBaseUrl,
String uri,
- CloudSolrClient cloudSolrServer, List<String> jsonPath,
+ CloudSolrClient cloudSolrClient, List<String> jsonPath,
Object expected,
long maxTimeoutSeconds) throws Exception {
@@ -393,7 +393,7 @@ public class TestSolrConfigHandler exten
while (TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutSeconds) {
try {
- m = testServerBaseUrl == null ? getRespMap(uri, harness) : TestSolrConfigHandlerConcurrent.getAsMap(testServerBaseUrl + uri, cloudSolrServer);
+ m = testServerBaseUrl == null ? getRespMap(uri, harness) : TestSolrConfigHandlerConcurrent.getAsMap(testServerBaseUrl + uri, cloudSolrClient);
} catch (Exception e) {
Thread.sleep(100);
continue;
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java Fri May 22 18:58:29 2015
@@ -0,0 +1,800 @@
+package org.apache.solr.update;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.noggit.ObjectBuilder;
+
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+@Slow
+public class CdcrUpdateLogTest extends SolrTestCaseJ4 {
+
+ // means that we've seen the leader and have version info (i.e. we are a non-leader replica)
+ private static String FROM_LEADER = DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString();
+
+ private static int timeout = 60; // acquire timeout in seconds. change this to a huge number when debugging to prevent threads from advancing.
+
+ // TODO: fix this test to not require FSDirectory
+ static String savedFactory;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ savedFactory = System.getProperty("solr.DirectoryFactory");
+ System.setProperty("solr.directoryFactory", "org.apache.solr.core.MockFSDirectoryFactory");
+ initCore("solrconfig-cdcrupdatelog.xml", "schema15.xml");
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ if (savedFactory == null) {
+ System.clearProperty("solr.directoryFactory");
+ } else {
+ System.setProperty("solr.directoryFactory", savedFactory);
+ }
+ }
+
+ // since we make up fake versions in these tests, we can get messed up by a DBQ with a real version
+ // since Solr can think following updates were reordered.
+ @Override
+ public void clearIndex() {
+ try {
+ deleteByQueryAndGetVersion("*:*", params("_version_", Long.toString(-Long.MAX_VALUE), DISTRIB_UPDATE_PARAM, FROM_LEADER));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void clearCore() throws IOException {
+ clearIndex();
+ assertU(commit());
+
+ UpdateLog ulog = h.getCore().getUpdateHandler().getUpdateLog();
+ File logDir = new File(h.getCore().getUpdateHandler().getUpdateLog().getLogDir());
+
+ h.close();
+
+ String[] files = ulog.getLogList(logDir);
+ for (String file : files) {
+
+ File toDelete = new File(logDir, file);
+ Files.delete(toDelete.toPath()); // Should we really error out here?
+ }
+
+ assertEquals(0, ulog.getLogList(logDir).length);
+
+ createCore();
+ }
+
+ private void deleteByQuery(String q) throws Exception {
+ deleteByQueryAndGetVersion(q, null);
+ }
+
+ private void addDocs(int nDocs, int start, LinkedList<Long> versions) throws Exception {
+ for (int i = 0; i < nDocs; i++) {
+ versions.addFirst(addAndGetVersion(sdoc("id", Integer.toString(start + i)), null));
+ }
+ }
+
+ private static Long getVer(SolrQueryRequest req) throws Exception {
+ String response = JQ(req);
+ Map rsp = (Map) ObjectBuilder.fromJSON(response);
+ Map doc = null;
+ if (rsp.containsKey("doc")) {
+ doc = (Map) rsp.get("doc");
+ } else if (rsp.containsKey("docs")) {
+ List lst = (List) rsp.get("docs");
+ if (lst.size() > 0) {
+ doc = (Map) lst.get(0);
+ }
+ } else if (rsp.containsKey("response")) {
+ Map responseMap = (Map) rsp.get("response");
+ List lst = (List) responseMap.get("docs");
+ if (lst.size() > 0) {
+ doc = (Map) lst.get(0);
+ }
+ }
+
+ if (doc == null) return null;
+
+ return (Long) doc.get("_version_");
+ }
+
+ @Test
+ public void testLogReaderNext() throws Exception {
+ this.clearCore();
+
+ int start = 0;
+
+ UpdateLog ulog = h.getCore().getUpdateHandler().getUpdateLog();
+ CdcrUpdateLog.CdcrLogReader reader = ((CdcrUpdateLog) ulog).newLogReader(); // test reader on empty updates log
+
+ LinkedList<Long> versions = new LinkedList<>();
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(11, start, versions);
+ start += 11;
+ assertU(commit());
+
+ for (int i = 0; i < 10; i++) { // 10 adds
+ assertNotNull(reader.next());
+ }
+ Object o = reader.next();
+ assertNotNull(o);
+
+ List entry = (List) o;
+ int opAndFlags = (Integer) entry.get(0);
+ assertEquals(UpdateLog.COMMIT, opAndFlags & UpdateLog.OPERATION_MASK);
+
+ for (int i = 0; i < 11; i++) { // 11 adds
+ assertNotNull(reader.next());
+ }
+ o = reader.next();
+ assertNotNull(o);
+
+ entry = (List) o;
+ opAndFlags = (Integer) entry.get(0);
+ assertEquals(UpdateLog.COMMIT, opAndFlags & UpdateLog.OPERATION_MASK);
+
+ assertNull(reader.next());
+
+ // add a new tlog after having exhausted the reader
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ // the reader should pick up the new tlog
+
+ for (int i = 0; i < 11; i++) { // 10 adds + 1 commit
+ assertNotNull(reader.next());
+ }
+ assertNull(reader.next());
+ }
+
+ /**
+ * Check the seek method of the log reader.
+ */
+ @Test
+ public void testLogReaderSeek() throws Exception {
+ this.clearCore();
+
+ int start = 0;
+
+ UpdateLog ulog = h.getCore().getUpdateHandler().getUpdateLog();
+ CdcrUpdateLog.CdcrLogReader reader1 = ((CdcrUpdateLog) ulog).newLogReader();
+ CdcrUpdateLog.CdcrLogReader reader2 = ((CdcrUpdateLog) ulog).newLogReader();
+ CdcrUpdateLog.CdcrLogReader reader3 = ((CdcrUpdateLog) ulog).newLogReader();
+
+ LinkedList<Long> versions = new LinkedList<>();
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(11, start, versions);
+ start += 11;
+ assertU(commit());
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ // Test case where target version is equal to startVersion of tlog file
+ long targetVersion = getVer(req("q", "id:10"));
+
+ assertTrue(reader1.seek(targetVersion));
+ Object o = reader1.next();
+ assertNotNull(o);
+ List entry = (List) o;
+ long version = (Long) entry.get(1);
+
+ assertEquals(targetVersion, version);
+
+ assertNotNull(reader1.next());
+
+ // test case where target version is superior to startVersion of tlog file
+ targetVersion = getVer(req("q", "id:26"));
+
+ assertTrue(reader2.seek(targetVersion));
+ o = reader2.next();
+ assertNotNull(o);
+ entry = (List) o;
+ version = (Long) entry.get(1);
+
+ assertEquals(targetVersion, version);
+
+ assertNotNull(reader2.next());
+
+ // test case where target version is inferior to startVersion of oldest tlog file
+ targetVersion = getVer(req("q", "id:0")) - 1;
+
+ assertFalse(reader3.seek(targetVersion));
+ }
+
+ /**
+ * Check that the log reader is able to read the new tlog
+ * and pick up new entries as they appear.
+ */
+ @Test
+ public void testLogReaderNextOnNewTLog() throws Exception {
+ this.clearCore();
+
+ int start = 0;
+
+ UpdateLog ulog = h.getCore().getUpdateHandler().getUpdateLog();
+ CdcrUpdateLog.CdcrLogReader reader = ((CdcrUpdateLog) ulog).newLogReader();
+
+ LinkedList<Long> versions = new LinkedList<>();
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(11, start, versions);
+ start += 11;
+
+ for (int i = 0; i < 22; i++) { // 21 adds + 1 commit
+ assertNotNull(reader.next());
+ }
+
+ // we should have reach the end of the new tlog
+ assertNull(reader.next());
+
+ addDocs(5, start, versions);
+ start += 5;
+
+ // the reader should now pick up the new updates
+
+ for (int i = 0; i < 5; i++) { // 5 adds
+ assertNotNull(reader.next());
+ }
+
+ assertNull(reader.next());
+ }
+
+ @Test
+ public void testRemoveOldLogs() throws Exception {
+ this.clearCore();
+
+ UpdateLog ulog = h.getCore().getUpdateHandler().getUpdateLog();
+ File logDir = new File(h.getCore().getUpdateHandler().getUpdateLog().getLogDir());
+
+ int start = 0;
+ int maxReq = 50;
+
+ LinkedList<Long> versions = new LinkedList<>();
+ addDocs(10, start, versions);
+ start += 10;
+ assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+ assertU(commit());
+ assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+ assertU(commit());
+ assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+
+ assertEquals(2, ulog.getLogList(logDir).length);
+
+ // Get a cdcr log reader to initialise a log pointer
+ CdcrUpdateLog.CdcrLogReader reader = ((CdcrUpdateLog) ulog).newLogReader();
+
+ addDocs(105, start, versions);
+ start += 105;
+ assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+ assertU(commit());
+ assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+
+ // the previous two tlogs should not be removed
+ assertEquals(3, ulog.getLogList(logDir).length);
+
+ // move the pointer past the first tlog
+ for (int i = 0; i <= 11; i++) { // 10 adds + 1 commit
+ assertNotNull(reader.next());
+ }
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+ assertU(commit());
+ assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+
+ // the first tlog should be removed
+ assertEquals(3, ulog.getLogList(logDir).length);
+
+ h.close();
+ createCore();
+
+ ulog = h.getCore().getUpdateHandler().getUpdateLog();
+
+ addDocs(105, start, versions);
+ start += 105;
+ assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+ assertU(commit());
+ assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+
+ // previous tlogs should be gone now
+ assertEquals(1, ulog.getLogList(logDir).length);
+ }
+
+ /**
+ * Check that the removal of old logs is taking into consideration
+ * multiple log pointers. Check also that the removal takes into consideration the
+ * numRecordsToKeep limit, even if the log pointers are ahead.
+ */
+ @Test
+ public void testRemoveOldLogsMultiplePointers() throws Exception {
+ this.clearCore();
+
+ UpdateLog ulog = h.getCore().getUpdateHandler().getUpdateLog();
+ File logDir = new File(h.getCore().getUpdateHandler().getUpdateLog().getLogDir());
+ CdcrUpdateLog.CdcrLogReader reader1 = ((CdcrUpdateLog) ulog).newLogReader();
+ CdcrUpdateLog.CdcrLogReader reader2 = ((CdcrUpdateLog) ulog).newLogReader();
+
+ int start = 0;
+
+ LinkedList<Long> versions = new LinkedList<>();
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(105, start, versions);
+ start += 105;
+ assertU(commit());
+
+ // the previous two tlogs should not be removed
+ assertEquals(3, ulog.getLogList(logDir).length);
+
+ // move the first pointer past the first tlog
+ for (int i = 0; i <= 11; i++) { // 10 adds + 1 commit
+ assertNotNull(reader1.next());
+ }
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ // the first tlog should not be removed
+ assertEquals(4, ulog.getLogList(logDir).length);
+
+ // move the second pointer past the first tlog
+ for (int i = 0; i <= 11; i++) { // 10 adds + 1 commit
+ assertNotNull(reader2.next());
+ }
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ // the first tlog should be removed
+ assertEquals(4, ulog.getLogList(logDir).length);
+
+ // exhaust the readers
+ while (reader1.next() != null) {
+ }
+ while (reader2.next() != null) {
+ }
+
+ // the readers should point to the new tlog
+ // now add enough documents to trigger the numRecordsToKeep limit
+
+ addDocs(80, start, versions);
+ start += 80;
+ assertU(commit());
+
+ // the update log should kept the last 3 tlogs, which sum up to 100 records
+ assertEquals(3, ulog.getLogList(logDir).length);
+ }
+
+ /**
+ * Check that the output stream of an uncapped tlog is correctly reopen
+ * and that the commit is written during recovery.
+ */
+ @Test
+ public void testClosingOutputStreamAfterLogReplay() throws Exception {
+ this.clearCore();
+
+ DirectUpdateHandler2.commitOnClose = false;
+ final Semaphore logReplay = new Semaphore(0);
+ final Semaphore logReplayFinish = new Semaphore(0);
+
+ UpdateLog.testing_logReplayHook = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ UpdateLog.testing_logReplayFinishHook = new Runnable() {
+ @Override
+ public void run() {
+ logReplayFinish.release();
+ }
+ };
+
+ Deque<Long> versions = new ArrayDeque<>();
+ versions.addFirst(addAndGetVersion(sdoc("id", "A11"), null));
+ versions.addFirst(addAndGetVersion(sdoc("id", "A12"), null));
+ versions.addFirst(addAndGetVersion(sdoc("id", "A13"), null));
+
+ assertJQ(req("q", "*:*"), "/response/numFound==0");
+
+ assertJQ(req("qt", "/get", "getVersions", "" + versions.size()), "/versions==" + versions);
+
+ h.close();
+ createCore();
+ // Solr should kick this off now
+ // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
+
+ // verify that previous close didn't do a commit
+ // recovery should be blocked by our hook
+ assertJQ(req("q", "*:*"), "/response/numFound==0");
+
+ // unblock recovery
+ logReplay.release(1000);
+
+ // wait until recovery has finished
+ assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+
+ assertJQ(req("q", "*:*"), "/response/numFound==3");
+
+ // The transaction log should have written a commit and close its output stream
+ UpdateLog ulog = h.getCore().getUpdateHandler().getUpdateLog();
+ assertEquals(0, ulog.logs.peekLast().refcount.get());
+ assertNull(ulog.logs.peekLast().channel);
+
+ ulog.logs.peekLast().incref(); // reopen the output stream to check if its ends with a commit
+ assertTrue(ulog.logs.peekLast().endsWithCommit());
+ ulog.logs.peekLast().decref();
+ }
+
+ /**
+ * Check the buffering of the old tlogs
+ */
+ @Test
+ public void testBuffering() throws Exception {
+ this.clearCore();
+
+ CdcrUpdateLog ulog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
+ File logDir = new File(h.getCore().getUpdateHandler().getUpdateLog().getLogDir());
+
+ int start = 0;
+
+ LinkedList<Long> versions = new LinkedList<>();
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(105, start, versions);
+ start += 105;
+ assertU(commit());
+
+ // the first two tlogs should have been removed
+ assertEquals(1, ulog.getLogList(logDir).length);
+
+ // enable buffer
+ ulog.enableBuffer();
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(105, start, versions);
+ start += 105;
+ assertU(commit());
+
+ // no tlog should have been removed
+ assertEquals(4, ulog.getLogList(logDir).length);
+
+ // disable buffer
+ ulog.disableBuffer();
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ // old tlogs should have been removed
+ assertEquals(2, ulog.getLogList(logDir).length);
+ }
+
+
+ @Test
+ public void testSubReader() throws Exception {
+ this.clearCore();
+
+ CdcrUpdateLog ulog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
+ File logDir = new File(h.getCore().getUpdateHandler().getUpdateLog().getLogDir());
+ CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
+
+ int start = 0;
+
+ LinkedList<Long> versions = new LinkedList<>();
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ assertEquals(2, ulog.getLogList(logDir).length);
+
+ // start to read the first tlog
+ for (int i = 0; i < 10; i++) {
+ assertNotNull(reader.next());
+ }
+
+ // instantiate a sub reader, and finish to read the first tlog (commit operation), plus start to read the
+ // second tlog (first five adds)
+ CdcrUpdateLog.CdcrLogReader subReader = reader.getSubReader();
+ for (int i = 0; i < 6; i++) {
+ assertNotNull(subReader.next());
+ }
+
+ // Five adds + one commit
+ assertEquals(6, subReader.getNumberOfRemainingRecords());
+
+ // Generate a new tlog
+ addDocs(105, start, versions);
+ start += 105;
+ assertU(commit());
+
+ // Even if the subreader is past the first tlog, the first tlog should not have been removed
+ // since the parent reader is still pointing to it
+ assertEquals(3, ulog.getLogList(logDir).length);
+
+ // fast forward the parent reader with the subreader
+ reader.forwardSeek(subReader);
+ subReader.close();
+
+ // After fast forward, the parent reader should be position on the doc15
+ List o = (List) reader.next();
+ assertNotNull(o);
+ assertTrue(o.get(2) instanceof SolrInputDocument);
+ assertEquals("15", ((SolrInputDocument) o.get(2)).getFieldValue("id"));
+
+ // Finish to read the second tlog, and start to read the third one
+ for (int i = 0; i < 6; i++) {
+ assertNotNull(reader.next());
+ }
+
+ assertEquals(105, reader.getNumberOfRemainingRecords());
+
+ // Generate a new tlog to activate tlog cleaning
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ // If the parent reader was correctly fast forwarded, it should be on the third tlog, and the first two should
+ // have been removed.
+ assertEquals(2, ulog.getLogList(logDir).length);
+ }
+
+ /**
+ * Check that the reader is correctly reset to its last position
+ */
+ @Test
+ public void testResetToLastPosition() throws Exception {
+ this.clearCore();
+
+ CdcrUpdateLog ulog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
+ File logDir = new File(h.getCore().getUpdateHandler().getUpdateLog().getLogDir());
+ CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
+
+ int start = 0;
+
+ LinkedList<Long> versions = new LinkedList<>();
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ assertEquals(2, ulog.getLogList(logDir).length);
+
+ for (int i = 0; i < 22; i++) {
+ Object o = reader.next();
+ assertNotNull(o);
+ // reset to last position
+ reader.resetToLastPosition();
+ // we should read the same update operation, i.e., same version number
+ assertEquals(((List) o).get(1), ((List) reader.next()).get(1));
+ }
+ assertNull(reader.next());
+ }
+
+ /**
+ * Check that the reader is correctly reset to its last position
+ */
+ @Test
+ public void testGetNumberOfRemainingRecords() throws Exception {
+ try {
+ DirectUpdateHandler2.commitOnClose = false;
+ final Semaphore logReplayFinish = new Semaphore(0);
+ UpdateLog.testing_logReplayFinishHook = new Runnable() {
+ @Override
+ public void run() {
+ logReplayFinish.release();
+ }
+ };
+
+ this.clearCore();
+
+ int start = 0;
+
+ LinkedList<Long> versions = new LinkedList<>();
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ addDocs(10, start, versions);
+ start += 10;
+
+ h.close();
+ logReplayFinish.drainPermits();
+ createCore();
+
+ // At this stage, we have re-opened a capped tlog, and an uncapped tlog.
+ // check that the number of remaining records is correctly computed in these two cases
+
+ CdcrUpdateLog ulog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
+ CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
+
+ // wait for the replay to finish
+ assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+
+ // 20 records + 2 commits
+ assertEquals(22, reader.getNumberOfRemainingRecords());
+
+ for (int i = 0; i < 22; i++) {
+ Object o = reader.next();
+ assertNotNull(o);
+ assertEquals(22 - (i + 1), reader.getNumberOfRemainingRecords());
+ }
+ assertNull(reader.next());
+ assertEquals(0, reader.getNumberOfRemainingRecords());
+
+ // It should pick up the new tlog files
+ addDocs(10, start, versions);
+ assertEquals(10, reader.getNumberOfRemainingRecords());
+ } finally {
+ DirectUpdateHandler2.commitOnClose = true;
+ UpdateLog.testing_logReplayFinishHook = null;
+ }
+ }
+
+ /**
+ * Check that the initialisation of the log reader is picking up the tlog file that is currently being
+ * written.
+ */
+ @Test
+ public void testLogReaderInitOnNewTlog() throws Exception {
+ this.clearCore();
+
+ int start = 0;
+
+ // Start to index some documents to instantiate the new tlog
+ LinkedList<Long> versions = new LinkedList<>();
+ addDocs(10, start, versions);
+ start += 10;
+
+ // Create the reader after the instantiation of the new tlog
+ UpdateLog ulog = h.getCore().getUpdateHandler().getUpdateLog();
+ CdcrUpdateLog.CdcrLogReader reader = ((CdcrUpdateLog) ulog).newLogReader();
+
+ // Continue to index documents and commits
+ addDocs(11, start, versions);
+ start += 11;
+ assertU(commit());
+
+ // check that the log reader was initialised with the new tlog
+ for (int i = 0; i < 22; i++) { // 21 adds + 1 commit
+ assertNotNull(reader.next());
+ }
+
+ // we should have reach the end of the new tlog
+ assertNull(reader.next());
+ }
+
+ /**
+ * Check that the absolute version number is used for the update log index and for the last entry read
+ */
+ @Test
+ public void testAbsoluteLastVersion() throws Exception {
+ this.clearCore();
+
+ CdcrUpdateLog ulog = (CdcrUpdateLog) h.getCore().getUpdateHandler().getUpdateLog();
+ File logDir = new File(h.getCore().getUpdateHandler().getUpdateLog().getLogDir());
+ CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
+
+ int start = 0;
+
+ LinkedList<Long> versions = new LinkedList<>();
+ addDocs(10, start, versions);
+ start += 10;
+ deleteByQuery("*:*");
+ assertU(commit());
+
+ deleteByQuery("*:*");
+ addDocs(10, start, versions);
+ start += 10;
+ assertU(commit());
+
+ assertEquals(2, ulog.getLogList(logDir).length);
+
+ for (long version : ulog.getStartingVersions()) {
+ assertTrue(version > 0);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ reader.next();
+ }
+
+ // first delete
+ Object o = reader.next();
+ assertTrue((Long) ((List) o).get(1) < 0);
+ assertTrue(reader.getLastVersion() > 0);
+
+ reader.next(); // commit
+
+ // second delete
+ o = reader.next();
+ assertTrue((Long) ((List) o).get(1) < 0);
+ assertTrue(reader.getLastVersion() > 0);
+ }
+
+}
+
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Fri May 22 18:58:29 2015
@@ -7,9 +7,9 @@ package org.apache.solr.common.cloud;
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -56,13 +56,13 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
/**
- *
+ *
* All Solr ZooKeeper interactions should go through this class rather than
* ZooKeeper. This class handles synchronous connects and reconnections.
*
*/
public class SolrZkClient implements Closeable {
-
+
static final String NEWL = System.getProperty("line.separator");
static final int DEFAULT_CLIENT_CONNECT_TIMEOUT = 30000;
@@ -73,7 +73,7 @@ public class SolrZkClient implements Clo
private ConnectionManager connManager;
private volatile SolrZooKeeper keeper;
-
+
private ZkCmdExecutor zkCmdExecutor;
private final ExecutorService zkCallbackExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("zkCallback"));
@@ -90,17 +90,17 @@ public class SolrZkClient implements Clo
// expert: for tests
public SolrZkClient() {
-
+
}
-
+
public SolrZkClient(String zkServerAddress, int zkClientTimeout) {
this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null);
}
-
+
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) {
this(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, new DefaultConnectionStrategy(), null);
}
-
+
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, OnReconnect onReonnect) {
this(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, new DefaultConnectionStrategy(), onReonnect);
}
@@ -109,31 +109,31 @@ public class SolrZkClient implements Clo
ZkClientConnectionStrategy strat, final OnReconnect onReconnect) {
this(zkServerAddress, zkClientTimeout, DEFAULT_CLIENT_CONNECT_TIMEOUT, strat, onReconnect);
}
-
+
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect) {
this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, null, null);
}
-
+
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect) {
this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, beforeReconnect, null);
}
- public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect, ZkACLProvider zkACLProvider) {
this.zkClientConnectionStrategy = strat;
this.zkServerAddress = zkServerAddress;
-
+
if (strat == null) {
strat = new DefaultConnectionStrategy();
}
-
+
if (!strat.hasZkCredentialsToAddAutomatically()) {
ZkCredentialsProvider zkCredentialsToAddAutomatically = createZkCredentialsToAddAutomatically();
strat.setZkCredentialsToAddAutomatically(zkCredentialsToAddAutomatically);
}
-
+
this.zkClientTimeout = zkClientTimeout;
// we must retry at least as long as the session timeout
zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout);
@@ -168,7 +168,7 @@ public class SolrZkClient implements Clo
}
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
-
+
try {
connManager.waitForConnected(clientConnectTimeout);
} catch (Exception e) {
@@ -191,7 +191,7 @@ public class SolrZkClient implements Clo
public ConnectionManager getConnectionManager() {
return connManager;
}
-
+
public ZkClientConnectionStrategy getZkClientConnectionStrategy() {
return zkClientConnectionStrategy;
}
@@ -227,14 +227,14 @@ public class SolrZkClient implements Clo
log.info("Using default ZkACLProvider");
return new DefaultZkACLProvider();
}
-
+
/**
* Returns true if client is connected
*/
public boolean isConnected() {
return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED;
}
-
+
public void delete(final String path, final int version, boolean retryOnConnLoss)
throws InterruptedException, KeeperException {
if (retryOnConnLoss) {
@@ -250,11 +250,16 @@ public class SolrZkClient implements Clo
}
}
- private Watcher wrapWatcher (final Watcher watcher) {
- if (watcher == null) return watcher;
+ /**
+ * Wraps the watcher so that it doesn't fire off ZK's event queue. In order to guarantee that a watch object will
+ * only be triggered once for a given notification, users need to wrap their watcher using this method before
+ * calling {@link #exists(String, org.apache.zookeeper.Watcher, boolean)} or
+ * {@link #getData(String, org.apache.zookeeper.Watcher, org.apache.zookeeper.data.Stat, boolean)}.
+ */
+ public Watcher wrapWatcher(final Watcher watcher) {
+ if (watcher == null || watcher instanceof SolrZkWatcher) return watcher;
- // wrap the watcher so that it doesn't fire off ZK's event queue
- return new Watcher() {
+ return new SolrZkWatcher() {
@Override
public void process(final WatchedEvent event) {
log.debug("Submitting job to respond to event " + event);
@@ -268,6 +273,9 @@ public class SolrZkClient implements Clo
};
}
+ private interface SolrZkWatcher extends Watcher {
+ }
+
/**
* Return the stat of the node of the given path. Return null if no such a
* node exists.
@@ -298,7 +306,7 @@ public class SolrZkClient implements Clo
return keeper.exists(path, wrapWatcher(watcher));
}
}
-
+
/**
* Returns true if path exists
*/
@@ -366,7 +374,7 @@ public class SolrZkClient implements Clo
return keeper.setData(path, data, version);
}
}
-
+
/**
* Returns path of created node
*/
@@ -389,7 +397,7 @@ public class SolrZkClient implements Clo
/**
* Creates the path in ZooKeeper, creating each node as necessary.
- *
+ *
* e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
* group, node exist, each will be created.
*/
@@ -397,23 +405,23 @@ public class SolrZkClient implements Clo
InterruptedException {
makePath(path, null, CreateMode.PERSISTENT, retryOnConnLoss);
}
-
+
public void makePath(String path, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException,
InterruptedException {
makePath(path, null, CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss);
}
-
+
public void makePath(String path, File file, boolean failOnExists, boolean retryOnConnLoss)
throws IOException, KeeperException, InterruptedException {
makePath(path, FileUtils.readFileToByteArray(file),
CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss);
}
-
+
public void makePath(String path, File file, boolean retryOnConnLoss) throws IOException,
KeeperException, InterruptedException {
makePath(path, FileUtils.readFileToByteArray(file), retryOnConnLoss);
}
-
+
public void makePath(String path, CreateMode createMode, boolean retryOnConnLoss) throws KeeperException,
InterruptedException {
makePath(path, null, createMode, retryOnConnLoss);
@@ -421,7 +429,7 @@ public class SolrZkClient implements Clo
/**
* Creates the path in ZooKeeper, creating each node as necessary.
- *
+ *
* @param data to set on the last zkNode
*/
public void makePath(String path, byte[] data, boolean retryOnConnLoss) throws KeeperException,
@@ -431,10 +439,10 @@ public class SolrZkClient implements Clo
/**
* Creates the path in ZooKeeper, creating each node as necessary.
- *
+ *
* e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
* group, node exist, each will be created.
- *
+ *
* @param data to set on the last zkNode
*/
public void makePath(String path, byte[] data, CreateMode createMode, boolean retryOnConnLoss)
@@ -444,25 +452,24 @@ public class SolrZkClient implements Clo
/**
* Creates the path in ZooKeeper, creating each node as necessary.
- *
+ *
* e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
* group, node exist, each will be created.
- *
+ *
* @param data to set on the last zkNode
*/
public void makePath(String path, byte[] data, CreateMode createMode,
Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
makePath(path, data, createMode, watcher, true, retryOnConnLoss);
}
-
/**
* Creates the path in ZooKeeper, creating each node as necessary.
- *
+ *
* e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
* group, node exist, each will be created.
- *
+ *
* Note: retryOnConnLoss is only respected for the final node - nodes
* before that are always retried on connection loss.
*/
@@ -472,7 +479,7 @@ public class SolrZkClient implements Clo
log.info("makePath: " + path);
}
boolean retry = true;
-
+
if (path.startsWith("/")) {
path = path.substring(1, path.length());
}
@@ -506,7 +513,7 @@ public class SolrZkClient implements Clo
keeper.create(currentPath, bytes, zkACLProvider.getACLsToAdd(currentPath), mode);
}
} catch (NodeExistsException e) {
-
+
if (!failOnExists) {
// TODO: version ? for now, don't worry about race
setData(currentPath, data, -1, retryOnConnLoss);
@@ -514,7 +521,7 @@ public class SolrZkClient implements Clo
exists(currentPath, watcher, retryOnConnLoss);
return;
}
-
+
// ignore unless it's the last node in the path
if (i == paths.length - 1) {
throw e;
@@ -548,7 +555,7 @@ public class SolrZkClient implements Clo
/**
* Write file to ZooKeeper - default system encoding used.
- *
+ *
* @param path path to upload file to e.g. /solr/conf/solrconfig.xml
* @param file path to file to be uploaded
*/
@@ -594,7 +601,7 @@ public class SolrZkClient implements Clo
// this is the cluster state in xml format - lets pretty print
dataString = prettyPrint(dataString);
}
-
+
string.append(dent + "DATA:\n" + dent + " "
+ dataString.replaceAll("\n", "\n" + dent + " ") + NEWL);
} else {
@@ -624,7 +631,7 @@ public class SolrZkClient implements Clo
printLayout("/", 0, sb);
System.out.println(sb.toString());
}
-
+
public static String prettyPrint(String input, int indent) {
try {
Source xmlInput = new StreamSource(new StringReader(input));
@@ -640,7 +647,7 @@ public class SolrZkClient implements Clo
throw new RuntimeException("Problem pretty printing XML", e);
}
}
-
+
private static String prettyPrint(String input) {
return prettyPrint(input, 2);
}
@@ -673,11 +680,11 @@ public class SolrZkClient implements Clo
// we might have been closed already
if (isClosed) this.keeper.close();
}
-
+
public SolrZooKeeper getSolrZooKeeper() {
return keeper;
}
-
+
private void closeKeeper(SolrZooKeeper keeper) {
if (keeper != null) {
try {
@@ -729,7 +736,7 @@ public class SolrZkClient implements Clo
return;
}
}
-
+
/**
* Validates if zkHost contains a chroot. See http://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#ch_zkSessions
*/