Posted to commits@lucene.apache.org by is...@apache.org on 2016/12/06 19:01:32 UTC
[1/4] lucene-solr:jira/solr-5944: SOLR-5944 Initial import into branch
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-5944 [created] 8ae359a63
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesStandalone.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesStandalone.java b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesStandalone.java
new file mode 100644
index 0000000..f982c2f
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesStandalone.java
@@ -0,0 +1,1026 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.TestRTGBase;
+import org.apache.solr.update.processor.AtomicUpdateDocumentMerger;
+import org.apache.solr.util.RefCounted;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.internal.matchers.StringContains.containsString;
+
+
+/**
+ * Tests the in-place updates (docValues updates) for a standalone Solr instance.
+ */
+public class TestInPlaceUpdatesStandalone extends TestRTGBase {
+ // nocommit: why is this class extending TestRTGBase?
+ // nocommit: it doesn't seem to use any features of that base class (and was subclassing SolrTestCaseJ4 in previous patches)
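+ // For reference: an "in-place" update is an atomic update that touches only
+ // non-indexed, non-stored docValues fields. A minimal sketch using this test's
+ // own schema fields and helpers:
+ //
+ //   client.add(sdoc("id", "1", "inplace_updatable_float", map("inc", 1)));
+ //
+ // Such an update rewrites the docValues entry without re-indexing the document,
+ // so the internal [docid] is expected to stay the same (several tests below rely on this).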
+
+ private static SolrClient client;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+
+ // nocommit: does this test need to randomize between diff schema/fields used?
+ // nocommit: see nocommits/jira questions related to special dynamicField logic in AtomicUpdateDocumentMerger.isInPlaceUpdate
+
+ initCore("solrconfig-tlog.xml", "schema-inplace-updates.xml");
+
+ // sanity check that autocommits are disabled
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxTime);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxTime);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxDocs);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxDocs);
+
+ // validate that the schema was not changed to an unexpected state
+ IndexSchema schema = h.getCore().getLatestSchema();
+ for (String fieldName : Arrays.asList("_version_", "inplace_updatable_float", "inplace_l_dvo")) {
+ // these fields must only be using docValues to support inplace updates
+ SchemaField field = schema.getField(fieldName);
+ assertTrue(field.toString(),
+ field.hasDocValues() && ! field.indexed() && ! field.stored());
+ }
+ for (String fieldName : Arrays.asList("title_s", "regular_l", "stored_i")) {
+ // these fields must support atomic updates, but not inplace updates (i.e. stored)
+ SchemaField field = schema.getField(fieldName);
+ assertTrue(field.toString(), field.stored());
+ }
+
+ // Don't close this client, it would shutdown the CoreContainer
+ client = new EmbeddedSolrServer(h.getCoreContainer(), h.coreName);
+ }
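+ // The assertions above imply field declarations along these lines in
+ // schema-inplace-updates.xml (a paraphrased sketch, not copied from the actual file):
+ //
+ //   <field name="inplace_updatable_float" type="float"
+ //          indexed="false" stored="false" docValues="true"/>
+ //   <field name="title_s" type="string" indexed="true" stored="true"/>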
+
+ @Before
+ public void deleteAllAndCommit() {
+ clearIndex();
+ assertU(commit("softCommit", "false"));
+ }
+
+ @Test
+ public void testUpdatingDocValues() throws Exception {
+ long version1 = addAndGetVersion(sdoc("id", "1", "title_s", "first"), null);
+ long version2 = addAndGetVersion(sdoc("id", "2", "title_s", "second"), null);
+ long version3 = addAndGetVersion(sdoc("id", "3", "title_s", "third"), null);
+ assertU(commit("softCommit", "false"));
+ assertQ(req("q", "*:*"), "//*[@numFound='3']");
+
+ // the reason we're fetching these docids is to validate that the subsequent updates
+ // are done in place and don't cause the docids to change
+ int docid1 = getDocId("1");
+ int docid2 = getDocId("2");
+ int docid3 = getDocId("3");
+
+ // Check docValues were "set"
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("set", 200));
+ version2 = addAndAssertVersion(version2, "id", "2", "inplace_updatable_float", map("set", 300));
+ version3 = addAndAssertVersion(version3, "id", "3", "inplace_updatable_float", map("set", 100));
+ assertU(commit("softCommit", "false"));
+
+ assertQ(req("q", "*:*", "sort", "id asc", "fl", "*,[docid]"),
+ "//*[@numFound='3']",
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='200.0']",
+ "//result/doc[2]/float[@name='inplace_updatable_float'][.='300.0']",
+ "//result/doc[3]/float[@name='inplace_updatable_float'][.='100.0']",
+ "//result/doc[1]/long[@name='_version_'][.='"+version1+"']",
+ "//result/doc[2]/long[@name='_version_'][.='"+version2+"']",
+ "//result/doc[3]/long[@name='_version_'][.='"+version3+"']",
+ "//result/doc[1]/int[@name='[docid]'][.='"+docid1+"']",
+ "//result/doc[2]/int[@name='[docid]'][.='"+docid2+"']",
+ "//result/doc[3]/int[@name='[docid]'][.='"+docid3+"']"
+ );
+
+ // Check docValues are "inc"ed
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("inc", 1));
+ version2 = addAndAssertVersion(version2, "id", "2", "inplace_updatable_float", map("inc", -2));
+ version3 = addAndAssertVersion(version3, "id", "3", "inplace_updatable_float", map("inc", 3));
+ assertU(commit("softCommit", "false"));
+ assertQ(req("q", "*:*", "sort", "id asc", "fl", "*,[docid]"),
+ "//*[@numFound='3']",
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='201.0']",
+ "//result/doc[2]/float[@name='inplace_updatable_float'][.='298.0']",
+ "//result/doc[3]/float[@name='inplace_updatable_float'][.='103.0']",
+ "//result/doc[1]/long[@name='_version_'][.='"+version1+"']",
+ "//result/doc[2]/long[@name='_version_'][.='"+version2+"']",
+ "//result/doc[3]/long[@name='_version_'][.='"+version3+"']",
+ "//result/doc[1]/int[@name='[docid]'][.='"+docid1+"']",
+ "//result/doc[2]/int[@name='[docid]'][.='"+docid2+"']",
+ "//result/doc[3]/int[@name='[docid]'][.='"+docid3+"']"
+ );
+
+ // Check back to back "inc"s are working (off the transaction log)
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("inc", 1));
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("inc", 2)); // new value should be 204
+ assertU(commit("softCommit", "false"));
+ assertQ(req("q", "id:1", "fl", "*,[docid]"),
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='204.0']",
+ "//result/doc[1]/long[@name='_version_'][.='"+version1+"']",
+ "//result/doc[1]/int[@name='[docid]'][.='"+docid1+"']");
+
+ // Now let the document be atomically updated (non-inplace), and ensure the old docValue is part of the new doc
+ version1 = addAndAssertVersion(version1, "id", "1", "title_s", map("set", "new first"));
+ assertU(commit("softCommit", "false"));
+ int newDocid1 = getDocId("1");
+ assertTrue(newDocid1 != docid1);
+ docid1 = newDocid1;
+
+ assertQ(req("q", "id:1"),
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='204.0']",
+ "//result/doc[1]/str[@name='title_s'][.='new first']",
+ "//result/doc[1]/long[@name='_version_'][.='"+version1+"']");
+
+ // Check if atomic update with "inc" to a docValue works
+ version2 = addAndAssertVersion(version2, "id", "2", "title_s", map("set", "new second"), "inplace_updatable_float", map("inc", 2));
+ assertU(commit("softCommit", "false"));
+ int newDocid2 = getDocId("2");
+ assertTrue(newDocid2 != docid2);
+ docid2 = newDocid2;
+
+ assertQ(req("q", "id:2"),
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='300.0']",
+ "//result/doc[1]/str[@name='title_s'][.='new second']",
+ "//result/doc[1]/long[@name='_version_'][.='"+version2+"']");
+
+ // Check if docvalue "inc" update works for a newly created document, which is not yet committed
+ // Case1: docvalue was supplied during add of new document
+ long version4 = addAndGetVersion(sdoc("id", "4", "title_s", "fourth", "inplace_updatable_float", "400"), params());
+ version4 = addAndAssertVersion(version4, "id", "4", "inplace_updatable_float", map("inc", 1));
+ assertU(commit("softCommit", "false"));
+ assertQ(req("q", "id:4"),
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='401.0']",
+ "//result/doc[1]/long[@name='_version_'][.='"+version4+"']");
+
+ // Check if docvalue "inc" update works for a newly created document, which is not yet committed
+ // Case2: docvalue was not supplied during add of new document, should assume default
+ long version5 = addAndGetVersion(sdoc("id", "5", "title_s", "fifth"), params());
+ version5 = addAndAssertVersion(version5, "id", "5", "inplace_updatable_float", map("inc", 1));
+ assertU(commit("softCommit", "false"));
+ assertQ(req("q", "id:5"),
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='1.0']",
+ "//result/doc[1]/long[@name='_version_'][.='"+version5+"']");
+
+ // Check if docvalue "set" update works for a newly created document, which is not yet committed
+ long version6 = addAndGetVersion(sdoc("id", "6", "title_s", "sixth"), params());
+ version6 = addAndAssertVersion(version6, "id", "6", "inplace_updatable_float", map("set", 600));
+ assertU(commit("softCommit", "false"));
+ assertQ(req("q", "id:6"),
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='600.0']",
+ "//result/doc[1]/long[@name='_version_'][.='"+version6+"']");
+
+ // Check optimistic concurrency works
+ long v20 = addAndGetVersion(sdoc("id", "20", "title_s","first", "inplace_updatable_float", 100), params());
+ SolrException exception = expectThrows(SolrException.class, () -> {
+ addAndGetVersion(sdoc("id","20", "_version_", -1, "inplace_updatable_float", map("inc", 1)), null);
+ });
+ assertEquals(exception.toString(), SolrException.ErrorCode.CONFLICT.code, exception.code());
+ assertThat(exception.getMessage(), containsString("expected=-1"));
+ assertThat(exception.getMessage(), containsString("actual="+v20));
+
+
+ long oldV20 = v20;
+ v20 = addAndAssertVersion(v20, "id","20", "_version_", v20, "inplace_updatable_float", map("inc", 1));
+ exception = expectThrows(SolrException.class, () -> {
+ addAndGetVersion(sdoc("id","20", "_version_", oldV20, "inplace_updatable_float", map("inc", 1)), null);
+ });
+ assertEquals(exception.toString(), SolrException.ErrorCode.CONFLICT.code, exception.code());
+ assertThat(exception.getMessage(), containsString("expected="+oldV20));
+ assertThat(exception.getMessage(), containsString("actual="+v20));
+
+ v20 = addAndAssertVersion(v20, "id","20", "_version_", v20, "inplace_updatable_float", map("inc", 1));
+ // RTG before a commit
+ assertJQ(req("qt","/get", "id","20", "fl","id,inplace_updatable_float,_version_"),
+ "=={'doc':{'id':'20', 'inplace_updatable_float':" + 102.0 + ",'_version_':" + v20 + "}}");
+ assertU(commit("softCommit", "false"));
+ assertQ(req("q", "id:20"),
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='102.0']",
+ "//result/doc[1]/long[@name='_version_'][.='"+v20+"']");
+
+ // Check if updated DVs can be used for search
+ assertQ(req("q", "inplace_updatable_float:102"),
+ "//result/doc[1]/str[@name='id'][.='20']",
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='102.0']",
+ "//result/doc[1]/long[@name='_version_'][.='"+v20+"']");
+
+ // Check if updated DVs can be used for sorting
+ assertQ(req("q", "*:*", "sort", "inplace_updatable_float asc"),
+ "//result/doc[4]/str[@name='id'][.='1']",
+ "//result/doc[4]/float[@name='inplace_updatable_float'][.='204.0']",
+
+ "//result/doc[5]/str[@name='id'][.='2']",
+ "//result/doc[5]/float[@name='inplace_updatable_float'][.='300.0']",
+
+ "//result/doc[3]/str[@name='id'][.='3']",
+ "//result/doc[3]/float[@name='inplace_updatable_float'][.='103.0']",
+
+ "//result/doc[6]/str[@name='id'][.='4']",
+ "//result/doc[6]/float[@name='inplace_updatable_float'][.='401.0']",
+
+ "//result/doc[1]/str[@name='id'][.='5']",
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='1.0']",
+
+ "//result/doc[7]/str[@name='id'][.='6']",
+ "//result/doc[7]/float[@name='inplace_updatable_float'][.='600.0']",
+
+ "//result/doc[2]/str[@name='id'][.='20']",
+ "//result/doc[2]/float[@name='inplace_updatable_float'][.='102.0']");
+ }
+
+ @Test
+ public void testUpdateTwoDifferentFields() throws Exception {
+ long version1 = addAndGetVersion(sdoc("id", "1", "title_s", "first"), null);
+ assertU(commit("softCommit", "false"));
+ assertQ(req("q", "*:*"), "//*[@numFound='1']");
+
+ int docid1 = getDocId("1");
+
+ // Check docValues were "set"
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("set", 200));
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_int", map("set", 10));
+ assertU(commit("softCommit", "false"));
+
+ assertQ(req("q", "*:*", "sort", "id asc", "fl", "*,[docid]"),
+ "//*[@numFound='1']",
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='200.0']",
+ "//result/doc[1]/long[@name='_version_'][.='"+version1+"']",
+ "//result/doc[1]/int[@name='[docid]'][.='"+docid1+"']"
+ );
+
+ // two different update commands, updating each of the fields separately
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_int", map("inc", 1));
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("inc", 1));
+ // same update command, updating both the fields together
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_int", map("inc", 1),
+ "inplace_updatable_float", map("inc", 1));
+
+ if (random().nextBoolean()) {
+ assertU(commit("softCommit", "false"));
+ assertQ(req("q", "*:*", "sort", "id asc", "fl", "*,[docid]"),
+ "//*[@numFound='1']",
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='202.0']",
+ "//result/doc[1]/int[@name='inplace_updatable_int'][.='12']",
+ "//result/doc[1]/long[@name='_version_'][.='"+version1+"']",
+ "//result/doc[1]/int[@name='[docid]'][.='"+docid1+"']"
+ );
+ }
+
+ // RTG
+ assertJQ(req("qt","/get", "id","1", "fl","id,inplace_updatable_float,inplace_updatable_int"),
+ "=={'doc':{'id':'1', 'inplace_updatable_float':" + 202.0 + ",'inplace_updatable_int':" + 12 + "}}");
+
+ }
+
+ @Test
+ public void testDVUpdatesWithDBQofUpdatedValue() throws Exception {
+ long version1 = addAndGetVersion(sdoc("id", "1", "title_s", "first", "inplace_updatable_float", "0"), null);
+ assertU(commit());
+
+ // in-place update
+ addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("set", 100), "_version_", version1);
+
+ // DBQ where q=inplace_updatable_float:100
+ assertU(delQ("inplace_updatable_float:100"));
+
+ assertU(commit());
+
+ assertQ(req("q", "*:*"), "//*[@numFound='0']");
+ }
+
+ @Test
+ public void testDVUpdatesWithDelete() throws Exception {
+ long version1 = 0;
+
+ for (boolean postAddCommit : Arrays.asList(true, false)) {
+ for (boolean delById : Arrays.asList(true, false)) {
+ for (boolean postDelCommit : Arrays.asList(true, false)) {
+ addAndGetVersion(sdoc("id", "1", "title_s", "first"), params());
+ if (postAddCommit) assertU(commit());
+ assertU(delById ? delI("1") : delQ("id:1"));
+ if (postDelCommit) assertU(commit());
+ version1 = addAndGetVersion(sdoc("id", "1", "inplace_updatable_float", map("set", 200)), params());
+ // assert current doc#1 doesn't have old value of "title_s"
+ assertU(commit());
+ assertQ(req("q", "title_s:first", "sort", "id asc", "fl", "*,[docid]"),
+ "//*[@numFound='0']");
+ }
+ }
+ }
+
+ // Update to recently deleted (or non-existent) document with a "set" on updateable
+ // field should succeed, since it is executed internally as a full update
+ // because AUDM.doInPlaceUpdateMerge() returns false
+ assertU(random().nextBoolean()? delI("1"): delQ("id:1"));
+ if (random().nextBoolean()) assertU(commit());
+ addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("set", 200));
+ assertU(commit());
+ assertQ(req("q", "id:1", "sort", "id asc", "fl", "*"),
+ "//*[@numFound='1']",
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='200.0']");
+
+ // Another "set" on the same field should be an in-place update
+ int docid1 = getDocId("1");
+ addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("set", 300));
+ assertU(commit());
+ assertQ(req("q", "id:1", "fl", "*,[docid]"),
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='300.0']",
+ "//result/doc[1]/int[@name='[docid]'][.='"+docid1+"']");
+ }
+
+ public static long addAndAssertVersion(long expectedCurrentVersion, Object... fields) throws Exception {
+ assert 0 < expectedCurrentVersion;
+ long currentVersion = addAndGetVersion(sdoc(fields), null);
+ assertTrue(currentVersion > expectedCurrentVersion);
+ return currentVersion;
+ }
+
+ /**
+ * Helper method to search for the specified (uniqueKey field) id using <code>fl=[docid]</code>
+ * and return the internal lucene docid.
+ */
+ private int getDocId(String id) throws Exception {
+ SolrDocumentList results = client.query(params("q","id:" + id, "fl", "[docid]")).getResults();
+ assertEquals(1, results.getNumFound());
+ assertEquals(1, results.size());
+ Object docid = results.get(0).getFieldValue("[docid]");
+ assertTrue(docid instanceof Integer);
+ return ((Integer)docid);
+ }
+
+ @Test
+ public void testUpdateOfNonExistentDVsShouldNotFail() throws Exception {
+ // schema sanity check: assert that the nonexistent_field_i_dvo doesn't exist already
+ FieldInfo fi;
+ RefCounted<SolrIndexSearcher> holder = h.getCore().getSearcher();
+ try {
+ fi = holder.get().getSlowAtomicReader().getFieldInfos().fieldInfo("nonexistent_field_i_dvo");
+ } finally {
+ holder.decref();
+ }
+ assertNull(fi);
+
+ // Partial update
+ addAndGetVersion(sdoc("id", "0", "nonexistent_field_i_dvo", map("set", "42")), null);
+
+ addAndGetVersion(sdoc("id", "1"), null);
+ addAndGetVersion(sdoc("id", "1", "nonexistent_field_i_dvo", map("inc", "1")), null);
+ addAndGetVersion(sdoc("id", "1", "nonexistent_field_i_dvo", map("inc", "1")), null);
+
+ assertU(commit());
+
+ assertQ(req("q", "*:*"), "//*[@numFound='2']");
+ assertQ(req("q", "nonexistent_field_i_dvo:42"), "//*[@numFound='1']");
+ assertQ(req("q", "nonexistent_field_i_dvo:2"), "//*[@numFound='1']");
+ }
+
+ @Test
+ public void testOnlyPartialUpdatesBetweenCommits() throws Exception {
+ // Full updates
+ long version1 = addAndGetVersion(sdoc("id", "1", "title_s", "first", "val1_i_dvo", "1", "val2_l_dvo", "1"), params());
+ long version2 = addAndGetVersion(sdoc("id", "2", "title_s", "second", "val1_i_dvo", "2", "val2_l_dvo", "2"), params());
+ long version3 = addAndGetVersion(sdoc("id", "3", "title_s", "third", "val1_i_dvo", "3", "val2_l_dvo", "3"), params());
+ assertU(commit("softCommit", "false"));
+
+ assertQ(req("q", "*:*", "fl", "*,[docid]"), "//*[@numFound='3']");
+
+ int docid1 = getDocId("1");
+ int docid2 = getDocId("2");
+ int docid3 = getDocId("3");
+
+ int numPartialUpdates = 1 + random().nextInt(5000);
+ for (int i=0; i<numPartialUpdates; i++) {
+ version1 = addAndAssertVersion(version1, "id", "1", "val1_i_dvo", map("set", i));
+ version2 = addAndAssertVersion(version2, "id", "2", "val1_i_dvo", map("inc", 1));
+ version3 = addAndAssertVersion(version3, "id", "3", "val1_i_dvo", map("set", i));
+
+ version1 = addAndAssertVersion(version1, "id", "1", "val2_l_dvo", map("set", i));
+ version2 = addAndAssertVersion(version2, "id", "2", "val2_l_dvo", map("inc", 1));
+ version3 = addAndAssertVersion(version3, "id", "3", "val2_l_dvo", map("set", i));
+ }
+ assertU(commit("softCommit", "true"));
+
+ assertQ(req("q", "*:*", "sort", "id asc", "fl", "*,[docid]"),
+ "//*[@numFound='3']",
+ "//result/doc[1]/int[@name='val1_i_dvo'][.='"+(numPartialUpdates-1)+"']",
+ "//result/doc[2]/int[@name='val1_i_dvo'][.='"+(numPartialUpdates+2)+"']",
+ "//result/doc[3]/int[@name='val1_i_dvo'][.='"+(numPartialUpdates-1)+"']",
+ "//result/doc[1]/long[@name='val2_l_dvo'][.='"+(numPartialUpdates-1)+"']",
+ "//result/doc[2]/long[@name='val2_l_dvo'][.='"+(numPartialUpdates+2)+"']",
+ "//result/doc[3]/long[@name='val2_l_dvo'][.='"+(numPartialUpdates-1)+"']",
+ "//result/doc[1]/int[@name='[docid]'][.='"+docid1+"']",
+ "//result/doc[2]/int[@name='[docid]'][.='"+docid2+"']",
+ "//result/doc[3]/int[@name='[docid]'][.='"+docid3+"']",
+ "//result/doc[1]/long[@name='_version_'][.='" + version1 + "']",
+ "//result/doc[2]/long[@name='_version_'][.='" + version2 + "']",
+ "//result/doc[3]/long[@name='_version_'][.='" + version3 + "']"
+ );
+ }
+
+ /**
+ * Useful to store the state of an expected document into an in-memory model
+ * representing the index.
+ */
+ private static class DocInfo {
+ public final long version;
+ public final Long value;
+
+ public DocInfo(long version, Long val) {
+ this.version = version;
+ this.value = val;
+ }
+
+ @Override
+ public String toString() {
+ return "["+version+", "+value+"]";
+ }
+ }
+
+ /** @see #checkReplay */
+ @Test
+ public void testReplay_AfterInitialAddMixOfIncAndSet() throws Exception {
+ checkReplay("val2_l_dvo",
+ //
+ sdoc("id", "0", "val2_l_dvo", 3000000000L),
+ sdoc("id", "0", "val2_l_dvo", map("inc", 3)),
+ HARDCOMMIT,
+ sdoc("id", "0", "val2_l_dvo", map("inc", 5)),
+ sdoc("id", "1", "val2_l_dvo", 2000000000L),
+ sdoc("id", "1", "val2_l_dvo", map("set", 2000000002L)),
+ sdoc("id", "1", "val2_l_dvo", map("set", 3000000000L)),
+ sdoc("id", "0", "val2_l_dvo", map("inc", 7)),
+ sdoc("id", "1", "val2_l_dvo", map("set", 7000000000L)),
+ sdoc("id", "0", "val2_l_dvo", map("inc", 11)),
+ sdoc("id", "2", "val2_l_dvo", 2000000000L),
+ HARDCOMMIT,
+ sdoc("id", "2", "val2_l_dvo", map("set", 3000000000L)),
+ HARDCOMMIT);
+ }
+
+ /** @see #checkReplay */
+ @Test
+ public void testReplay_AfterInitialAddMixOfIncAndSetAndFullUpdates() throws Exception {
+ checkReplay("val2_l_dvo",
+ //
+ sdoc("id", "0", "val2_l_dvo", 3000000000L),
+ sdoc("id", "0", "val2_l_dvo", map("set", 3000000003L)),
+ HARDCOMMIT,
+ sdoc("id", "0", "val2_l_dvo", map("set", 3000000008L)),
+ sdoc("id", "1", "val2_l_dvo", 2000000000L),
+ sdoc("id", "1", "val2_l_dvo", map("inc", 2)),
+ sdoc("id", "1", "val2_l_dvo", 3000000000L),
+ sdoc("id", "0", "val2_l_dvo", map("set", 3000000015L)),
+ sdoc("id", "1", "val2_l_dvo", 7000000000L),
+ sdoc("id", "0", "val2_l_dvo", map("set", 3000000026L)),
+ sdoc("id", "2", "val2_l_dvo", 2000000000L),
+ HARDCOMMIT,
+ sdoc("id", "2", "val2_l_dvo", 3000000000L),
+ HARDCOMMIT);
+ }
+
+ /** @see #checkReplay */
+ @Test
+ public void testReplay_AllUpdatesAfterInitialAddAreInc() throws Exception {
+ checkReplay("val2_l_dvo",
+ //
+ sdoc("id", "0", "val2_l_dvo", 3000000000L),
+ sdoc("id", "0", "val2_l_dvo", map("inc", 3)),
+ HARDCOMMIT,
+ sdoc("id", "0", "val2_l_dvo", map("inc", 5)),
+ sdoc("id", "1", "val2_l_dvo", 2000000000L),
+ sdoc("id", "1", "val2_l_dvo", map("inc", 2)),
+ sdoc("id", "1", "val2_l_dvo", 3000000000L),
+ sdoc("id", "0", "val2_l_dvo", map("inc", 7)),
+ sdoc("id", "1", "val2_l_dvo", 7000000000L),
+ sdoc("id", "0", "val2_l_dvo", map("inc", 11)),
+ sdoc("id", "2", "val2_l_dvo", 2000000000L),
+ HARDCOMMIT,
+ sdoc("id", "2", "val2_l_dvo", 3000000000L),
+ HARDCOMMIT);
+ }
+
+ /** @see #checkReplay */
+ @Test
+ public void testReplay_AllUpdatesAfterInitialAddAreSets() throws Exception {
+ checkReplay("val2_l_dvo",
+ //
+ sdoc("id", "0", "val2_l_dvo", 3000000000L),
+ sdoc("id", "0", "val2_l_dvo", map("set", 3000000003L)),
+ HARDCOMMIT,
+ sdoc("id", "0", "val2_l_dvo", map("set", 3000000008L)),
+ sdoc("id", "1", "val2_l_dvo", 2000000000L),
+ sdoc("id", "1", "val2_l_dvo", map("set", 2000000002L)),
+ sdoc("id", "1", "val2_l_dvo", map("set", 3000000000L)),
+ sdoc("id", "0", "val2_l_dvo", map("set", 3000000015L)),
+ sdoc("id", "1", "val2_l_dvo", map("set", 7000000000L)),
+ sdoc("id", "0", "val2_l_dvo", map("set", 3000000026L)),
+ sdoc("id", "2", "val2_l_dvo", 2000000000L),
+ HARDCOMMIT,
+ sdoc("id", "2", "val2_l_dvo", map("set", 3000000000L)),
+ HARDCOMMIT
+ );
+ }
+
+ /** @see #checkReplay */
+ @Test
+ public void testReplay_MixOfInplaceAndNonInPlaceAtomicUpdates() throws Exception {
+ checkReplay("inplace_l_dvo",
+ //
+ sdoc("id", "3", "inplace_l_dvo", map("inc", -13)),
+ sdoc("id", "3", "inplace_l_dvo", map("inc", 19), "regular_l", map("inc", -17)),
+ sdoc("id", "1", "regular_l", map("inc", -19)),
+ sdoc("id", "3", "inplace_l_dvo", map("inc", -11)),
+ sdoc("id", "2", "inplace_l_dvo", map("set", 28)),
+ HARDCOMMIT,
+ sdoc("id", "2", "inplace_l_dvo", map("inc", 45)),
+ sdoc("id", "3", "inplace_l_dvo", map("set", 72)),
+ sdoc("id", "2", "regular_l", map("inc", -55)),
+ sdoc("id", "2", "inplace_l_dvo", -48, "regular_l", 159),
+ sdoc("id", "3", "inplace_l_dvo", 52, "regular_l", 895),
+ sdoc("id", "2", "inplace_l_dvo", map("inc", 19)),
+ sdoc("id", "3", "inplace_l_dvo", map("inc", -264), "regular_l", map("inc", -207)),
+ sdoc("id", "3", "inplace_l_dvo", -762, "regular_l", 272),
+ SOFTCOMMIT);
+ }
+
+
+ /**
+ * Simple enum for randomizing a type of update.
+ * Each enum value has an associated probability, and the class has built-in sanity checks
+ * that the total is 100%.
+ *
+ * @see RandomUpdate#pick
+ * @see #checkRandomReplay
+ */
+ private enum RandomUpdate {
+ HARD_COMMIT(5),
+ SOFT_COMMIT(5),
+
+ /** doc w/o the inplaceField, atomic update on some other (non-inplace) field */
+ ATOMIC_NOT_INPLACE(5),
+
+ /** atomic update of a doc w/ inc on both inplaceField *AND* non-inplace field */
+ ATOMIC_INPLACE_AND_NOT_INPLACE(10),
+
+
+ /** atomic update of a doc w/ set inplaceField */
+ ATOMIC_INPLACE_SET(25),
+ /** atomic update of a doc w/ inc inplaceField */
+ ATOMIC_INPLACE_INC(25),
+
+ /** doc w/o the inplaceField, normal add */
+ ADD_NO_INPLACE_VALUE(5),
+ /** a non atomic update of a doc w/ new inplaceField value */
+ ADD_INPLACE_VALUE(20);
+
+ private RandomUpdate(int odds) {
+ this.odds = odds;
+ }
+ public final int odds;
+
+ static { // sanity check odds add up to 100%
+ int total = 0;
+ for (RandomUpdate candidate : RandomUpdate.values()) {
+ total += candidate.odds;
+ }
+ assertEquals("total odds doesn't equal 100", 100, total);
+ }
+
+ /** pick a random type of RandomUpdate */
+ public static final RandomUpdate pick(Random r) {
+ final int target = TestUtil.nextInt(r, 1, 100);
+ int cumulative_odds = 0;
+ for (RandomUpdate candidate : RandomUpdate.values()) {
+ cumulative_odds += candidate.odds;
+ if (target <= cumulative_odds) {
+ return candidate;
+ }
+ }
+ fail("how did we not find a candidate? target=" + target + ", cumulative_odds=" + cumulative_odds);
+ return null; // compiler mandated return
+ }
+ }
+
+ /** @see #checkRandomReplay */
+ @Test
+ public void testReplay_Random_ManyDocsManyUpdates() throws Exception {
+
+ // build up a random list of updates
+ final int maxDocId = atLeast(50);
+ final int numUpdates = maxDocId * 3;
+ checkRandomReplay(maxDocId, numUpdates);
+ }
+
+ /** @see #checkRandomReplay */
+ @Test
+ public void testReplay_Random_FewDocsManyUpdates() throws Exception {
+
+ // build up a random list of updates
+ final int maxDocId = atLeast(3);
+ final int numUpdates = maxDocId * 50;
+ checkRandomReplay(maxDocId, numUpdates);
+ }
+
+ /** @see #checkRandomReplay */
+ @Test
+ public void testReplay_Random_FewDocsManyShortSequences() throws Exception {
+
+ // build up a random list of updates
+ final int numIters = atLeast(50);
+
+ for (int i = 0; i < numIters; i++) {
+ final int maxDocId = atLeast(3);
+ final int numUpdates = maxDocId * 5;
+ checkRandomReplay(maxDocId, numUpdates);
+ deleteAllAndCommit();
+ }
+ }
+
+ /**
+ * @see #checkReplay
+ * @see RandomUpdate
+ */
+ public void checkRandomReplay(final int maxDocId, final int numCmds) throws Exception {
+
+ final String not_inplaceField = "regular_l";
+
+ // nocommit: can use a regular long field to sanity check if a failing seed is a general
+ // bug with test/atomic update code, or specific to inplace update
+ //
+ // nocommit: should we randomize this when committing?
+ //
+ //final String inplaceField = "nocommit_not_really_inplace_l"; // nocommit
+ final String inplaceField = "inplace_l_dvo";
+
+ final Object[] cmds = new Object[numCmds];
+ for (int iter = 0; iter < numCmds; iter++) {
+ final int id = TestUtil.nextInt(random(), 1, maxDocId);
+ final RandomUpdate update = RandomUpdate.pick(random());
+
+ switch (update) {
+
+ case HARD_COMMIT:
+ cmds[iter] = HARDCOMMIT;
+ break;
+
+ case SOFT_COMMIT:
+ cmds[iter] = SOFTCOMMIT;
+ break;
+
+ case ATOMIC_NOT_INPLACE:
+ // atomic update on non_inplaceField, w/o any value specified for inplaceField
+ cmds[iter] = sdoc("id", id,
+ not_inplaceField, map("inc", random().nextInt()));
+ break;
+
+ case ATOMIC_INPLACE_AND_NOT_INPLACE:
+ // atomic update of a doc w/ inc on both inplaceField and not_inplaceField
+ cmds[iter] = sdoc("id", id,
+ inplaceField, map("inc", random().nextInt()),
+ not_inplaceField, map("inc", random().nextInt()));
+ break;
+
+ case ATOMIC_INPLACE_SET:
+ // atomic update of a doc w/ set inplaceField
+ cmds[iter] = sdoc("id", id,
+ inplaceField, map("set", random().nextLong()));
+ break;
+
+ case ATOMIC_INPLACE_INC:
+ // atomic update of a doc w/ inc inplaceField
+ cmds[iter] = sdoc("id", id,
+ inplaceField, map("inc", random().nextInt()));
+ break;
+
+ case ADD_NO_INPLACE_VALUE:
+ // regular add of doc w/o the inplaceField, but does include non_inplaceField
+ cmds[iter] = sdoc("id", id,
+ not_inplaceField, random().nextLong());
+ break;
+
+ case ADD_INPLACE_VALUE:
+ // a non atomic update of a doc w/ new inplaceField value
+ cmds[iter] = sdoc("id", id,
+ inplaceField, random().nextLong(),
+ not_inplaceField, random().nextLong());
+ break;
+
+ default:
+ fail("WTF is this? ... " + update);
+ }
+
+ assertNotNull(cmds[iter]); // sanity check switch
+ }
+
+ checkReplay(inplaceField, cmds);
+ }
+
+ /** sentinel object for {@link #checkReplay} */
+ public Object SOFTCOMMIT = new Object() { public String toString() { return "SOFTCOMMIT"; } };
+ /** sentinel object for {@link #checkReplay} */
+ public Object HARDCOMMIT = new Object() { public String toString() { return "HARDCOMMIT"; } };
+
+ /**
+ * Executes a sequence of commands against Solr, while tracking the expected value of a specified
+ * <code>valField</code> Long field (presumably one that only uses docValues) against an in-memory
+ * model maintained in parallel (for the purpose of testing the correctness of in-place updates).
+ *
+ * <p>
+ * A few restrictions are placed on the {@link SolrInputDocument}s that can be included when using
+ * this method, in order to keep the in-memory model management simple:
+ * </p>
+ * <ul>
+ * <li><code>id</code> must be the uniqueKey field</li>
+ * <li><code>id</code> may have any FieldType, but all values must be parsable as Integers</li>
+ * <li><code>valField</code> must be a single valued field</li>
+ * <li>All values in the <code>valField</code> must either be {@link Number}s, or Maps containing
+ * atomic updates ("inc" or "set") where the atomic value is a {@link Number}</li>
+ * </ul>
+ *
+ * @param valField the field to model
+ * @param commands A sequence of commands, each of which is either a SolrInputDocument
+ *                 (regular or containing atomic update Maps)
+ *                 or one of the {@link #HARDCOMMIT} or {@link #SOFTCOMMIT} sentinel objects.
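+ *
+ * <p>A minimal invocation sketch (hypothetical values):</p>
+ * <pre>
+ *   checkReplay("inplace_l_dvo",
+ *               sdoc("id", "1", "inplace_l_dvo", 10L),            // full add
+ *               sdoc("id", "1", "inplace_l_dvo", map("inc", 5)),  // in-place inc; model value becomes 15
+ *               HARDCOMMIT);                                      // commit, then verify via search
+ * </pre>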
+ */
+ public void checkReplay(final String valField, Object... commands) throws Exception {
+
+ HashMap<Integer, DocInfo> model = new LinkedHashMap<>();
+ HashMap<Integer, DocInfo> committedModel = new LinkedHashMap<>();
+
+ // by default, we only check the committed model after a commit
+ // or if the total number of commands is relatively small.
+ //
+ // (in theory, there's no reason to check the committed model unless we know there's been a commit
+ // but for smaller tests the overhead of doing so is tiny, so we might as well)
+ //
+ // if some test seed fails, and you want to force the committed model to be checked
+ // after every command, just temporarily force this variable to true...
+ boolean checkCommittedModel = (commands.length < 50);
+
+ for (Object cmd : commands) {
+ if (cmd == SOFTCOMMIT) {
+ assertU(commit("softCommit", "true"));
+ committedModel = new LinkedHashMap<>(model);
+ checkCommittedModel = true;
+ } else if (cmd == HARDCOMMIT) {
+ assertU(commit("softCommit", "false"));
+ committedModel = new LinkedHashMap<>(model);
+ checkCommittedModel = true;
+ } else {
+ assertNotNull("null command in checkReplay", cmd);
+ assertTrue("cmd is neither sentinal (HARD|SOFT)COMMIT object, nor Solr doc: " + cmd.getClass(),
+ cmd instanceof SolrInputDocument);
+
+ final SolrInputDocument sdoc = (SolrInputDocument) cmd;
+ final int id = Integer.parseInt(sdoc.getFieldValue("id").toString());
+
+ final DocInfo previousInfo = model.get(id);
+ final Long previousValue = (null == previousInfo) ? null : previousInfo.value;
+
+ final long version = addAndGetVersion(sdoc, null);
+
+ final Object val = sdoc.getFieldValue(valField);
+ if (val instanceof Map) {
+ // atomic update of the field we're modeling
+
+ Map<String,?> atomicUpdate = (Map) val;
+ assertEquals(sdoc.toString(), 1, atomicUpdate.size());
+ if (atomicUpdate.containsKey("inc")) {
+ // Solr treats inc on a non-existing doc (or doc w/o existing value) as if existing value is 0
+ final long base = (null == previousValue) ? 0L : previousValue;
+ model.put(id, new DocInfo(version,
+ base + ((Number)atomicUpdate.get("inc")).longValue()));
+ } else if (atomicUpdate.containsKey("set")) {
+ model.put(id, new DocInfo(version, ((Number)atomicUpdate.get("set")).longValue()));
+ } else {
+ fail("wtf update is this? ... " + sdoc);
+ }
+ } else if (null == val) {
+ // the field we are modeling is not mentioned in this update. It's either...
+ //
+ // a) a regular update of some other fields (our model should have a null value)
+ // b) an atomic update of some other field (keep existing value in model)
+ //
+ // for now, assume it's atomic and we're going to keep our existing value...
+ Long newValue = (null == previousInfo) ? null : previousInfo.value;
+ for (SolrInputField field : sdoc) {
+ if (! ( "id".equals(field.getName()) || (field.getValue() instanceof Map)) ) {
+ // not an atomic update, newValue in model should be null
+ newValue = null;
+ break;
+ }
+ }
+ model.put(id, new DocInfo(version, newValue));
+
+ } else {
+ // regular replacement of the value in the field we're modeling
+
+ assertTrue("Model field value is not a Number: " + val.getClass(), val instanceof Number);
+ model.put(id, new DocInfo(version, ((Number)val).longValue()));
+ }
+ }
+
+ // after every op, check the model(s)
+
+ // RTG to check the values for every id against the model
+ for (Map.Entry<Integer, DocInfo> entry : model.entrySet()) {
+ final Long expected = entry.getValue().value;
+ assertEquals(expected, client.getById(String.valueOf(entry.getKey())).getFirstValue(valField));
+ }
+
+ // search to check the values for every id in the committed model
+ if (checkCommittedModel) {
+ final int numCommittedDocs = committedModel.size();
+ String[] xpaths = new String[1 + numCommittedDocs];
+ int i = 0;
+ for (Map.Entry<Integer, DocInfo> entry : committedModel.entrySet()) {
+ Integer id = entry.getKey();
+ Long expected = entry.getValue().value;
+ if (null != expected) {
+ xpaths[i] = "//result/doc[./str='"+id+"'][./long='"+expected+"']";
+ } else {
+ xpaths[i] = "//result/doc[./str='"+id+"'][not(./long)]";
+ }
+ i++;
+ }
+ xpaths[i] = "//*[@numFound='"+numCommittedDocs+"']";
+ assertQ(req("q", "*:*",
+ "fl", "id," + valField,
+ "rows", ""+numCommitedDocs),
+ xpaths);
+ }
+ }
+ }
+
+ @Test
+ public void testMixedInPlaceAndNonInPlaceAtomicUpdates() throws Exception {
+ SolrDocument rtgDoc = null;
+ long version1 = addAndGetVersion(sdoc("id", "1", "inplace_updatable_float", "100", "stored_i", "100"), params());
+
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("inc", "1"), "stored_i", map("inc", "1"));
+ rtgDoc = client.getById("1");
+ assertEquals(101, rtgDoc.getFieldValue("stored_i"));
+ assertEquals(101.0f, rtgDoc.getFieldValue("inplace_updatable_float"));
+
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("inc", "1"));
+ rtgDoc = client.getById("1");
+ assertEquals(101, rtgDoc.getFieldValue("stored_i"));
+ assertEquals(102.0f, rtgDoc.getFieldValue("inplace_updatable_float"));
+
+ version1 = addAndAssertVersion(version1, "id", "1", "stored_i", map("inc", "1"));
+ rtgDoc = client.getById("1");
+ assertEquals(102, rtgDoc.getFieldValue("stored_i"));
+ assertEquals(102.0f, rtgDoc.getFieldValue("inplace_updatable_float"));
+
+ assertU(commit("softCommit", "false"));
+ assertQ(req("q", "*:*", "sort", "id asc", "fl", "*"),
+ "//*[@numFound='1']",
+ "//result/doc[1]/float[@name='inplace_updatable_float'][.='102.0']",
+ "//result/doc[1]/int[@name='stored_i'][.='102']",
+ "//result/doc[1]/long[@name='_version_'][.='" + version1 + "']"
+ );
+
+ // recheck RTG after commit
+ rtgDoc = client.getById("1");
+ assertEquals(102, rtgDoc.getFieldValue("stored_i"));
+ assertEquals(102.0f, rtgDoc.getFieldValue("inplace_updatable_float"));
+ }
+
+ /** @see AtomicUpdateDocumentMerger#isInPlaceUpdate */
+ @Test
+ public void testIsInPlaceUpdate() throws Exception {
+ Set<String> inPlaceUpdatedFields = new HashSet<>();
+
+ // In-place updates:
+ inPlaceUpdatedFields = AtomicUpdateDocumentMerger.isInPlaceUpdate(
+ UpdateLogTest.getAddUpdate(null, sdoc("id", "1", "_version_", 42L, "inplace_updatable_float", map("set", 10))));
+ assertTrue(inPlaceUpdatedFields.contains("inplace_updatable_float"));
+
+ inPlaceUpdatedFields.clear();
+ inPlaceUpdatedFields = AtomicUpdateDocumentMerger.isInPlaceUpdate(
+ UpdateLogTest.getAddUpdate(null, sdoc("id", "1", "_version_", 42L, "inplace_updatable_float", map("inc", 10))));
+ assertTrue(inPlaceUpdatedFields.contains("inplace_updatable_float"));
+
+ inPlaceUpdatedFields.clear();
+ inPlaceUpdatedFields = AtomicUpdateDocumentMerger.isInPlaceUpdate(
+ UpdateLogTest.getAddUpdate(null, sdoc("id", "1", "_version_", 42L, "inplace_updatable_int", map("set", 10))));
+ assertTrue(inPlaceUpdatedFields.contains("inplace_updatable_int"));
+
+ // Non in-place updates
+ inPlaceUpdatedFields.clear();
+ addAndGetVersion(sdoc("id", "1", "stored_i", "0"), params()); // setting up the dv
+ assertTrue("stored field updated", AtomicUpdateDocumentMerger.isInPlaceUpdate(
+ UpdateLogTest.getAddUpdate(null, sdoc("id", "1", "_version_", 42L, "stored_i", map("inc", 1)))).isEmpty());
+
+ assertTrue("No map means full document update", AtomicUpdateDocumentMerger.isInPlaceUpdate(
+ UpdateLogTest.getAddUpdate(null, sdoc("id", "1", "_version_", 42L, "inplace_updatable_int", "100"))).isEmpty());
+
+ assertTrue("non existent dynamic dv field updated first time",
+ AtomicUpdateDocumentMerger.isInPlaceUpdate(
+ UpdateLogTest.getAddUpdate(null, sdoc("id", "1", "_version_", 42L, "new_updateable_int_i_dvo", map("set", 10)))).isEmpty());
+
+ // After adding a full document with the dynamic dv field, in-place update should work
+ addAndGetVersion(sdoc("id", "2", "new_updateable_int_i_dvo", "0"), params()); // setting up the dv
+ if (random().nextBoolean())
+ assertU(commit("softCommit", "false"));
+ inPlaceUpdatedFields.clear();
+ inPlaceUpdatedFields = AtomicUpdateDocumentMerger.isInPlaceUpdate(
+ UpdateLogTest.getAddUpdate(null, sdoc("id", "2", "_version_", 42L, "new_updateable_int_i_dvo", map("set", 10))));
+ assertTrue(inPlaceUpdatedFields.contains("new_updateable_int_i_dvo"));
+
+ // If a supported dv field has a copyField target which is supported, it should be an in-place update
+ inPlaceUpdatedFields = AtomicUpdateDocumentMerger.isInPlaceUpdate
+ (UpdateLogTest.getAddUpdate(null, sdoc("id", "1", "_version_", 42L,
+ "copyfield1_src__both_updateable", map("set", 10))));
+ assertTrue(inPlaceUpdatedFields.contains("copyfield1_src__both_updateable"));
+
+ // If a supported dv field has a copyField target which is not supported, it should not be an in-place update
+ inPlaceUpdatedFields = AtomicUpdateDocumentMerger.isInPlaceUpdate
+ (UpdateLogTest.getAddUpdate(null, sdoc("id", "1", "_version_", 42L,
+ "copyfield2_src__only_src_updatable", map("set", 10))));
+ assertTrue(inPlaceUpdatedFields.isEmpty());
+ }
+
+ /**
+  * Tests that the {@link AtomicUpdateDocumentMerger#doInPlaceUpdateMerge} method is working fine.
+  */
+ @Test
+ public void testDoInPlaceUpdateMerge() throws Exception {
+ long version1 = addAndGetVersion(sdoc("id", "1", "title_s", "first"), null);
+ long version2 = addAndGetVersion(sdoc("id", "2", "title_s", "second"), null);
+ long version3 = addAndGetVersion(sdoc("id", "3", "title_s", "third"), null);
+ assertU(commit("softCommit", "false"));
+ assertQ(req("q", "*:*"), "//*[@numFound='3']");
+
+ // Adding a few in-place updates
+ version1 = addAndAssertVersion(version1, "id", "1", "inplace_updatable_float", map("set", 200));
+
+ // Test that the AUDM.doInPlaceUpdateMerge() method is working fine
+ AddUpdateCommand cmd = UpdateLogTest.getAddUpdate(null, sdoc("id", "1", "_version_", 42L, "inplace_updatable_float", map("inc", 10)));
+ SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), params());
+ AtomicUpdateDocumentMerger docMerger = new AtomicUpdateDocumentMerger(req);
+ boolean done = docMerger.doInPlaceUpdateMerge(cmd, AtomicUpdateDocumentMerger.isInPlaceUpdate(cmd));
+ assertTrue(done);
+ assertEquals(42L, cmd.getSolrInputDocument().getFieldValue("_version_"));
+ assertEquals(42L, cmd.getSolrInputDocument().getFieldValue("_version_"));
+ assertEquals(210f, cmd.getSolrInputDocument().getFieldValue("inplace_updatable_float"));
+ assertFalse(cmd.getSolrInputDocument().containsKey("title_s")); // in-place merged doc shouldn't have non-inplace fields from the index/tlog
+ assertEquals(version1, cmd.prevVersion);
+
+ // do a commit, and the same results should be repeated
+ assertU(commit("softCommit", "false"));
+
+ cmd = UpdateLogTest.getAddUpdate(null, sdoc("id", "1", "_version_", 42L, "inplace_updatable_float", map("inc", 10)));
+ done = docMerger.doInPlaceUpdateMerge(cmd, AtomicUpdateDocumentMerger.isInPlaceUpdate(cmd));
+ assertTrue(done);
+ assertEquals(42L, cmd.getSolrInputDocument().getFieldValue("_version_"));
+ assertEquals(42L, cmd.getSolrInputDocument().getFieldValue("_version_"));
+ assertEquals(210f, cmd.getSolrInputDocument().getFieldValue("inplace_updatable_float"));
+ assertFalse(cmd.getSolrInputDocument().containsKey("title_s")); // in-place merged doc shouldn't have non-inplace fields from the index/tlog
+ assertEquals(version1, cmd.prevVersion);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/test/org/apache/solr/update/UpdateLogTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/UpdateLogTest.java b/solr/core/src/test/org/apache/solr/update/UpdateLogTest.java
new file mode 100644
index 0000000..ccc55f9
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/UpdateLogTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update;
+
+import java.util.List;
+
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.handler.component.RealTimeGetComponent;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.internal.matchers.StringContains.containsString;
+
+public class UpdateLogTest extends SolrTestCaseJ4 {
+
+ static UpdateLog ulog = null;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+
+ // nocommit: does this test need to randomize between diff schema/fields used?
+ // nocommit: see nocommits/jira questions related to special dynamicField logic in AtomicUpdateDocumentMerger.isInPlaceUpdate
+
+ initCore("solrconfig-tlog.xml", "schema-inplace-updates.xml");
+
+ try (SolrQueryRequest req = req()) {
+ UpdateHandler uhandler = req.getCore().getUpdateHandler();
+ ((DirectUpdateHandler2) uhandler).getCommitTracker().setTimeUpperBound(100);
+ ((DirectUpdateHandler2) uhandler).getCommitTracker().setOpenSearcher(false);
+ ulog = uhandler.getUpdateLog();
+ }
+ }
+
+ /**
+  * @see org.apache.solr.update.UpdateLog#applyPartialUpdates(BytesRef,long,long,SolrDocumentBase)
+  */
+ @Test
+ public void testApplyPartialUpdatesOnMultipleInPlaceUpdatesInSequence() {
+ // Add a full update, two in-place updates and verify applying partial updates is working
+ AddUpdateCommand cmd;
+ cmd = getAddUpdate(null, sdoc("id", "1", "title_s", "title1", "val1_i_dvo", "1", "_version_", "100"));
+ ulog.add(cmd);
+ ulog.add(getAddUpdate(100L, sdoc("id", "1", "price", "1000", "val1_i_dvo", "2", "_version_", "101")));
+ ulog.add(getAddUpdate(101L, sdoc("id", "1", "val1_i_dvo", "3", "_version_", "102")));
+
+ Object partialUpdate = ulog.lookup(cmd.getIndexedId());
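+ // A tlog entry for an in-place update is a List whose elements, as read back
+ // below, are positional: [2] = pointer to the previous tlog entry, [3] = the
+ // previous version, and [4] = the partial document itself.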
+ SolrDocument partialDoc = RealTimeGetComponent.toSolrDoc((SolrInputDocument)((List)partialUpdate).get(4),
+ h.getCore().getLatestSchema(), true);
+ long prevVersion = (Long)((List)partialUpdate).get(3);
+ long prevPointer = (Long)((List)partialUpdate).get(2);
+
+ assertEquals(3L, ((NumericDocValuesField)partialDoc.getFieldValue("val1_i_dvo")).numericValue());
+ assertFalse(partialDoc.containsKey("title_s"));
+
+ long returnVal = ulog.applyPartialUpdates(cmd.getIndexedId(), prevPointer, prevVersion, partialDoc);
+
+ assertEquals(0, returnVal);
+ assertEquals(1000, Integer.parseInt(partialDoc.getFieldValue("price").toString()));
+ assertEquals(3L, ((NumericDocValuesField)partialDoc.getFieldValue("val1_i_dvo")).numericValue());
+ assertEquals("title1", partialDoc.getFieldValue("title_s"));
+
+ // Add a full update, commit, then two in-place updates, and verify that applying partial
+ // updates is working (since the prevTlog and prevTlog2 are retained after a commit).
+ ulogCommit(ulog);
+ if (random().nextBoolean()) { // sometimes also try a second commit
+ ulogCommit(ulog);
+ }
+ ulog.add(getAddUpdate(102L, sdoc("id", "1", "price", "2000", "val1_i_dvo", "4", "_version_", "200")));
+ ulog.add(getAddUpdate(200L, sdoc("id", "1", "val1_i_dvo", "5", "_version_", "201")));
+
+ partialUpdate = ulog.lookup(cmd.getIndexedId());
+ partialDoc = RealTimeGetComponent.toSolrDoc((SolrInputDocument)((List)partialUpdate).get(4), h.getCore().getLatestSchema(), true);
+ prevVersion = (Long)((List)partialUpdate).get(3);
+ prevPointer = (Long)((List)partialUpdate).get(2);
+
+ assertEquals(5L, ((NumericDocValuesField)partialDoc.getFieldValue("val1_i_dvo")).numericValue());
+ assertFalse(partialDoc.containsKey("title_s"));
+
+ returnVal = ulog.applyPartialUpdates(cmd.getIndexedId(), prevPointer, prevVersion, partialDoc);
+
+ assertEquals(0, returnVal);
+ assertEquals(2000, Integer.parseInt(partialDoc.getFieldValue("price").toString()));
+ assertEquals(5L, ((NumericDocValuesField)partialDoc.getFieldValue("val1_i_dvo")).numericValue());
+ assertEquals("title1", partialDoc.getFieldValue("title_s"));
+ }
+
+ @Test
+ public void testApplyPartialUpdatesAfterMultipleCommits() {
+ AddUpdateCommand cmd;
+ cmd = getAddUpdate(null, sdoc("id", "1", "title_s", "title1", "val1_i_dvo", "1", "_version_", "100"));
+ ulog.add(cmd);
+ ulog.add(getAddUpdate(100L, sdoc("id", "1", "price", "1000", "val1_i_dvo", "2", "_version_", "101")));
+ ulog.add(getAddUpdate(101L, sdoc("id", "1", "val1_i_dvo", "3", "_version_", "102")));
+
+ // Do 3 commits, then an in-place update, and verify that applying partial updates can't find the full doc
+ for (int i=0; i<3; i++)
+ ulogCommit(ulog);
+ ulog.add(getAddUpdate(101L, sdoc("id", "1", "val1_i_dvo", "6", "_version_", "300")));
+
+ Object partialUpdate = ulog.lookup(cmd.getIndexedId());
+ SolrDocument partialDoc = RealTimeGetComponent.toSolrDoc((SolrInputDocument)((List)partialUpdate).get(4), h.getCore().getLatestSchema(), true);
+ long prevVersion = (Long)((List)partialUpdate).get(3);
+ long prevPointer = (Long)((List)partialUpdate).get(2);
+
+ assertEquals(6L, ((NumericDocValuesField)partialDoc.getFieldValue("val1_i_dvo")).numericValue());
+ assertFalse(partialDoc.containsKey("title_s"));
+
+ long returnVal = ulog.applyPartialUpdates(cmd.getIndexedId(), prevPointer, prevVersion, partialDoc);
+
+ assertEquals(-1, returnVal);
+ }
+
+ @Test
+ public void testApplyPartialUpdatesDependingOnNonAddShouldThrowException() {
+ AddUpdateCommand cmd;
+ cmd = getAddUpdate(null, sdoc("id", "1", "title_s", "title1", "val1_i_dvo", "1", "_version_", "100"));
+ ulog.add(cmd);
+
+ ulog.delete(getDeleteUpdate("1", 500L, false)); // dbi
+ ulog.add(getAddUpdate(500L, sdoc("id", "1", "val1_i_dvo", "2", "_version_", "501")));
+ ulog.add(getAddUpdate(501L, sdoc("id", "1", "val1_i_dvo", "3", "_version_", "502")));
+
+ Object partialUpdate = ulog.lookup(cmd.getIndexedId());
+ SolrDocument partialDoc = RealTimeGetComponent.toSolrDoc((SolrInputDocument)((List)partialUpdate).get(4), h.getCore().getLatestSchema(), true);
+ long prevVersion = (Long)((List)partialUpdate).get(3);
+ long prevPointer = (Long)((List)partialUpdate).get(2);
+
+ assertEquals(3L, ((NumericDocValuesField)partialDoc.getFieldValue("val1_i_dvo")).numericValue());
+ assertEquals(502L, ((NumericDocValuesField)partialDoc.getFieldValue("_version_")).numericValue());
+ assertFalse(partialDoc.containsKey("title_s"));
+
+ // If an in-place update depends on a non-add (i.e. DBI), assert that an exception is thrown.
+ SolrException ex = expectThrows(SolrException.class, () -> {
+ long returnVal = ulog.applyPartialUpdates(cmd.getIndexedId(), prevPointer, prevVersion, partialDoc);
+ fail("502 depends on 501, 501 depends on 500, but 500 is a"
+ + " DELETE. This should've generated an exception. returnVal is: "+returnVal);
+ });
+ assertEquals(ex.toString(), SolrException.ErrorCode.INVALID_STATE.code, ex.code());
+ assertThat(ex.getMessage(), containsString("should've been either ADD or UPDATE_INPLACE"));
+ assertThat(ex.getMessage(), containsString("looking for id=1"));
+ }
+
+ @Test
+ public void testApplyPartialUpdatesWithDBQ() { // nocommit: misleading name?
+
+ // nocommit: no in-place updates happening in this test?
+
+ AddUpdateCommand cmd;
+ cmd = getAddUpdate(null, sdoc("id", "1", "title_s", "title1", "val1_i_dvo", "1", "_version_", "100"));
+ ulog.add(cmd);
+ ulog.add(getAddUpdate(100L, sdoc("id", "1", "val1_i_dvo", "2", "_version_", "101")));
+ ulog.add(getAddUpdate(101L, sdoc("id", "1", "val1_i_dvo", "3", "_version_", "102")));
+ ulog.deleteByQuery(getDeleteUpdate("1", 200L, true)); // dbq, "id:1"
+ assertNull(ulog.lookup(cmd.getIndexedId()));
+
+ // nocommit: need more rigorous assertions about expected behavior after DBQ (new RT searcher?)
+ }
+
+ /**
+ * Simulate a commit at a given updateLog
+ */
+ private static void ulogCommit(UpdateLog ulog) {
+ try (SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), params())) {
+ CommitUpdateCommand commitCmd = new CommitUpdateCommand(req, false);
+ ulog.preCommit(commitCmd);
+ ulog.postCommit(commitCmd);
+ }
+ }
+
+ /**
+ * Obtain a DeleteUpdateCommand for a deleteById operation
+ */
+ private static DeleteUpdateCommand getDeleteUpdate(String id, long version, boolean dbq) {
+ // nocommit: req lifecycle bug
+ // nocommit: cmd returned is linked to req that's already been closed
+ // nocommit: see jira comments for suggested fix
+ try (SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), params())) {
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ if (dbq) {
+ cmd.query = ("id:"+id);
+ } else {
+ cmd.id = id;
+ }
+ cmd.setVersion(version);
+ return cmd;
+ }
+ }
+
+ /**
+ * Obtain an AddUpdateCommand for a full add/in-place update operation.
+ * If there's a non-null prevVersion, then this AddUpdateCommand represents an in-place update.
+ * When prevVersion is passed in (i.e. for an in-place update), the returned AddUpdateCommand
+ * has already undergone the merge process: inc/set operations have been converted into the
+ * actual values that just need to be written.
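+ *
+ * <p>E.g. (mirroring a call used elsewhere in these tests):
+ * {@code getAddUpdate(100L, sdoc("id", "1", "val1_i_dvo", "2", "_version_", "101"))}
+ * yields an in-place update whose prevVersion is 100.</p>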
+ */
+ public static AddUpdateCommand getAddUpdate(Long prevVersion, SolrInputDocument sdoc) {
+ // nocommit: req lifecycle bug
+ // nocommit: cmd returned is linked to req that's already been closed
+ // nocommit: see jira comments for suggested fix
+ AddUpdateCommand cmd;
+ try (SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), params())) {
+ cmd = new AddUpdateCommand(req);
+ }
+ cmd.solrDoc = sdoc;
+ assertTrue(cmd.solrDoc.containsKey(DistributedUpdateProcessor.VERSION_FIELD));
+ cmd.setVersion(Long.parseLong(cmd.solrDoc.getFieldValue(DistributedUpdateProcessor.VERSION_FIELD).toString()));
+ if (prevVersion != null) {
+ cmd.prevVersion = prevVersion;
+ }
+ return cmd;
+ }
+
+}
[4/4] lucene-solr:jira/solr-5944: SOLR-5944 Initial import into branch
Posted by is...@apache.org.
SOLR-5944 Initial import into branch
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/8ae359a6
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/8ae359a6
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/8ae359a6
Branch: refs/heads/jira/solr-5944
Commit: 8ae359a63a417f387d2b53ba9b64c8935693e69f
Parents: c164f7e
Author: Ishan Chattopadhyaya <is...@apache.org>
Authored: Wed Dec 7 00:30:48 2016 +0530
Committer: Ishan Chattopadhyaya <is...@apache.org>
Committed: Wed Dec 7 00:30:48 2016 +0530
----------------------------------------------------------------------
.../org/apache/lucene/index/FieldInfos.java | 5 +
.../org/apache/lucene/index/IndexWriter.java | 11 +-
.../client/solrj/embedded/JettySolrRunner.java | 58 +
.../src/java/org/apache/solr/core/SolrCore.java | 4 +-
.../handler/component/RealTimeGetComponent.java | 321 +++++-
.../apache/solr/update/AddUpdateCommand.java | 36 +-
.../solr/update/DirectUpdateHandler2.java | 58 +-
.../org/apache/solr/update/DocumentBuilder.java | 150 ++-
.../java/org/apache/solr/update/PeerSync.java | 10 +
.../apache/solr/update/SolrCmdDistributor.java | 4 +
.../org/apache/solr/update/TransactionLog.java | 43 +-
.../java/org/apache/solr/update/UpdateLog.java | 252 ++++-
.../org/apache/solr/update/VersionInfo.java | 7 +
.../processor/AtomicUpdateDocumentMerger.java | 227 +++-
.../processor/DistributedUpdateProcessor.java | 324 +++++-
...BasedVersionConstraintsProcessorFactory.java | 7 +-
.../collection1/conf/schema-inplace-updates.xml | 61 ++
.../test-files/solr/collection1/conf/schema.xml | 12 +-
.../solr/collection1/conf/schema15.xml | 7 +-
.../solr/cloud/TestStressInPlaceUpdates.java | 495 +++++++++
.../org/apache/solr/search/TestRecovery.java | 17 +-
.../apache/solr/update/HardAutoCommitTest.java | 4 +
.../org/apache/solr/update/PeerSyncTest.java | 153 ++-
.../solr/update/TestInPlaceUpdatesDistrib.java | 998 +++++++++++++++++
.../update/TestInPlaceUpdatesStandalone.java | 1026 ++++++++++++++++++
.../org/apache/solr/update/UpdateLogTest.java | 233 ++++
26 files changed, 4366 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/lucene/core/src/java/org/apache/lucene/index/FieldInfos.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/FieldInfos.java b/lucene/core/src/java/org/apache/lucene/index/FieldInfos.java
index c80fb85..4e9d4fc 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FieldInfos.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FieldInfos.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -324,6 +325,10 @@ public class FieldInfos implements Iterable<FieldInfo> {
}
}
+ synchronized Set<String> getFieldNames() {
+ return nameToNumber.keySet();
+ }
+
synchronized void clear() {
numberToName.clear();
nameToNumber.clear();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 9868785..331da1f 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -1617,7 +1617,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
public long updateNumericDocValue(Term term, String field, long value) throws IOException {
ensureOpen();
if (!globalFieldNumberMap.contains(field, DocValuesType.NUMERIC)) {
- throw new IllegalArgumentException("can only update existing numeric-docvalues fields!");
+ throw new IllegalArgumentException("can only update existing numeric-docvalues fields! Attempted"
+ + " to update field: " + field + "=" + value);
}
try {
long seqNo = docWriter.updateDocValues(new NumericDocValuesUpdate(term, field, value));
@@ -1773,6 +1774,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return flushDeletesCount.get();
}
+ /**
+ * Return a set of all field names as seen by this IndexWriter, across all segments
+ * of the index.
+ */
+ public Set<String> getFieldNames() {
+ return globalFieldNumberMap.getFieldNames();
+ }
+
final String newSegmentName() {
// Cannot synchronize on IndexWriter because that causes
// deadlock
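
For illustration, a minimal hedged sketch of how the new getFieldNames() accessor can
guard an updateNumericDocValue() call (the field and variable names are illustrative):

    import java.io.IOException;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.Term;

    class DocValuesUpdateGuard {
      // Only attempt the docValues update if the field is already known to the
      // writer's global field map; otherwise updateNumericDocValue throws the
      // IllegalArgumentException whose message was improved above.
      static void bumpCounter(IndexWriter writer, String id, long newValue) throws IOException {
        if (writer.getFieldNames().contains("counter_dv")) {
          writer.updateNumericDocValue(new Term("id", id), "counter_dv", newValue);
        }
      }
    }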
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index f4887e6..155f52e 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -30,12 +30,15 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.SolrClient;
@@ -96,13 +99,35 @@ public class JettySolrRunner {
private int proxyPort = -1;
public static class DebugFilter implements Filter {
+ public final static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private AtomicLong nRequests = new AtomicLong();
+
+ List<Delay> delays = new ArrayList<>();
public long getTotalRequests() {
return nRequests.get();
}
+
+ /**
+ * Introduce a delay of specified milliseconds for the specified request.
+ *
+ * @param reason Info message logged when delay occurs
+ * @param count The count-th request will experience a delay
+ * @param delay There will be a delay of this many milliseconds
+ */
+ public void addDelay(String reason, int count, int delay) {
+ delays.add(new Delay(reason, count, delay));
+ }
+
+ /**
+     * Remove all previously introduced delays.
+ */
+ public void unsetDelay() {
+ delays.clear();
+ }
+
@Override
public void init(FilterConfig filterConfig) throws ServletException { }
@@ -110,11 +135,32 @@ public class JettySolrRunner {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
nRequests.incrementAndGet();
+ executeDelay();
filterChain.doFilter(servletRequest, servletResponse);
}
@Override
public void destroy() { }
+
+ private void executeDelay() {
+ int delayMs = 0;
+ for (Delay delay: delays) {
+        if (delay.counter.decrementAndGet() == 0) {
+          log.info("Delaying " + delay.delayValue + "ms, for reason: " + delay.reason);
+          delayMs += delay.delayValue;
+        }
+ }
+
+ if (delayMs > 0) {
+ log.info("Pausing this socket connection for " + delayMs + "ms...");
+ try {
+ Thread.sleep(delayMs);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ log.info("Waking up after the delay of " + delayMs + "ms...");
+ }
+ }
}
@@ -516,4 +562,16 @@ public class JettySolrRunner {
}
}
}
+
+ static class Delay {
+ final AtomicInteger counter;
+ final int delayValue;
+ final String reason;
+
+ public Delay(String reason, int counter, int delay) {
+ this.reason = reason;
+ this.counter = new AtomicInteger(counter);
+ this.delayValue = delay;
+ }
+ }
}
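
A hedged usage sketch for the new delay hooks (this assumes the runner was started in
debug mode so that DebugFilter is installed, and that it is exposed via a getDebugFilter()
accessor; the reason string is illustrative):

    import org.apache.solr.client.solrj.embedded.JettySolrRunner;

    class DelayedRequestExample {
      // Make the 2nd request hitting this node stall for 1500 ms, e.g. to
      // simulate a slow replica, then clean up afterwards.
      static void withSlowSecondRequest(JettySolrRunner jetty, Runnable scenario) {
        jetty.getDebugFilter().addDelay("simulated slow replica", 2, 1500);
        try {
          scenario.run();
        } finally {
          jetty.getDebugFilter().unsetDelay();
        }
      }
    }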
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/core/SolrCore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index a459bf2..73e7987 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1605,7 +1605,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
public IndexFingerprint getIndexFingerprint(SolrIndexSearcher searcher, LeafReaderContext ctx, long maxVersion)
throws IOException {
IndexFingerprint f = null;
- f = perSegmentFingerprintCache.get(ctx.reader().getCoreCacheKey());
+ f = perSegmentFingerprintCache.get(ctx.reader().getCombinedCoreAndDeletesKey());
// fingerprint is either not cached or
// if we want fingerprint only up to a version less than maxVersionEncountered in the segment, or
// documents were deleted from segment for which fingerprint was cached
@@ -1616,7 +1616,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
// cache fingerprint for the segment only if all the versions in the segment are included in the fingerprint
if (f.getMaxVersionEncountered() == f.getMaxInHash()) {
log.info("Caching fingerprint for searcher:{} leafReaderContext:{} mavVersion:{}", searcher, ctx, maxVersion);
- perSegmentFingerprintCache.put(ctx.reader().getCoreCacheKey(), f);
+ perSegmentFingerprintCache.put(ctx.reader().getCombinedCoreAndDeletesKey(), f);
}
} else {
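
The cache-key switch above matters because an in-place docValues update, like a delete,
changes a segment's deletes/docValues generation without changing its core. A hedged
illustration (the two LeafReaderContexts are assumed to wrap the same segment, observed
before and after a docValues update):

    import org.apache.lucene.index.LeafReaderContext;

    class FingerprintKeyExample {
      static void illustrate(LeafReaderContext before, LeafReaderContext after) {
        // the same underlying segment core is shared across the update...
        assert before.reader().getCoreCacheKey() == after.reader().getCoreCacheKey();
        // ...but the combined key differs, so a stale fingerprint is not reused
        assert before.reader().getCombinedCoreAndDeletesKey()
            != after.reader().getCombinedCoreAndDeletesKey();
      }
    }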
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 39e5b8a..11235e1 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReaderContext;
@@ -43,8 +44,10 @@ import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentBase;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.StringUtils;
import org.apache.solr.common.cloud.ClusterState;
@@ -75,11 +78,11 @@ import org.apache.solr.update.DocumentBuilder;
import org.apache.solr.update.IndexFingerprint;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class RealTimeGetComponent extends SearchComponent
{
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -148,6 +151,12 @@ public class RealTimeGetComponent extends SearchComponent
processGetUpdates(rb);
return;
}
+
+ val = params.get("getInputDocument");
+ if (val != null) {
+ processGetInputDocument(rb);
+ return;
+ }
final IdsRequsted reqIds = IdsRequsted.parseParams(req);
@@ -176,14 +185,14 @@ public class RealTimeGetComponent extends SearchComponent
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
- SolrCore core = req.getCore();
+ final SolrCore core = req.getCore();
SchemaField idField = core.getLatestSchema().getUniqueKeyField();
FieldType fieldType = idField.getType();
SolrDocumentList docList = new SolrDocumentList();
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- RefCounted<SolrIndexSearcher> searcherHolder = null;
+ SearcherInfo searcherInfo = new SearcherInfo(core);
// this is initialized & set on the context *after* any searcher (re-)opening
ResultContext resultContext = null;
@@ -197,7 +206,7 @@ public class RealTimeGetComponent extends SearchComponent
|| ((null != transformer) && transformer.needsSolrIndexSearcher());
try {
- SolrIndexSearcher searcher = null;
+
BytesRefBuilder idBytes = new BytesRefBuilder();
for (String idStr : reqIds.allIds) {
@@ -208,24 +217,30 @@ public class RealTimeGetComponent extends SearchComponent
// should currently be a List<Oper,Ver,Doc/Id>
List entry = (List)o;
assert entry.size() >= 3;
- int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
+ int oper = (Integer)entry.get(UpdateLog.FLAGS_IDX) & UpdateLog.OPERATION_MASK;
switch (oper) {
+ case UpdateLog.UPDATE_INPLACE: // fall through to ADD
case UpdateLog.ADD:
if (mustUseRealtimeSearcher) {
- if (searcherHolder != null) {
- // close handles to current searchers & result context
- searcher = null;
- searcherHolder.decref();
- searcherHolder = null;
- resultContext = null;
- }
+ // close handles to current searchers & result context
+ searcherInfo.clear();
+ resultContext = null;
ulog.openRealtimeSearcher(); // force open a new realtime searcher
o = null; // pretend we never found this record and fall through to use the searcher
break;
}
- SolrDocument doc = toSolrDoc((SolrInputDocument)entry.get(entry.size()-1), core.getLatestSchema());
+ SolrDocument doc = toSolrDoc((SolrInputDocument)entry.get(entry.size()-1), core.getLatestSchema(), oper == UpdateLog.UPDATE_INPLACE);
+ if (oper == UpdateLog.UPDATE_INPLACE) {
+ assert entry.size() == 5;
+              // For the in-place update case, we have so far obtained only the partial document. We need to
+              // resolve it to a full document to be returned to the user.
+ doc = (SolrDocument) resolveFullDocument(core, idBytes.get(), rsp.getReturnFields(), doc, entry);
+ if (doc == null) {
+ break;
+ }
+ }
if (transformer!=null) {
transformer.transform(doc, -1, 0); // unknown docID
}
@@ -241,23 +256,20 @@ public class RealTimeGetComponent extends SearchComponent
}
// didn't find it in the update log, so it should be in the newest searcher opened
- if (searcher == null) {
- searcherHolder = core.getRealtimeSearcher();
- searcher = searcherHolder.get();
- // don't bother with ResultContext yet, we won't need it if doc doesn't match filters
- }
+ searcherInfo.init();
+ // don't bother with ResultContext yet, we won't need it if doc doesn't match filters
int docid = -1;
- long segAndId = searcher.lookupId(idBytes.get());
+ long segAndId = searcherInfo.getSearcher().lookupId(idBytes.get());
if (segAndId >= 0) {
int segid = (int) segAndId;
- LeafReaderContext ctx = searcher.getTopReaderContext().leaves().get((int) (segAndId >> 32));
+ LeafReaderContext ctx = searcherInfo.getSearcher().getTopReaderContext().leaves().get((int) (segAndId >> 32));
docid = segid + ctx.docBase;
if (rb.getFilters() != null) {
for (Query raw : rb.getFilters()) {
- Query q = raw.rewrite(searcher.getIndexReader());
- Scorer scorer = searcher.createWeight(q, false, 1f).scorer(ctx);
+ Query q = raw.rewrite(searcherInfo.getSearcher().getIndexReader());
+ Scorer scorer = searcherInfo.getSearcher().createWeight(q, false, 1f).scorer(ctx);
if (scorer == null || segid != scorer.iterator().advance(segid)) {
// filter doesn't match.
docid = -1;
@@ -269,13 +281,13 @@ public class RealTimeGetComponent extends SearchComponent
if (docid < 0) continue;
- Document luceneDocument = searcher.doc(docid, rsp.getReturnFields().getLuceneFieldNames());
+ Document luceneDocument = searcherInfo.getSearcher().doc(docid, rsp.getReturnFields().getLuceneFieldNames());
SolrDocument doc = toSolrDoc(luceneDocument, core.getLatestSchema());
- searcher.decorateDocValueFields(doc, docid, searcher.getNonStoredDVs(true));
+ searcherInfo.getSearcher().decorateDocValueFields(doc, docid, searcherInfo.getSearcher().getNonStoredDVs(true));
if ( null != transformer) {
if (null == resultContext) {
// either first pass, or we've re-opened searcher - either way now we setContext
- resultContext = new RTGResultContext(rsp.getReturnFields(), searcher, req);
+ resultContext = new RTGResultContext(rsp.getReturnFields(), searcherInfo.getSearcher(), req);
transformer.setContext(resultContext);
}
transformer.transform(doc, docid, 0);
@@ -284,22 +296,200 @@ public class RealTimeGetComponent extends SearchComponent
}
} finally {
- if (searcherHolder != null) {
- searcherHolder.decref();
- }
+ searcherInfo.clear();
}
addDocListToResponse(rb, docList);
}
+
+ /**
+ * Return the requested SolrInputDocument from the tlog/index. This will
+ * always be a full document, i.e. any partial in-place document will be resolved.
+ */
+ void processGetInputDocument(ResponseBuilder rb) throws IOException {
+ SolrQueryRequest req = rb.req;
+ SolrQueryResponse rsp = rb.rsp;
+ SolrParams params = req.getParams();
+ if (!params.getBool(COMPONENT_NAME, true)) {
+ return;
+ }
+
+ String idStr = params.get("getInputDocument", null);
+ if (idStr == null) return;
+ SolrInputDocument doc = getInputDocument(req.getCore(), new BytesRef(idStr));
+ log.info("getInputDocument called for id="+idStr+", returning: "+doc);
+ rb.rsp.add("inputDocument", doc);
+ }
+
+ /**
+   * A SearcherInfo provides a mechanism for obtaining an RT searcher from
+   * a SolrCore and closing it, while taking care of the RefCounted references.
+ */
+ private static class SearcherInfo {
+ private RefCounted<SolrIndexSearcher> searcherHolder = null;
+ private SolrIndexSearcher searcher = null;
+ final SolrCore core;
+
+ public SearcherInfo(SolrCore core) {
+ this.core = core;
+ }
+
+ void clear(){
+ if (searcherHolder != null) {
+ // close handles to current searchers
+ searcher = null;
+ searcherHolder.decref();
+ searcherHolder = null;
+ }
+ }
+
+ void init(){
+ if (searcher == null) {
+ searcherHolder = core.getRealtimeSearcher();
+ searcher = searcherHolder.get();
+ }
+ }
+
+ public SolrIndexSearcher getSearcher() {
+ assert null != searcher : "init not called!";
+ return searcher;
+ }
+ }
+
+  /**
+ * Given a partial document obtained from the transaction log (e.g. as a result of RTG), resolve to a full document
+ * by populating all the partial updates that were applied on top of that last full document update.
+ *
+ * @return Returns the merged document, i.e. the resolved full document, or null if the document was not found (deleted
+ * after the resolving began)
+ */
+ private static SolrDocumentBase resolveFullDocument(SolrCore core, BytesRef idBytes,
+ ReturnFields returnFields, SolrDocumentBase partialDoc, List logEntry) throws IOException {
+ if (idBytes == null || logEntry.size() != 5) {
+      throw new SolrException(ErrorCode.INVALID_STATE, "Either the id field is not present in the partial document, or the log entry doesn't have a previous version.");
+ }
+ long prevPointer = (long) logEntry.get(UpdateLog.PREV_POINTER_IDX);
+ long prevVersion = (long) logEntry.get(UpdateLog.PREV_VERSION_IDX);
+
+ // get the last full document from ulog
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ long lastPrevPointer = ulog.applyPartialUpdates(idBytes, prevPointer, prevVersion, partialDoc);
+
+ if (lastPrevPointer == -1) { // full document was not found in tlog, but exists in index
+ SolrDocument mergedDoc = mergePartialDocWithFullDocFromIndex(core, idBytes, returnFields, partialDoc);
+ return mergedDoc;
+ } else if (lastPrevPointer > 0) {
+ // We were supposed to have found the last full doc also in the tlogs, but the prevPointer links led to nowhere
+ // We should reopen a new RT searcher and get the doc. This should be a rare occurrence
+ Term idTerm = new Term(core.getLatestSchema().getUniqueKeyField().getName(), idBytes);
+ SolrDocument mergedDoc = reopenRealtimeSearcherAndGet(core, idTerm, returnFields);
+ if (mergedDoc == null) {
+ return null; // the document may have been deleted as the resolving was going on.
+ }
+ return mergedDoc;
+ } else { // i.e. lastPrevPointer==0
+ assert lastPrevPointer == 0;
+ // We have successfully resolved the document based off the tlogs
+ return partialDoc;
+ }
+ }
+
+ /**
+ * Re-open the RT searcher and get the document, referred to by the idTerm, from that searcher.
+ * @return Returns the document or null if not found.
+ */
+ private static SolrDocument reopenRealtimeSearcherAndGet(SolrCore core, Term idTerm, ReturnFields returnFields) throws IOException {
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ ulog.openRealtimeSearcher();
+ RefCounted<SolrIndexSearcher> searcherHolder = core.getRealtimeSearcher();
+ try {
+ SolrIndexSearcher searcher = searcherHolder.get();
+
+ int docid = searcher.getFirstMatch(idTerm);
+ if (docid < 0) {
+ return null;
+ }
+ Document luceneDocument = searcher.doc(docid, returnFields.getLuceneFieldNames());
+ SolrDocument doc = toSolrDoc(luceneDocument, core.getLatestSchema());
+ searcher.decorateDocValueFields(doc, docid, searcher.getNonStoredDVs(false));
+
+ return doc;
+ } finally {
+ searcherHolder.decref();
+ }
+ }
+
+ /**
+ * Gets a document from the index by id. If a non-null partial document (for in-place update) is passed in,
+ * this method obtains the document from the tlog/index by the given id, merges the partial document on top of it and then returns
+ * the resultant document.
+ *
+ * @param core A SolrCore instance, useful for obtaining a realtimesearcher and the schema
+ * @param idBytes Binary representation of the value of the unique key field
+ * @param returnFields Return fields, as requested
+ * @param partialDoc A partial document (containing an in-place update) used for merging against a full document
+   *                   from index; this may be null.
+ * @return If partial document is null, this returns document from the index or null if not found.
+ * If partial document is not null, this returns a document from index merged with the partial document, or null if
+ * document doesn't exist in the index.
+ */
+ private static SolrDocument mergePartialDocWithFullDocFromIndex(SolrCore core, BytesRef idBytes, ReturnFields returnFields,
+ SolrDocumentBase partialDoc) throws IOException {
+ RefCounted<SolrIndexSearcher> searcherHolder = core.getRealtimeSearcher(); //Searcher();
+ try {
+ // now fetch last document from index, and merge partialDoc on top of it
+ SolrIndexSearcher searcher = searcherHolder.get();
+ SchemaField idField = core.getLatestSchema().getUniqueKeyField();
+ Term idTerm = new Term(idField.getName(), idBytes);
+
+ int docid = searcher.getFirstMatch(idTerm);
+ if (docid < 0) {
+ // The document was not found in index! Reopen a new RT searcher (to be sure) and get again.
+ // This should be because the document was deleted recently.
+ SolrDocument doc = reopenRealtimeSearcherAndGet(core, idTerm, returnFields);
+ if (doc == null) {
+ // Unable to resolve the last full doc in tlog fully,
+ // and document not found in index even after opening new rt searcher.
+ // This must be a case of deleted doc
+ return null;
+ }
+ return doc;
+ }
+
+ Document luceneDocument = searcher.doc(docid, returnFields.getLuceneFieldNames());
+ SolrDocument doc = toSolrDoc(luceneDocument, core.getLatestSchema());
+ searcher.decorateDocValueFields(doc, docid, searcher.getNonStoredDVs(false));
+
+ long docVersion = (long) doc.getFirstValue(DistributedUpdateProcessor.VERSION_FIELD);
+ Object partialVersionObj = partialDoc.getFieldValue(DistributedUpdateProcessor.VERSION_FIELD);
+ long partialDocVersion = partialVersionObj instanceof Field? ((Field) partialVersionObj).numericValue().longValue():
+ partialVersionObj instanceof Number? ((Number) partialVersionObj).longValue(): Long.parseLong(partialVersionObj.toString());
+ if (docVersion > partialDocVersion) {
+ return doc;
+ }
+ for (String fieldName: (Iterable<String>) partialDoc.getFieldNames()) {
+ doc.setField(fieldName.toString(), partialDoc.get(fieldName)); // since partial doc will only contain single valued fields, this is fine
+ }
+
+ return doc;
+ } finally {
+ if (searcherHolder != null) {
+ searcherHolder.decref();
+ }
+ }
+ }
public static SolrInputDocument DELETED = new SolrInputDocument();
/** returns the SolrInputDocument from the current tlog, or DELETED if it has been deleted, or
* null if there is no record of it in the current update log. If null is returned, it could
* still be in the latest index.
+ * @param resolveFullDocument In case the document is fetched from the tlog, it could only be a partial document if the last update
+ * was an in-place update. In that case, should this partial document be resolved to a full document (by following
+ * back prevPointer/prevVersion)?
*/
- public static SolrInputDocument getInputDocumentFromTlog(SolrCore core, BytesRef idBytes) {
+ public static SolrInputDocument getInputDocumentFromTlog(SolrCore core, BytesRef idBytes, boolean resolveFullDocument) {
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
@@ -311,8 +501,27 @@ public class RealTimeGetComponent extends SearchComponent
assert entry.size() >= 3;
int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
switch (oper) {
+ case UpdateLog.UPDATE_INPLACE:
+ assert entry.size() == 5;
+
+ if (resolveFullDocument) {
+ SolrInputDocument doc = (SolrInputDocument)entry.get(entry.size()-1);
+ try {
+            // For the in-place update case, we have so far obtained only the partial document. We need to
+            // resolve it to a full document to be returned to the user.
+ doc = (SolrInputDocument) resolveFullDocument(core, idBytes, new SolrReturnFields(), doc, entry);
+ if (doc == null) {
+ return DELETED;
+ }
+ return doc;
+ } catch (IOException ex) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error while resolving full document. ", ex);
+ }
+ } else {
+ // fall through to ADD, so as to get only the partial document
+ }
case UpdateLog.ADD:
- return (SolrInputDocument)entry.get(entry.size()-1);
+ return (SolrInputDocument) entry.get(entry.size()-1);
case UpdateLog.DELETE:
return DELETED;
default:
@@ -324,12 +533,39 @@ public class RealTimeGetComponent extends SearchComponent
return null;
}
+ /**
+ * Obtains the latest document for a given id from the tlog or index (if not found in the tlog).
+ *
+ * NOTE: This method uses the effective value for avoidRetrievingStoredFields param as false and
+   * for nonStoredDVs as null in the call to {@link RealTimeGetComponent#getInputDocument(SolrCore, BytesRef, boolean, Set, boolean)},
+ * so as to retrieve all stored and non-stored DV fields from all documents. Also, it uses the effective value of
+ * resolveFullDocument param as true, i.e. it resolves any partial documents (in-place updates), in case the
+ * document is fetched from the tlog, to a full document.
+ */
public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes) throws IOException {
+    return getInputDocument(core, idBytes, false, null, true);
+ }
+
+ /**
+ * Obtains the latest document for a given id from the tlog or through the realtime searcher (if not found in the tlog).
+ * @param avoidRetrievingStoredFields Setting this to true avoids fetching stored fields through the realtime searcher,
+ * however has no effect on documents obtained from the tlog.
+ * Non-stored docValues fields are populated anyway, and are not affected by this parameter. Note that if
+ * the id field is a stored field, it will not be populated if this parameter is true and the document is
+ * obtained from the index.
+ * @param onlyTheseNonStoredDVs If not-null, populate only these DV fields in the document fetched through the realtime searcher.
+ * If this is null, decorate all non-stored DVs (that are not targets of copy fields) from the searcher.
+ * @param resolveFullDocument In case the document is fetched from the tlog, it could only be a partial document if the last update
+ * was an in-place update. In that case, should this partial document be resolved to a full document (by following
+ * back prevPointer/prevVersion)?
+ */
+ public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes, boolean avoidRetrievingStoredFields,
+ Set<String> onlyTheseNonStoredDVs, boolean resolveFullDocument) throws IOException {
SolrInputDocument sid = null;
RefCounted<SolrIndexSearcher> searcherHolder = null;
try {
SolrIndexSearcher searcher = null;
- sid = getInputDocumentFromTlog(core, idBytes);
+ sid = getInputDocumentFromTlog(core, idBytes, resolveFullDocument);
if (sid == DELETED) {
return null;
}
@@ -346,9 +582,18 @@ public class RealTimeGetComponent extends SearchComponent
int docid = searcher.getFirstMatch(new Term(idField.getName(), idBytes));
if (docid < 0) return null;
- Document luceneDocument = searcher.doc(docid);
- sid = toSolrInputDocument(luceneDocument, core.getLatestSchema());
- searcher.decorateDocValueFields(sid, docid, searcher.getNonStoredDVsWithoutCopyTargets());
+
+ if (avoidRetrievingStoredFields) {
+ sid = new SolrInputDocument();
+ } else {
+ Document luceneDocument = searcher.doc(docid);
+ sid = toSolrInputDocument(luceneDocument, core.getLatestSchema());
+ }
+ if (onlyTheseNonStoredDVs != null) {
+ searcher.decorateDocValueFields(sid, docid, onlyTheseNonStoredDVs);
+ } else {
+ searcher.decorateDocValueFields(sid, docid, searcher.getNonStoredDVsWithoutCopyTargets());
+ }
}
} finally {
if (searcherHolder != null) {
@@ -409,9 +654,13 @@ public class RealTimeGetComponent extends SearchComponent
return out;
}
- private static SolrDocument toSolrDoc(SolrInputDocument sdoc, IndexSchema schema) {
+ /**
+ * Converts a SolrInputDocument to SolrDocument, using an IndexSchema instance.
+ * @lucene.experimental
+ */
+ public static SolrDocument toSolrDoc(SolrInputDocument sdoc, IndexSchema schema, boolean isThisAPartialDoc) {
// TODO: do something more performant than this double conversion
- Document doc = DocumentBuilder.toDocument(sdoc, schema);
+ Document doc = DocumentBuilder.toDocument(sdoc, schema, isThisAPartialDoc);
// copy the stored fields only
Document out = new Document();
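
A hedged sketch of the new getInputDocument overload in action, fetching only the
docValues fields needed to apply an in-place update (the field names are illustrative):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.lucene.util.BytesRef;
    import org.apache.solr.common.SolrInputDocument;
    import org.apache.solr.core.SolrCore;
    import org.apache.solr.handler.component.RealTimeGetComponent;

    class RtgFetchExample {
      static SolrInputDocument fetchForInPlaceUpdate(SolrCore core, String id) throws IOException {
        Set<String> dvFields = new HashSet<>(Arrays.asList("ratings_i_dvo", "_version_"));
        return RealTimeGetComponent.getInputDocument(
            core, new BytesRef(id),
            true,      // avoidRetrievingStoredFields: skip stored fields entirely
            dvFields,  // onlyTheseNonStoredDVs: decorate just these DV fields
            true);     // resolveFullDocument: follow prevPointer/prevVersion back
      }
    }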
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
index 377cb6b..db1d79b 100644
--- a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
@@ -39,10 +39,20 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
// it will be obtained from the doc.
private BytesRef indexedId;
- // Higher level SolrInputDocument, normally used to construct the Lucene Document
- // to index.
+ /**
+ * Higher level SolrInputDocument, normally used to construct the Lucene Document
+ * to index.
+ */
public SolrInputDocument solrDoc;
+ /**
+   * This is the version of a previously indexed document on which the current
+   * update depends. This version could be that of a previous in-place update
+ * or a full update. A negative value here, e.g. -1, indicates that this add
+ * update does not depend on a previous update.
+ */
+ public long prevVersion = -1;
+
public boolean overwrite = true;
public Term updateTerm;
@@ -76,10 +86,19 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
}
/** Creates and returns a lucene Document to index. Any changes made to the returned Document
- * will not be reflected in the SolrInputDocument, or future calls to this method.
+ * will not be reflected in the SolrInputDocument, or future calls to this method. This defaults
+ * to false for the inPlaceUpdate parameter of {@link #getLuceneDocument(boolean)}.
*/
public Document getLuceneDocument() {
- return DocumentBuilder.toDocument(getSolrInputDocument(), req.getSchema());
+ return getLuceneDocument(false);
+ }
+
+ /** Creates and returns a lucene Document to index. Any changes made to the returned Document
+ * will not be reflected in the SolrInputDocument, or future calls to this method.
+ * @param inPlaceUpdate Whether this document will be used for in-place updates.
+ */
+ public Document getLuceneDocument(boolean inPlaceUpdate) {
+ return DocumentBuilder.toDocument(getSolrInputDocument(), req.getSchema(), inPlaceUpdate);
}
/** Returns the indexed ID for this document. The returned BytesRef is retained across multiple calls, and should not be modified. */
@@ -212,7 +231,6 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
unwrappedDocs.add(currentDoc);
}
-
@Override
public String toString() {
StringBuilder sb = new StringBuilder(super.toString());
@@ -223,5 +241,11 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
return sb.toString();
}
-
+ /**
+ * Is this add update an in-place update? An in-place update is one where only docValues are
+   * updated, and a new document is not indexed.
+ */
+ public boolean isInPlaceUpdate() {
+ return (prevVersion >= 0);
+ }
}
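
A small hedged illustration of the prevVersion contract (req and mergedDoc are assumed
to be a live request and an already-merged input document):

    import org.apache.solr.common.SolrInputDocument;
    import org.apache.solr.request.SolrQueryRequest;
    import org.apache.solr.update.AddUpdateCommand;

    class PrevVersionContract {
      static void illustrate(SolrQueryRequest req, SolrInputDocument mergedDoc) {
        AddUpdateCommand cmd = new AddUpdateCommand(req);
        cmd.solrDoc = mergedDoc;
        cmd.prevVersion = 1234567890123456789L; // version this update depends on
        assert cmd.isInPlaceUpdate();           // true: prevVersion >= 0

        AddUpdateCommand fullAdd = new AddUpdateCommand(req);
        fullAdd.solrDoc = mergedDoc;
        assert !fullAdd.isInPlaceUpdate();      // prevVersion defaults to -1
      }
    }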
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 73731ad..a28fe29 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -27,9 +27,11 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.LongAdder;
import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SlowCodecReaderWrapper;
import org.apache.lucene.index.Term;
@@ -274,9 +276,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
if (cmd.isBlock()) {
writer.updateDocuments(updateTerm, cmd);
} else {
- Document luceneDocument = cmd.getLuceneDocument();
- // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
- writer.updateDocument(updateTerm, luceneDocument);
+ updateDocOrDocValues(cmd, writer, updateTerm);
}
// SolrCore.verbose("updateDocument",updateTerm,"DONE");
@@ -331,7 +331,8 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// see comment in deleteByQuery
synchronized (solrCoreState.getUpdateLock()) {
- writer.updateDocument(idTerm, luceneDocument);
+ updateDocOrDocValues(cmd, writer, idTerm);
+
for (Query q : dbqList) {
writer.deleteDocuments(new DeleteByQueryWrapper(q, core.getLatestSchema()));
}
@@ -450,6 +451,19 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// as we use around ulog.preCommit... also see comments in ulog.postSoftCommit)
//
synchronized (solrCoreState.getUpdateLock()) {
+
+ // nocommit: this line is very innocuous and easy to over look
+ // nocommit: if the purpose of this line is to work around LUCENE-7344 there should be a comment
+ // nocommit: ...otherwise someone might not realize why it's here and try to remove it
+ // nocommit: ...likewise if/when LUCENE-7344 is fixed no one will realize it can be removed
+ //
+ //
+ // nocommit: alternatively: if that's not the reason for this line, then what is?
+ //
+ // nocommit: LUCENE-7344
+ if (ulog != null) ulog.openRealtimeSearcher();
+ // nocommit: END
+
if (delAll) {
deleteAll();
} else {
@@ -830,6 +844,42 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
splitter.split();
}
+ /**
+ * Calls either {@link IndexWriter#updateDocValues} or {@link IndexWriter#updateDocument} as
+ * needed based on {@link AddUpdateCommand#isInPlaceUpdate}.
+ * <p>
+   * If this is an UPDATE_INPLACE cmd, then all fields included in
+   * {@link AddUpdateCommand#getLuceneDocument} must either be the uniqueKey field, or be
+   * docValues-only fields.
+ * </p>
+ *
+   * @param cmd - cmd to apply to the IndexWriter
+ * @param writer - IndexWriter to use
+ * @param updateTerm - used if this cmd results in calling {@link IndexWriter#updateDocument}
+ */
+ private void updateDocOrDocValues(AddUpdateCommand cmd, IndexWriter writer, Term updateTerm) throws IOException {
+ assert null != cmd;
+ final SchemaField uniqueKeyField = cmd.req.getSchema().getUniqueKeyField();
+ final String uniqueKeyFieldName = null == uniqueKeyField ? null : uniqueKeyField.getName();
+
+ if (cmd.isInPlaceUpdate()) {
+ Document luceneDocument = cmd.getLuceneDocument(true);
+
+ final List<IndexableField> origDocFields = luceneDocument.getFields();
+ final List<Field> fieldsToUpdate = new ArrayList<>(origDocFields.size());
+ for (IndexableField field : origDocFields) {
+ if (! field.name().equals(uniqueKeyFieldName) ) {
+ fieldsToUpdate.add((Field)field);
+ }
+ }
+ writer.updateDocValues(updateTerm, fieldsToUpdate.toArray(new Field[fieldsToUpdate.size()]));
+ } else {
+ Document luceneDocument = cmd.getLuceneDocument(false);
+ writer.updateDocument(updateTerm, luceneDocument);
+ }
+ }
+
+
/////////////////////////////////////////////////////////////////////
// SolrInfoMBean stuff: Statistics and Module Info
/////////////////////////////////////////////////////////////////////
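
A hedged sketch of the two raw Lucene calls that updateDocOrDocValues chooses between
(field names and values are illustrative):

    import java.io.IOException;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.NumericDocValuesField;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.Term;

    class UpdatePathsExample {
      static void illustrate(IndexWriter writer) throws IOException {
        Term id = new Term("id", "1");

        // in-place path: only the named docValues change, nothing is re-indexed;
        // the uniqueKey field is excluded, mirroring the loop in the code above
        writer.updateDocValues(id,
            new NumericDocValuesField("ratings_i_dvo", 5L),
            new NumericDocValuesField("_version_", 1234567890123456789L));

        // full path: the entire document is replaced
        Document doc = new Document();
        doc.add(new StringField("id", "1", Field.Store.YES));
        doc.add(new NumericDocValuesField("ratings_i_dvo", 5L));
        writer.updateDocument(id, doc);
      }
    }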
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java b/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
index 633a6dc..d4468f9 100644
--- a/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
+++ b/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
@@ -21,6 +21,7 @@ import java.util.Set;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
+import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.IndexableField;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@@ -37,15 +38,46 @@ import com.google.common.collect.Sets;
*/
public class DocumentBuilder {
- private static void addField(Document doc, SchemaField field, Object val, float boost) {
+ /**
+ * Add a field value to a given document.
+ * @param doc Document that the field needs to be added to
+ * @param field The schema field object for the field
+ * @param val The value for the field to be added
+ * @param boost Boost value for the field
+ * @param forInPlaceUpdate Whether the field is to be added for in-place update. If true,
+ * only numeric docValues based fields are added to the document. This can be true
+   *        when constructing a Lucene document for writing an in-place update, where the presence
+   *        of non-updatable (non-NDV) fields is not needed.
+ */
+ private static void addField(Document doc, SchemaField field, Object val, float boost,
+ boolean forInPlaceUpdate) {
if (val instanceof IndexableField) {
+ if (forInPlaceUpdate) {
+ assert val instanceof NumericDocValuesField: "Expected in-place update to be done on"
+ + " NDV fields only.";
+ }
// set boost to the calculated compound boost
((Field)val).setBoost(boost);
doc.add((Field)val);
return;
}
for (IndexableField f : field.getType().createFields(field, val, boost)) {
- if (f != null) doc.add((Field) f); // null fields are not added
+ if (f != null) { // null fields are not added
+ // HACK: workaround for SOLR-9809
+ // even though at this point in the code we know the field is single valued and DV only
+      // TrieField.createFields() may still return (useless) IndexableField instances that are not
+ // NumericDocValuesField instances.
+ //
+ // once SOLR-9809 is resolved, we should be able to replace this conditional with...
+ // assert f instanceof NumericDocValuesField
+ if (forInPlaceUpdate) {
+ if (f instanceof NumericDocValuesField) {
+ doc.add((Field) f);
+ }
+ } else {
+ doc.add((Field) f);
+ }
+ }
}
}
@@ -60,6 +92,14 @@ public class DocumentBuilder {
}
/**
+ * @see DocumentBuilder#toDocument(SolrInputDocument, IndexSchema, boolean)
+ */
+ public static Document toDocument( SolrInputDocument doc, IndexSchema schema )
+ {
+ return toDocument(doc, schema, false);
+ }
+
+ /**
* Convert a SolrInputDocument to a lucene Document.
*
* This function should go elsewhere. This builds the Document without an
@@ -72,9 +112,18 @@ public class DocumentBuilder {
* moved to an independent function
*
* @since solr 1.3
+ *
+ * @param doc SolrInputDocument from which the document has to be built
+ * @param schema Schema instance
+ * @param forInPlaceUpdate Whether the output document would be used for an in-place update or not.
+   * @return Built Lucene document
*/
- public static Document toDocument( SolrInputDocument doc, IndexSchema schema )
- {
+ public static Document toDocument( SolrInputDocument doc, IndexSchema schema, boolean forInPlaceUpdate )
+ {
+ final SchemaField uniqueKeyField = schema.getUniqueKeyField();
+ final String uniqueKeyFieldName = null == uniqueKeyField ? null : uniqueKeyField.getName();
+
Document out = new Document();
final float docBoost = doc.getDocumentBoost();
Set<String> usedFields = Sets.newHashSet();
@@ -84,7 +133,6 @@ public class DocumentBuilder {
String name = field.getName();
SchemaField sfield = schema.getFieldOrNull(name);
boolean used = false;
-
// Make sure it has the correct number
if( sfield!=null && !sfield.multiValued() && field.getValueCount() > 1 ) {
@@ -119,45 +167,51 @@ public class DocumentBuilder {
hasField = true;
if (sfield != null) {
used = true;
- addField(out, sfield, v, applyBoost ? compoundBoost : 1f);
+ addField(out, sfield, v, applyBoost ? compoundBoost : 1f,
+ name.equals(uniqueKeyFieldName) ? false : forInPlaceUpdate);
// record the field as having a value
usedFields.add(sfield.getName());
}
// Check if we should copy this field value to any other fields.
// This could happen whether it is explicit or not.
- if( copyFields != null ){
- for (CopyField cf : copyFields) {
- SchemaField destinationField = cf.getDestination();
-
- final boolean destHasValues = usedFields.contains(destinationField.getName());
-
- // check if the copy field is a multivalued or not
- if (!destinationField.multiValued() && destHasValues) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "ERROR: "+getID(doc, schema)+"multiple values encountered for non multiValued copy field " +
- destinationField.getName() + ": " + v);
- }
-
- used = true;
-
- // Perhaps trim the length of a copy field
- Object val = v;
- if( val instanceof String && cf.getMaxChars() > 0 ) {
- val = cf.getLimitedValue((String)val);
+ if (copyFields != null) {
+ // Do not copy this field if this document is to be used for an in-place update,
+ // and this is the uniqueKey field (because the uniqueKey can't change so no need to "update" the copyField).
+ if ( ! (forInPlaceUpdate && name.equals(uniqueKeyFieldName)) ) {
+ for (CopyField cf : copyFields) {
+ SchemaField destinationField = cf.getDestination();
+
+ final boolean destHasValues = usedFields.contains(destinationField.getName());
+
+ // check if the copy field is a multivalued or not
+ if (!destinationField.multiValued() && destHasValues) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "ERROR: "+getID(doc, schema)+"multiple values encountered for non multiValued copy field " +
+ destinationField.getName() + ": " + v);
+ }
+
+ used = true;
+
+ // Perhaps trim the length of a copy field
+ Object val = v;
+ if( val instanceof String && cf.getMaxChars() > 0 ) {
+ val = cf.getLimitedValue((String)val);
+ }
+
+ // we can't copy any boost unless the dest field is
+ // indexed & !omitNorms, but which boost we copy depends
+ // on whether the dest field already contains values (we
+ // don't want to apply the compounded docBoost more then once)
+ final float destBoost =
+ (destinationField.indexed() && !destinationField.omitNorms()) ?
+ (destHasValues ? fieldBoost : compoundBoost) : 1.0F;
+
+ addField(out, destinationField, val, destBoost,
+ destinationField.getName().equals(uniqueKeyFieldName) ? false : forInPlaceUpdate);
+ // record the field as having a value
+ usedFields.add(destinationField.getName());
}
-
- // we can't copy any boost unless the dest field is
- // indexed & !omitNorms, but which boost we copy depends
- // on whether the dest field already contains values (we
- // don't want to apply the compounded docBoost more then once)
- final float destBoost =
- (destinationField.indexed() && !destinationField.omitNorms()) ?
- (destHasValues ? fieldBoost : compoundBoost) : 1.0F;
-
- addField(out, destinationField, val, destBoost);
- // record the field as having a value
- usedFields.add(destinationField.getName());
}
}
@@ -187,14 +241,20 @@ public class DocumentBuilder {
// Now validate required fields or add default values
// fields with default values are defacto 'required'
- for (SchemaField field : schema.getRequiredFields()) {
- if (out.getField(field.getName() ) == null) {
- if (field.getDefaultValue() != null) {
- addField(out, field, field.getDefaultValue(), 1.0f);
- }
- else {
- String msg = getID(doc, schema) + "missing required field: " + field.getName();
- throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, msg );
+
+ // Note: We don't need to add default fields if this document is to be used for
+ // in-place updates, since this validation and population of default fields would've happened
+ // during the full indexing initially.
+ if (!forInPlaceUpdate) {
+ for (SchemaField field : schema.getRequiredFields()) {
+ if (out.getField(field.getName() ) == null) {
+ if (field.getDefaultValue() != null) {
+ addField(out, field, field.getDefaultValue(), 1.0f, false);
+ }
+ else {
+ String msg = getID(doc, schema) + "missing required field: " + field.getName();
+ throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, msg );
+ }
}
}
}
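
A hedged sketch contrasting the two toDocument modes (mergedDoc and schema are assumed
inputs; the in-place result contains only the uniqueKey plus NumericDocValuesField
instances, and skips required-field validation and default values):

    import org.apache.lucene.document.Document;
    import org.apache.solr.common.SolrInputDocument;
    import org.apache.solr.schema.IndexSchema;
    import org.apache.solr.update.DocumentBuilder;

    class ToDocumentModes {
      static void illustrate(SolrInputDocument mergedDoc, IndexSchema schema) {
        // minimal document for an in-place docValues update
        Document forDvUpdate = DocumentBuilder.toDocument(mergedDoc, schema, true);

        // unchanged legacy behavior, equivalent to the old two-argument signature
        Document full = DocumentBuilder.toDocument(mergedDoc, schema);
      }
    }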
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/update/PeerSync.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index 1f61a56..828085e 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -788,6 +788,16 @@ public class PeerSync {
proc.processDelete(cmd);
break;
}
+ case UpdateLog.UPDATE_INPLACE:
+ {
+ AddUpdateCommand cmd = UpdateLog.convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
+ cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+ if (debug) {
+ log.debug(msg() + "inplace update " + cmd + " prevVersion=" + cmd.prevVersion + ", doc=" + cmd.solrDoc);
+ }
+ proc.processAdd(cmd);
+ break;
+ }
default:
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index c161b82..5caf43e 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.Diagnostics;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.RequestReplicationTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -206,6 +207,9 @@ public class SolrCmdDistributor {
uReq.lastDocInBatch();
uReq.setParams(params);
uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+ if (cmd.isInPlaceUpdate()) {
+ params.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
+ }
submit(new Req(cmd, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
}
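
A hedged sketch of the receiving side of the parameter set above; a replica can
reconstruct prevVersion from the forwarded request (req and cmd are assumed, and the
constant is assumed to be publicly accessible):

    import org.apache.solr.request.SolrQueryRequest;
    import org.apache.solr.update.AddUpdateCommand;
    import org.apache.solr.update.processor.DistributedUpdateProcessor;

    class PrevVersionParam {
      static void applyForwardedPrevVersion(SolrQueryRequest req, AddUpdateCommand cmd) {
        String prev = req.getParams().get(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION);
        if (prev != null) {
          cmd.prevVersion = Long.parseLong(prev); // marks cmd as an in-place update
        }
      }
    }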
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/update/TransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index 997485a..5037b45 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -342,7 +342,33 @@ public class TransactionLog implements Closeable {
int lastAddSize;
+ /**
+ * Writes an add update command to the transaction log. This is not applicable for
+ * in-place updates; use {@link #write(AddUpdateCommand, long, int)}.
+ * (The previous pointer (applicable for in-place updates) is set to -1 while writing
+ * the command to the transaction log.)
+ * @param cmd The add update command to be written
+ * @param flags Options for writing the command to the transaction log
+ * @return Returns the position pointer of the written update command
+ *
+ * @see #write(AddUpdateCommand, long, int)
+ */
public long write(AddUpdateCommand cmd, int flags) {
+ return write(cmd, -1, flags);
+ }
+
+ /**
+ * Writes an add update command to the transaction log. This should be called only for
+   * writing in-place updates; otherwise pass -1 as the prevPointer.
+ * @param cmd The add update command to be written
+ * @param prevPointer The pointer in the transaction log which this update depends
+ * on (applicable for in-place updates)
+ * @param flags Options for writing the command to the transaction log
+ * @return Returns the position pointer of the written update command
+ */
+ public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
+ assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
+
LogCodec codec = new LogCodec(resolver);
SolrInputDocument sdoc = cmd.getSolrInputDocument();
@@ -355,10 +381,19 @@ public class TransactionLog implements Closeable {
MemOutputStream out = new MemOutputStream(new byte[bufSize]);
codec.init(out);
- codec.writeTag(JavaBinCodec.ARR, 3);
- codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
- codec.writeLong(cmd.getVersion());
- codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+ if (cmd.isInPlaceUpdate()) {
+ codec.writeTag(JavaBinCodec.ARR, 5);
+ codec.writeInt(UpdateLog.UPDATE_INPLACE | flags); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeLong(prevPointer);
+ codec.writeLong(cmd.prevVersion);
+ codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+ } else {
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+ }
lastAddSize = (int)out.size();
synchronized (this) {
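
For reference, the two entry layouts written above, shown as the lists they deserialize
back into (the element indices match the *_IDX constants introduced in UpdateLog below):

    // regular add (3 elements):
    //   [ flags | ADD, version, SolrInputDocument ]
    // in-place update (5 elements):
    //   [ flags | UPDATE_INPLACE, version, prevPointer, prevVersion, SolrInputDocument ]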
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 9c0f1cf..d51b8f0 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
@@ -42,6 +43,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrDocumentBase;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
@@ -104,6 +106,7 @@ public class UpdateLog implements PluginInfoInitialized {
public static final int DELETE = 0x02;
public static final int DELETE_BY_QUERY = 0x03;
public static final int COMMIT = 0x04;
+ public static final int UPDATE_INPLACE = 0x08;
// Flag indicating that this is a buffered operation, and that a gap exists before buffering started.
// for example, if full index replication starts and we are buffering updates, then this flag should
// be set to indicate that replaying the log would not bring us into sync (i.e. peersync should
@@ -111,6 +114,28 @@ public class UpdateLog implements PluginInfoInitialized {
public static final int FLAG_GAP = 0x10;
public static final int OPERATION_MASK = 0x0f; // mask off flags to get the operation
+ /**
+ * The index of the flags value in an entry from the transaction log.
+ */
+ public static final int FLAGS_IDX = 0;
+
+ /**
+ * The index of the _version_ value in an entry from the transaction log.
+ */
+  public static final int VERSION_IDX = 1;
+
+ /**
+ * The index of the previous pointer in an entry from the transaction log.
+ * This is only relevant if flags (indexed at FLAGS_IDX) includes UPDATE_INPLACE.
+ */
+ public static final int PREV_POINTER_IDX = 2;
+
+ /**
+ * The index of the previous version in an entry from the transaction log.
+ * This is only relevant if flags (indexed at FLAGS_IDX) includes UPDATE_INPLACE.
+ */
+ public static final int PREV_VERSION_IDX = 3;
+
public static class RecoveryInfo {
public long positionOfStart;
@@ -189,10 +214,29 @@ public class UpdateLog implements PluginInfoInitialized {
public static class LogPtr {
final long pointer;
final long version;
-
+ final long previousPointer; // used for entries that are in-place updates and need a pointer to a previous update command
+
+ /**
+ * Creates an object that contains the position and version of an update. In this constructor,
+ * the effective value of the previousPointer is -1.
+ *
+ * @param pointer Position in the transaction log of an update
+ * @param version Version of the update at the given position
+ */
public LogPtr(long pointer, long version) {
+ this(pointer, version, -1);
+ }
+
+ /**
+   * Creates an object that contains the position and version of an update, as well as
+   * the position of a previous update on which this update depends.
+   *
+ * @param pointer Position in the transaction log of an update
+ * @param version Version of the update at the given position
+ * @param previousPointer Position, in the transaction log, of an update on which the current update depends
+ */
+ public LogPtr(long pointer, long version, long previousPointer) {
this.pointer = pointer;
this.version = version;
+ this.previousPointer = previousPointer;
}
@Override
@@ -423,16 +467,18 @@ public class UpdateLog implements PluginInfoInitialized {
synchronized (this) {
long pos = -1;
+ long prevPointer = getPrevPointerForUpdate(cmd);
+
// don't log if we are replaying from another log
if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
ensureLog();
- pos = tlog.write(cmd, operationFlags);
+ pos = tlog.write(cmd, prevPointer, operationFlags);
}
if (!clearCaches) {
// TODO: in the future we could support a real position for a REPLAY update.
// Only currently would be useful for RTG while in recovery mode though.
- LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+ LogPtr ptr = new LogPtr(pos, cmd.getVersion(), prevPointer);
// only update our map if we're not buffering
if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
@@ -453,6 +499,31 @@ public class UpdateLog implements PluginInfoInitialized {
}
}
+ /**
+ * @return If cmd is an in-place update, then returns the pointer (in the tlog) of the previous
+ * update that the given update depends on.
+ * Returns -1 if this is not an in-place update, or if we can't find a previous entry in
+   *        the tlog. Upon receiving a -1, the caller can disambiguate: if the command's
+   *        flags include UpdateLog.UPDATE_INPLACE, then this is an in-place update whose
+   *        previous update is in the index and not in the tlog; if that flag is not set, it is
+   *        not an in-place update at all, and the prevPointer value (-1 as a dummy value)
+   *        can be ignored.
+ */
+ private synchronized long getPrevPointerForUpdate(AddUpdateCommand cmd) {
+ // note: sync required to ensure maps aren't changed out form under us
+ if (cmd.isInPlaceUpdate()) {
+ BytesRef indexedId = cmd.getIndexedId();
+ for (Map<BytesRef, LogPtr> currentMap : Arrays.asList(map, prevMap, prevMap2)) {
+ if (currentMap != null) {
+ LogPtr prevEntry = currentMap.get(indexedId);
+ if (null != prevEntry) {
+ return prevEntry.pointer;
+ }
+ }
+ }
+ }
+ return -1;
+ }
public void delete(DeleteUpdateCommand cmd) {
@@ -702,6 +773,129 @@ public class UpdateLog implements PluginInfoInitialized {
}
}
+ /**
+   * Walks backwards through the update log, following prevPointer links, to merge all partial updates into the passed doc. Stops at either a full
+ * document, or if there are no previous entries to follow in the update log.
+ *
+ * @param id Binary representation of the unique key field
+ * @param prevPointer Pointer to the previous entry in the ulog, based on which the current in-place update was made.
+ * @param prevVersion Version of the previous entry in the ulog, based on which the current in-place update was made.
+ * @param latestPartialDoc Partial document that is to be populated
+ * @return Returns 0 if a full document was found in the log, -1 if no full document was found. If full document was supposed
+ * to be found in the tlogs, but couldn't be found (because the logs were rotated) then the prevPointer is returned.
+ */
+ public long applyPartialUpdates(BytesRef id, long prevPointer, long prevVersion, SolrDocumentBase latestPartialDoc) {
+
+ // nocommit: changes that should probably be made (see jira comments for elaboration) ...
+ // 1) "final List<TransactionLog> lookupLogs" should be created once, outside of any looping
+ // 2) "lookupLogs" and any calls to "getEntryFromTLog" that use that List *MUST* happen in the same sync block.
+ // 3) 1+2 => the sync block must wrap the while loop
+ // 4) 3 => we might as well declare the entire method synchronized.
+ //
+ //
+ // nocommit: regardless of any changes, need additional eyeballs (besides ishan & hoss) on the synchronization in this method/class
+
+
+ SolrInputDocument partialUpdateDoc = null;
+
+ while (prevPointer >= 0) {
+ // go through each partial update and apply it to the incoming doc, one after another
+ List entry;
+ synchronized (this) {
+ List<TransactionLog> lookupLogs = Arrays.asList(tlog, prevMapLog, prevMapLog2);
+ entry = getEntryFromTLog(prevPointer, prevVersion, lookupLogs);
+ }
+ if (entry == null) {
+ return prevPointer; // a previous update was supposed to be found, but wasn't found (due to log rotation)
+ }
+ int flags = (int) entry.get(UpdateLog.FLAGS_IDX);
+
+ // since updates can depend only upon ADD updates or other UPDATE_INPLACE updates, we assert that we aren't
+ // getting something else
+ if ((flags & UpdateLog.ADD) != UpdateLog.ADD && (flags & UpdateLog.UPDATE_INPLACE) != UpdateLog.UPDATE_INPLACE) {
+ throw new SolrException(ErrorCode.INVALID_STATE, entry + " should have been either an ADD or an UPDATE_INPLACE update" +
+ ", while looking for id=" + new String(id.bytes, id.offset, id.length, Charset.forName("UTF-8")));
+ }
+ // if this is an ADD (i.e. full document update), stop here
+ if ((flags & UpdateLog.ADD) == UpdateLog.ADD) {
+ partialUpdateDoc = (SolrInputDocument) entry.get(entry.size() - 1);
+ applyOlderUpdates(latestPartialDoc, partialUpdateDoc);
+ return 0; // Full document was found in the tlog itself
+ }
+ if (entry.size() < 5) {
+ throw new SolrException(ErrorCode.INVALID_STATE, entry + " is not a partial doc" +
+ ", while looking for id=" + new String(id.bytes, id.offset, id.length, Charset.forName("UTF-8")));
+ }
+ // This update is an in-place update; get the partial doc. The input doc is always at the last position.
+ partialUpdateDoc = (SolrInputDocument) entry.get(entry.size() - 1);
+ applyOlderUpdates(latestPartialDoc, partialUpdateDoc);
+ prevPointer = (long) entry.get(UpdateLog.PREV_POINTER_IDX);
+ prevVersion = (long) entry.get(UpdateLog.PREV_VERSION_IDX);
+ }
+
+ return -1; // last full document is not supposed to be in tlogs, but it must be in the index
+ }
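+
+ // A usage sketch (hypothetical caller; "prevPointer" and "prevVersion" would come from the
+ // PREV_POINTER_IDX / PREV_VERSION_IDX slots of the latest tlog entry for this id):
+ //
+ //   long result = ulog.applyPartialUpdates(idBytes, prevPointer, prevVersion, doc);
+ //   if (result == 0) {
+ //     // doc was fully resolved from the tlogs alone
+ //   } else if (result == -1) {
+ //     // no full document in the tlogs; doc must be merged onto the document in the index
+ //   } else {
+ //     // result == prevPointer: the required entry was rotated out; fall back to the index
+ //   }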
+
+ /**
+ * Add all fields from olderDoc into newerDoc if not already present in newerDoc
+ */
+ private void applyOlderUpdates(SolrDocumentBase newerDoc, SolrInputDocument olderDoc) {
+ for (String fieldName : olderDoc.getFieldNames()) {
+ // if the newerDoc has this field, then this field from olderDoc can be ignored
+ if (!newerDoc.containsKey(fieldName)) {
+ for (Object val : olderDoc.getFieldValues(fieldName)) {
+ newerDoc.addField(fieldName, val);
+ }
+ }
+ }
+ }
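+
+ // For example (hypothetical values): if newerDoc contains {id=1, price_i_dvo=10} and olderDoc
+ // contains {id=1, price_i_dvo=5, stock_i_dvo=7}, then after this call newerDoc contains
+ // {id=1, price_i_dvo=10, stock_i_dvo=7}: the newer value wins, and only missing fields are copied.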
+
+
+ /**
+ * Get the entry that has the given lookupVersion in the given lookupLogs at the lookupPointer position.
+ *
+ * @return The entry if found, otherwise null
+ */
+ private List getEntryFromTLog(long lookupPointer, long lookupVersion, List<TransactionLog> lookupLogs) {
+ // nocommit: fairly certain this method should be "synchronized"...
+ // nocommit: isn't the only valid usage of this method when the calling thread holds a lock on "this"?
+ // nocommit: otherwise some other thread might be decrefing/closing the TransactionLog instances passed to this method
+ //
+ // should have no impact on performance/correctness since the only existing usages of this
+ // method are already in "synchronized (this)" blocks ... but we should protect against future misuse
+
+ for (TransactionLog lookupLog : lookupLogs) {
+ if (lookupLog != null && lookupLog.getLogSize() > lookupPointer) {
+ lookupLog.incref();
+ try {
+ Object obj = null;
+
+ try {
+ obj = lookupLog.lookup(lookupPointer);
+ } catch (Exception | Error ex) {
+ // This can happen when trying to deserialize the entry at position lookupPointer,
+ // but from a different tlog than the one containing the desired entry.
+ // Just ignore the exception, so as to proceed to the next tlog.
+ log.debug("Exception reading the log (this is expected, don't worry)=" + lookupLog + ", for version=" + lookupVersion +
+ ". This can be ignored.");
+ }
+
+ if (obj instanceof List) {
+ List tmpEntry = (List) obj;
+ if (tmpEntry.size() >= 2 &&
+ (tmpEntry.get(UpdateLog.VERSION_IDX) instanceof Long) &&
+ ((Long) tmpEntry.get(UpdateLog.VERSION_IDX)).equals(lookupVersion)) {
+ return tmpEntry;
+ }
+ }
+ } finally {
+ lookupLog.decref();
+ }
+ }
+ }
+ return null;
+ }
+
public Object lookup(BytesRef indexedId) {
LogPtr entry;
TransactionLog lookupLog;
@@ -908,6 +1102,7 @@ public class UpdateLog implements PluginInfoInitialized {
static class Update {
TransactionLog log;
long version;
+ long previousVersion; // for in-place updates
long pointer;
}
@@ -1011,15 +1206,16 @@ public class UpdateLog implements PluginInfoInitialized {
List entry = (List)o;
// TODO: refactor this out so we get common error handling
- int opAndFlags = (Integer)entry.get(0);
+ int opAndFlags = (Integer)entry.get(UpdateLog.FLAGS_IDX);
if (latestOperation == 0) {
latestOperation = opAndFlags;
}
int oper = opAndFlags & UpdateLog.OPERATION_MASK;
- long version = (Long) entry.get(1);
+ long version = (Long) entry.get(UpdateLog.VERSION_IDX);
switch (oper) {
case UpdateLog.ADD:
+ case UpdateLog.UPDATE_INPLACE:
case UpdateLog.DELETE:
case UpdateLog.DELETE_BY_QUERY:
Update update = new Update();
@@ -1027,13 +1223,16 @@ public class UpdateLog implements PluginInfoInitialized {
update.pointer = reader.position();
update.version = version;
+ if (oper == UpdateLog.UPDATE_INPLACE && entry.size() == 5) {
+ update.previousVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
+ }
updatesForLog.add(update);
updates.put(version, update);
if (oper == UpdateLog.DELETE_BY_QUERY) {
deleteByQueryList.add(update);
} else if (oper == UpdateLog.DELETE) {
- deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
+ deleteList.add(new DeleteUpdate(version, (byte[])entry.get(entry.size()-1)));
}
break;
@@ -1371,22 +1570,17 @@ public class UpdateLog implements PluginInfoInitialized {
// should currently be a List<Oper,Ver,Doc/Id>
List entry = (List) o;
- operationAndFlags = (Integer) entry.get(0);
+ operationAndFlags = (Integer) entry.get(UpdateLog.FLAGS_IDX);
int oper = operationAndFlags & OPERATION_MASK;
- long version = (Long) entry.get(1);
+ long version = (Long) entry.get(UpdateLog.VERSION_IDX);
switch (oper) {
+ case UpdateLog.UPDATE_INPLACE: // fall through to ADD
case UpdateLog.ADD: {
recoveryInfo.adds++;
- // byte[] idBytes = (byte[]) entry.get(2);
- SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
- AddUpdateCommand cmd = new AddUpdateCommand(req);
- // cmd.setIndexedId(new BytesRef(idBytes));
- cmd.solrDoc = sdoc;
- cmd.setVersion(version);
+ AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
- if (debug) log.debug("add " + cmd);
-
+ log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
proc.processAdd(cmd);
break;
}
@@ -1413,7 +1607,6 @@ public class UpdateLog implements PluginInfoInitialized {
proc.processDelete(cmd);
break;
}
-
case UpdateLog.COMMIT: {
commitVersion = version;
break;
@@ -1486,6 +1679,31 @@ public class UpdateLog implements PluginInfoInitialized {
}
}
+ /**
+ * Given an entry from the transaction log containing a document, return a new AddUpdateCommand that
+ * can be applied to ADD the document or do an UPDATE_INPLACE.
+ *
+ * @param req The request to use as the owner of the new AddUpdateCommand
+ * @param entry Entry from the transaction log that contains the document to be added
+ * @param operation The value of the operation flag; this must be either ADD or UPDATE_INPLACE --
+ * if it is UPDATE_INPLACE then the previous version will also be read from the entry
+ * @param version Version already obtained from the entry.
+ */
+ public static AddUpdateCommand convertTlogEntryToAddUpdateCommand(SolrQueryRequest req, List entry,
+ int operation, long version) {
+ assert operation == UpdateLog.ADD || operation == UpdateLog.UPDATE_INPLACE;
+ SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size()-1);
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ cmd.solrDoc = sdoc;
+ cmd.setVersion(version);
+
+ if (operation == UPDATE_INPLACE) {
+ long prevVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
+ cmd.prevVersion = prevVersion;
+ }
+ return cmd;
+ }
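+
+ // For reference, a sketch of the entry layout this method assumes (the exact serialization
+ // lives in TransactionLog; an ADD entry simply lacks the two "prev" slots):
+ //
+ //   entry.get(UpdateLog.FLAGS_IDX)        -> operation and flags (int)
+ //   entry.get(UpdateLog.VERSION_IDX)      -> version of this update (long)
+ //   entry.get(UpdateLog.PREV_POINTER_IDX) -> tlog position of the depended-on update (UPDATE_INPLACE only)
+ //   entry.get(UpdateLog.PREV_VERSION_IDX) -> version of the depended-on update (UPDATE_INPLACE only)
+ //   entry.get(entry.size() - 1)           -> the SolrInputDocument itself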
+
public void cancelApplyBufferedUpdates() {
this.cancelApplyBufferUpdate = true;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/update/VersionInfo.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index 3c55172..07172eb 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -193,6 +193,10 @@ public class VersionInfo {
return ulog.lookupVersion(idBytes);
}
+ /**
+ * Returns the latest version from the index for the given id (bytes), as seen from the realtime searcher.
+ * Returns null if no document can be found in the index for the given id.
+ */
public Long getVersionFromIndex(BytesRef idBytes) {
// TODO: we could cache much of this and invalidate during a commit.
// TODO: most DocValues classes are threadsafe - expose which.
@@ -219,6 +223,9 @@ public class VersionInfo {
}
}
+ /**
+ * Returns the highest version from the index, or 0L if no versions can be found in the index.
+ */
public Long getMaxVersionFromIndex(IndexSearcher searcher) throws IOException {
String versionFieldName = versionField.getName();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java b/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java
index 452574e..58554d5 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java
@@ -16,25 +16,36 @@
*/
package org.apache.solr.update.processor;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
-
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.schema.CopyField;
import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.NumericValueFieldType;
import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,7 +134,221 @@ public class AtomicUpdateDocumentMerger {
return toDoc;
}
+
+ /**
+ * Given a schema field, return whether or not such a field is supported for an in-place update.
+ * Note: an update command is executed as an in-place update only if all of the fields it updates
+ * (and the _version_ field) are supported.
+ */
+ private static boolean isSupportedFieldForInPlaceUpdate(SchemaField schemaField) {
+ if (schemaField == null) {
+ // nocommit: shouldn't "schemaField == null" trip an assert, or cause IllegalArgumentException
+ // nocommit: if there is a reason for this behavior, it should be noted in javadocs, and explained here in comment
+
+ return false; // nocommit: why?
+ }
+ return schemaField.hasDocValues() && !schemaField.indexed() && !schemaField.stored() &&
+ !schemaField.multiValued() && (schemaField.getType() instanceof NumericValueFieldType);
+ }
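+
+ // For illustration, a field declared like the test-schema field added later in this patch,
+ //   <field name="inplace_updatable_float" type="float" indexed="false" stored="false" docValues="true"/>
+ // satisfies these checks (single-valued numeric docValues, neither indexed nor stored), while any
+ // indexed, stored, or multiValued field does not.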
+
+ /**
+ * Get a list of the non stored DV Fields in the index from a realtime searcher
+ */
+ private static Set<String> getNonStoredDocValueFieldNamesFromSearcher(SolrCore core) {
+ // nocommit: is this dead code?
+ // nocommit: originally calling code now uses IndexWriter to get field names
+ RefCounted<SolrIndexSearcher> holder = core.getRealtimeSearcher();
+ try {
+ SolrIndexSearcher searcher = holder.get();
+ return Collections.unmodifiableSet(searcher.getNonStoredDVs(false));
+ } finally {
+ holder.decref();
+ }
+ }
+
+ /**
+ * Given an add update command, determine whether it is suitable for an in-place update operation.
+ *
+ * @return If this is an in-place update, a set of the fields that require in-place updates.
+ * If this is not an in-place update, an empty set.
+ */
+ public static Set<String> isInPlaceUpdate(AddUpdateCommand cmd) {
+ // nocommit: this method name no longer makes any sense since it doesn't return a boolean
+
+ SolrInputDocument sdoc = cmd.getSolrInputDocument();
+ BytesRef id = cmd.getIndexedId();
+ IndexSchema schema = cmd.getReq().getSchema();
+
+ final SchemaField uniqueKeyField = schema.getUniqueKeyField();
+ final String uniqueKeyFieldName = null == uniqueKeyField ? null : uniqueKeyField.getName();
+
+ Set<String> candidateFields = new HashSet<>();
+
+ // first pass, check the things that are virtually free,
+ // and bail out early if anything is obviously not a valid in-place update
+ for (String fieldName : sdoc.getFieldNames()) {
+ if (fieldName.equals(uniqueKeyFieldName)
+ || fieldName.equals(DistributedUpdateProcessor.VERSION_FIELD)) {
+ continue;
+ }
+ Object fieldValue = sdoc.getField(fieldName).getValue();
+ if (! (fieldValue instanceof Map) ) {
+ // not even an atomic update, definitely not an in-place update
+ return Collections.emptySet();
+ }
+ // else it's an atomic update map...
+ for (String op : ((Map<String, Object>)fieldValue).keySet()) {
+ if (!op.equals("set") && !op.equals("inc")) {
+ // not a supported in-place update op
+ return Collections.emptySet();
+ }
+ }
+ candidateFields.add(fieldName);
+ }
+
+ if (candidateFields.isEmpty()) {
+ return Collections.emptySet();
+ }
+
+ // lazy init this so we don't call iw.getFieldNames() (and sync lock on IndexWriter) unless needed
+ Set<String> fieldNamesFromIndexWriter = null;
+ // nocommit: see question about why dynamicFields are special below...
+ // nocommit: if dynamicField doesn't actually matter, then don't bother lazy initing this,
+ // nocommit: just move the init logic here.
+
+ // second pass over the candidates for in-place updates
+ // this time more expensive checks
+ for (String fieldName: candidateFields) {
+ SchemaField schemaField = schema.getFieldOrNull(fieldName);
+
+ if (!isSupportedFieldForInPlaceUpdate(schemaField)) {
+ return Collections.emptySet();
+ }
+
+ // if this field has a copy target that is not supported for in-place updates, then this is not an in-place update
+ for (CopyField copyField: schema.getCopyFieldsList(fieldName)) {
+ if (!isSupportedFieldForInPlaceUpdate(copyField.getDestination()))
+ return Collections.emptySet();
+ }
+
+ // nocommit: why does it matter if this is a dynamic field?
+ //
+ // nocommit: comment below says dynamicField dests that don't yet exist won't work for inplace...
+ // nocommit: ...but that doesn't explain why <dynamicFields> are special
+ // nocommit: why would a <field> that doesn't yet exist in the IndexWriter work?
+ //
+ // nocommit: see elaboration of this question in jira comments
+ if (schema.isDynamicField(fieldName)) {
+ if (null == fieldNamesFromIndexWriter) { // lazy init fieldNamesFromIndexWriter
+ try {
+ SolrCore core = cmd.getReq().getCore();
+ RefCounted<IndexWriter> holder = core.getSolrCoreState().getIndexWriter(core);
+ try {
+ IndexWriter iw = holder.get();
+ fieldNamesFromIndexWriter = iw.getFieldNames();
+ } finally {
+ holder.decref();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e); // nocommit
+
+ // nocommit: if we're going to throw a runtime exception it should be a SolrException with a useful code/msg
+ // nocommit: but why are we catching/wrapping the IOE? why aren't we rethrowing?
+ }
+ }
+ if (! fieldNamesFromIndexWriter.contains(fieldName) ) {
+ // nocommit: this comment is not useful - it doesn't explain *WHY*
+ return Collections.emptySet(); // if dynamic field and this field doesn't exist, DV update can't work
+ }
+ }
+ }
+ return candidateFields;
+ }
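+
+ // A sketch of a command that passes both passes (field name from the test schema in this patch;
+ // values illustrative):
+ //
+ //   SolrInputDocument sdoc = new SolrInputDocument();
+ //   sdoc.addField("id", "1");
+ //   sdoc.addField("inplace_updatable_float", Collections.singletonMap("inc", 1.0f));
+ //
+ // isInPlaceUpdate(cmd) would return {"inplace_updatable_float"}; an unsupported op such as
+ // "add", or a non-docValues field, makes it return an empty set instead.
+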
+ /**
+ * Given an AddUpdateCommand containing update operations (e.g. set, inc), merge and resolve the operations into
+ * a partial document that can be used for indexing the in-place updates. The AddUpdateCommand is modified to contain
+ * the partial document (instead of the original document which contained the update operations) and also
+ * the prevVersion that this in-place update depends on.
+ * Note: updatedFields passed into the method can be changed, i.e. the version field can be added to the set.
+ * @return false if the in-place update cannot succeed, e.g. if the old document was recently deleted; a false
+ * return indicates that this update can be re-tried as a full atomic update. Returns true if the in-place
+ * update succeeds.
+ */
+ public boolean doInPlaceUpdateMerge(AddUpdateCommand cmd, Set<String> updatedFields) throws IOException {
+ SolrInputDocument inputDoc = cmd.getSolrInputDocument();
+ BytesRef idBytes = cmd.getIndexedId();
+
+ updatedFields.add(DistributedUpdateProcessor.VERSION_FIELD); // add the version field so that it is fetched too
+ SolrInputDocument oldDocument = RealTimeGetComponent.getInputDocument(cmd.getReq().getCore(),
+ idBytes, true, updatedFields, false); // avoid stored fields from index
+ if (oldDocument == RealTimeGetComponent.DELETED || oldDocument == null) {
+ // This doc was deleted recently. In-place update cannot work, hence a full atomic update should be tried.
+ return false;
+ }
+
+ // If oldDocument doesn't have a field that is present in updatedFields,
+ // then fetch the field from RT searcher into oldDocument.
+ // This can happen if the oldDocument was fetched from tlog, but the DV field to be
+ // updated was not in that document.
+ if (oldDocument.getFieldNames().containsAll(updatedFields) == false) {
+ RefCounted<SolrIndexSearcher> searcherHolder = null;
+ try {
+ searcherHolder = cmd.getReq().getCore().getRealtimeSearcher();
+ SolrIndexSearcher searcher = searcherHolder.get();
+ int docid = searcher.getFirstMatch(new Term(idField.getName(), idBytes));
+ if (docid >= 0) {
+ searcher.decorateDocValueFields(oldDocument, docid, updatedFields);
+ } else {
+ // Not all fields needed for DV updates were found in the document obtained
+ // from tlog, and the document wasn't found in the index.
+ return false; // do a full atomic update
+ }
+ } finally {
+ if (searcherHolder != null) {
+ searcherHolder.decref();
+ }
+ }
+ }
+
+ if (oldDocument.containsKey(DistributedUpdateProcessor.VERSION_FIELD) == false) {
+ throw new SolrException (ErrorCode.INVALID_STATE, "There is no _version_ in previous document. id=" +
+ cmd.getPrintableId());
+ }
+ Long oldVersion = (Long) oldDocument.remove(DistributedUpdateProcessor.VERSION_FIELD).getValue();
+
+ // Copy over all supported DVs from oldDocument to partialDoc
+ //
+ // Assuming multiple updates to the same doc: field 'dv1' in one update, then field 'dv2' in a second
+ // update, and then again 'dv1' in a third update (without commits in between), the last update would
+ // fetch from the tlog the partial doc for the 2nd (dv2) update. If that doc doesn't copy over the
+ // previous updates to dv1 as well, then a full resolution (by following previous pointers) would
+ // need to be done to calculate the dv1 value -- so instead copy all the potentially affected DV fields.
+ SolrInputDocument partialDoc = new SolrInputDocument();
+ String uniqueKeyField = schema.getUniqueKeyField().getName();
+ for (String fieldName : oldDocument.getFieldNames()) {
+ SchemaField schemaField = schema.getField(fieldName);
+ if (fieldName.equals(uniqueKeyField) || isSupportedFieldForInPlaceUpdate(schemaField)) {
+ partialDoc.addField(fieldName, oldDocument.getFieldValue(fieldName));
+ }
+ }
+
+ merge(inputDoc, partialDoc);
+
+ // Populate the id field if not already populated (this can happen since stored fields were avoided during fetch from RTGC)
+ if (!partialDoc.containsKey(schema.getUniqueKeyField().getName())) {
+ partialDoc.addField(idField.getName(),
+ inputDoc.getField(schema.getUniqueKeyField().getName()).getFirstValue());
+ }
+
+ cmd.prevVersion = oldVersion;
+ cmd.solrDoc = partialDoc;
+ return true;
+ }
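+
+ // A sketch of the post-conditions (hypothetical values): for an input doc
+ //   {id=1, inplace_updatable_float={inc=1.0}}
+ // merged against an old document having inplace_updatable_float=4.0 and _version_=101,
+ // cmd.solrDoc would become {id=1, inplace_updatable_float=5.0, ...other updatable DVs...}
+ // and cmd.prevVersion would be 101.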
+
protected void doSet(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) {
SchemaField sf = schema.getField(sif.getName());
toDoc.setField(sif.getName(), sf.getType().toNativeType(fieldVal), sif.getBoost());
[3/4] lucene-solr:jira/solr-5944: SOLR-5944 Initial import into branch
Posted by is...@apache.org.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index b8bdd16..e0206af 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -36,7 +36,13 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.SolrRequest.METHOD;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
@@ -82,9 +88,11 @@ import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.update.VersionBucket;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +106,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public static final String DISTRIB_FROM_COLLECTION = "distrib.from.collection";
public static final String DISTRIB_FROM_PARENT = "distrib.from.parent";
public static final String DISTRIB_FROM = "distrib.from";
+ public static final String DISTRIB_INPLACE_PREVVERSION = "distrib.inplace.prevversion";
private static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -728,7 +737,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
}
-
+
+ // If we were sent a previous version, set it on the AddUpdateCommand (if not already set)
+ if (!cmd.isInPlaceUpdate()) {
+ cmd.prevVersion = cmd.getReq().getParams().getLong(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, -1);
+ }
// TODO: if minRf > 1 and we know the leader is the only active replica, we could fail
// the request right here but for now I think it is better to just return the status
// to the client that the minRf wasn't reached and let them handle it
@@ -785,7 +798,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (replicationTracker != null && minRf > 1)
params.set(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
-
+
+ if (cmd.isInPlaceUpdate()) {
+ params.set(DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
+ }
cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker);
}
@@ -1013,9 +1029,21 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
VersionBucket bucket = vinfo.bucket(bucketHash);
+ long dependentVersionFound = -1; // Last found version for a dependent update; applicable only for in-place updates; useful for logging later
+ // if this is an in-place update, check whether we should wait for a dependent update, and do so
+ // before entering the synchronized block
+ if (!leaderLogic && cmd.isInPlaceUpdate()) {
+ dependentVersionFound = waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket);
+ if (dependentVersionFound == -1) {
+ // it means that on the leader, the document has since been deleted; drop this update
+ return true;
+ }
+ }
+
vinfo.lockForUpdate();
try {
synchronized (bucket) {
+ bucket.notifyAll(); // just in case anyone is waiting, let them know that we have a new update
// we obtain the version when synchronized and then do the add so we can ensure that
// if version1 < version2 then version1 is actually added before version2.
@@ -1080,23 +1108,85 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return true;
}
- // if we aren't the leader, then we need to check that updates were not re-ordered
- if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
- // we're OK... this update has a version higher than anything we've seen
- // in this bucket so far, so we know that no reordering has yet occurred.
- bucket.updateHighest(versionOnUpdate);
- } else {
- // there have been updates higher than the current update. we need to check
- // the specific version for this id.
+ if (cmd.isInPlaceUpdate()) { // nocommit: OUTER IF (see nocommits below regarding INNER ELSE)
+
+ long prev = cmd.prevVersion;
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
- if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
- // This update is a repeat, or was reordered. We need to drop this update.
- log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
+ if (lastVersion == null || Math.abs(lastVersion) < prev) {
+ // this was checked for (in waitForDependentUpdates()) before entering the synchronized block.
+ // So we shouldn't be here unless, by the time the synchronized block was entered, the prev
+ // update was deleted by a DBQ. Since that update is no longer in the index,
+ // vinfo.lookupVersion() may be giving us a version from the deleted list (which might be
+ // older than the prev update!)
+ AddUpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd.getPrintableId(),
+ cmd.getReq().getCore().getCoreDescriptor().getCoreContainer().getUpdateShardHandler());
+
+ // nocommit: this log message doesn't seem useful in its current form...
+ // nocommit: it doesn't explain *why* it did a "fetch from leader", let alone why anyone should care
+ // nocommit: perhaps: ("In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}", idBytes, ...)
+ log.info("Fetched from leader: {}", (fetchedFromLeader == null? null: fetchedFromLeader.solrDoc));
+
+ if (fetchedFromLeader == null) {
+ log.debug("Leader told us that this document was subsequently deleted. lastVersion: " +
+ lastVersion + ", prev: " + prev + ", id: " + idBytes.utf8ToString() + ", lastFoundVersion: " + dependentVersionFound);
+ return true;
+ } else {
+ // Newer document was fetched from the leader. Apply that document instead of this current
+ // in-place update.
+ log.debug("Newer document is available now. lastVersion: " +
+ lastVersion + ", prev: " + prev + ", id: " + idBytes.utf8ToString() + ", lastFoundVersion: " + dependentVersionFound);
+
+ // nocommit: INNER ELSE -- I'm concerned/confused by this code happening here...
+ //
+ // at this point, we're replacing the data in our existing "in-place" update (cmd) so it
+ // becomes a normal "add" using the full SolrInputDocument fetched from the leader
+ //
+ // but this if/else is itself wrapped in a bigger if (grep for "OUTER IF" above
+ // and "OUTER ELSE" below) where the "else" clause does some sanity checking / processing
+ // logic for "// non inplace update, i.e. full document update" -- all of which is
+ // skipped for our modified "cmd"
+ //
+ // shouldn't the logic in that outer else clause also be applied to our
+ // "no longer really an in-place" update?
+ cmd.solrDoc = fetchedFromLeader.solrDoc;
+ cmd.prevVersion = -1;
+ cmd.setVersion((long)cmd.solrDoc.getFieldValue(VERSION_FIELD));
+ }
+ }
+
+ if (lastVersion != null && prev < Math.abs(lastVersion)) {
+ // this means we got a newer full doc update, and in that case it makes no sense to apply the
+ // older in-place update. Drop this update
+ log.info("Update was applied on version: " + prev + ", but last version I have is: " + lastVersion
+ + ". Dropping current update.");
return true;
+ } else {
+ // We're good, we should apply this update. First, update the bucket's highest.
+ if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+ bucket.updateHighest(versionOnUpdate);
+ }
}
+ } else { // non inplace update, i.e. full document update
+ // nocommit: OUTER ELSE (see nocommits above regarding INNER ELSE)
+
+ // if we aren't the leader, then we need to check that updates were not re-ordered
+ if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+ // we're OK... this update has a version higher than anything we've seen
+ // in this bucket so far, so we know that no reordering has yet occurred.
+ bucket.updateHighest(versionOnUpdate);
+ } else {
+ // there have been updates higher than the current update. we need to check
+ // the specific version for this id.
+ Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+ // This update is a repeat, or was reordered. We need to drop this update.
+ log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
+ return true;
+ }
- // also need to re-apply newer deleteByQuery commands
- checkDeleteByQueries = true;
+ // also need to re-apply newer deleteByQuery commands
+ checkDeleteByQueries = true;
+ }
}
}
}
@@ -1122,11 +1212,213 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return false;
}
+ /**
+ * This method checks the update/transaction logs and the index to find out whether the update ("previous update")
+ * that the current in-place update depends on has already been completed. If not, this method waits for the
+ * missing update to arrive. If it doesn't arrive within a timeout threshold, it is actively fetched from the leader.
+ *
+ * @return -1 if the current in-place update should be dropped, or the last found version if the previous update has been indexed.
+ */
+ private long waitForDependentUpdates(AddUpdateCommand cmd, long versionOnUpdate,
+ boolean isReplayOrPeersync, VersionBucket bucket) throws IOException {
+ long lastFoundVersion = 0;
+ TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS);
+
+ vinfo.lockForUpdate();
+ try {
+ synchronized (bucket) {
+ Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion;
+
+ if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
+ log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
+ (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion, isReplayOrPeersync, cmd.getPrintableId());
+ }
+
+ while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
+ try {
+ long timeLeft = waitTimeout.timeLeft(TimeUnit.MILLISECONDS);
+ if (timeLeft > 0) { // wait(0) waits forever until notified, but we don't want that.
+ bucket.wait(timeLeft);
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt(); // restore the interrupt status
+ throw new RuntimeException(ie);
+ }
+ lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion;
+ }
+ }
+ } finally {
+ vinfo.unlockForUpdate();
+ }
+
+ if (Math.abs(lastFoundVersion) > cmd.prevVersion) {
+ // This must've been the case due to a higher version full update succeeding concurrently, while we were waiting or
+ // trying to index this partial update. Since a full update more recent than this partial update has succeeded,
+ // we can drop the current update.
+ if (log.isDebugEnabled()) {
+ log.debug("Update was applied on version: {}, but last version I have is: {}"
+ + ". Current update should be dropped. id={}", cmd.prevVersion, lastFoundVersion, cmd.getPrintableId());
+ }
+ return -1;
+ } else if (lastFoundVersion == cmd.prevVersion) {
+ // nocommit: suspicious comparison: why aren't we using Math.abs(lastFoundVersion) here?
+ //
+ // nocommit: most conditional checks on lastFoundVersion use Math.abs(lastFoundVersion) to account
+ // nocommit: for the possibility that it's negative -- which IIUC means that it was a delete
+ //
+ // i'm not entirely sure how/why/if the cmd.prevVersion could ever refer to a version corresponding
+ // to a delete, but it's a possibility the code should account for one way or another..
+ //
+ // if the answer is "should never happen" then -- as with any situation where someone tells me
+ // something can never happen -- my response is "assert that it doesn't"
+ //
+ // nocommit: suggest replacing this "else if" clause with...
+ //
+ // } else if (Math.abs(lastFoundVersion) == cmd.prevVersion) {
+ // assert 0 < lastFoundVersion : "prevVersion " + cmd.prevVersion + " found but is a delete!"
+ // if (log.isDebugEnabled()) {
+ // log.debug("Dependent update found. id={}", cmd.getPrintableId());
+ // }
+ // return lastFoundVersion;
+ // }
+ //
+ // nocommit: or am i misunderstanding? is there a deliberate reason Math.abs isn't being used here?
+ if (log.isDebugEnabled()) {
+ log.debug("Dependent update found. id={}", cmd.getPrintableId());
+ }
+ return lastFoundVersion;
+ }
+
+ // We have waited long enough, but the dependent update didn't arrive. It's time to actively fetch it from the leader.
+ log.info("Missing update that the current in-place update depends on hasn't arrived. id={}, looking for version={}, last found version={}",
+ cmd.getPrintableId(), cmd.prevVersion, lastFoundVersion);
+
+ AddUpdateCommand missingUpdate = fetchFullUpdateFromLeader(cmd.getPrintableId(),
+ cmd.getReq().getCore().getCoreDescriptor().getCoreContainer().getUpdateShardHandler());
+ if (missingUpdate == null) {
+ // nocommit: this message isn't helpful to users...
+ // nocommit: it doesn't make it clear that we know/understand why his happened and it's "ok"
+ // nocommit: suggest: "Tried to fetch missing update from the leader, but leader says document has been deleted, skipping update (Last found version: {}, was looking for {}, id={})"
+ log.info("Tried to fetch missing update from the leader, but missing wasn't present at leader. "
+ + "Last found version: " + lastFoundVersion + ", was looking for: " + cmd.prevVersion + ", id=" + cmd.getPrintableId());
+ return -1; // -1 indicates deleted at leader.
+ }
+ log.info("Fetched the update: {}", missingUpdate);
+
+ versionAdd(missingUpdate);
+
+ log.info("Added the fetched update, id="+missingUpdate.getPrintableId()+", version="+missingUpdate.getVersion());
+ return missingUpdate.getVersion();
+ }
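+
+ // Note: the bucket.wait(timeLeft) above pairs with the bucket.notifyAll() added in versionAdd();
+ // a sketch of the protocol (method names from this patch):
+ //
+ //   indexing thread (versionAdd):     waiting thread (waitForDependentUpdates):
+ //     synchronized (bucket) {           synchronized (bucket) {
+ //       bucket.notifyAll();               while (dependencyMissing && !timedOut) {
+ //       ... proceed with the add ...        bucket.wait(timeLeft);
+ //     }                                   }
+ //                                       }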
+
+ /**
+ * This method is used when an update that a particular in-place update depends on has been lost for some reason.
+ * It sends a request to the shard leader to fetch the latest full document as seen on the leader.
+ * @return AddUpdateCommand containing latest full doc at shard leader for the given id, or null if not found.
+ */
+ private AddUpdateCommand fetchFullUpdateFromLeader(String id, UpdateShardHandler updateHandler) throws IOException {
+ // nocommit: why does this method take a String id and an UpdateShardHandler
+ // nocommit: when the only callers always pass the exact same values from an AddUpdateCommand?
+ //
+ // this method should just take an AddUpdateCommand and call cmd.getPrintableId() and cmd.getReq().getCore().getCoreDescriptor().getCoreContainer().getUpdateShardHandler() itself
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set("distrib", false);
+ params.set("getInputDocument", id);
+ params.set("onlyIfActive", true);
+ SolrRequest<SimpleSolrResponse> ur = new GenericSolrRequest(METHOD.GET, "/get", params);
+
+ String leaderUrl = req.getParams().get(DISTRIB_FROM);
+
+ if (leaderUrl == null) {
+ // An update we're dependent upon didn't arrive! This is unexpected. Perhaps our leader is
+ // down or partitioned from us for some reason. Let's force a refresh of the cluster state, and
+ // request the update from the leader.
+ if (zkController == null) { // we should be in cloud mode, but apparently aren't? could be a unit test
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Can't find document with id=" + id + ", but fetching from leader "
+ + "failed since we're not in cloud mode.");
+ }
+ try {
+ // nocommit: do not use forceUpdateCollection here, per shalin's comments back in August...
+
+ // Under no circumstances should we be calling `forceUpdateCollection` in the indexing code path.
+ // It is just too dangerous in the face of high indexing rates. Instead we should do what the
+ // DUP is already doing i.e. calling getLeaderRetry to figure out the current leader. If the
+ // current replica is partitioned from the leader then we have other mechanisms for taking care
+ // of it and the replica has no business trying to determine this.
+ zkController.getZkStateReader().forceUpdateCollection(cloudDesc.getCollectionName()); // nocommit
+ } catch (KeeperException | InterruptedException e1) { /* No worries if the force refresh failed */ }
+ Replica leader = zkController.getClusterState().
+ getCollection(cloudDesc.getCollectionName()).getLeader(cloudDesc.getShardId());
+ leaderUrl = leader.getCoreUrl();
+ }
+
+ HttpSolrClient hsc = new HttpSolrClient.Builder(leaderUrl).
+ withHttpClient(updateHandler.getHttpClient()).build();
+ NamedList rsp = null;
+ try {
+ rsp = hsc.request(ur);
+ } catch (SolrServerException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error during fetching [" + id +
+ "] from leader (" + leaderUrl + "): ", e);
+ } finally {
+ hsc.close();
+ }
+ Object inputDocObj = rsp.get("inputDocument");
+ SolrInputDocument leaderDoc = (SolrInputDocument) inputDocObj;
+
+ if (leaderDoc == null) {
+ return null; // desired update not found (likely deleted by now)
+
+ // nocommit: i think there's a gap in the DUP logic related to how the results of this method
+ // nocommit: are used when it returns null -- ie: replicas never get dependent updates...
+ //
+ // nocommit: consider a situation like this..
+ //
+ // - assume some doc D exists.
+ // - assume replica1 gets an in-place update for D we'll call Y,
+ // which is dependent on a prevVersion update we'll call X
+ // - assume replica1 never received update X
+ // - In current (patched) DUP code, this will cause replica1 to wait to see if X arrives out of order.
+ // - assume X never arrives.
+ // - In the current (patched) DUP code, replica1 will then ask the leader for the latest SolrInputDocument of D
+ // - assume leader has already processed some update we'll call Z which did a DBQ that deleted D
+ // - because of this, leader will return "null" to indicate D has been deleted
+ // - when replica1 gets "null" from the leader, the current (patched) DUP code "drops" Y
+ // - presumably this is because it shouldn't matter, and we expect to (eventually) process Z
+ // to delete the doc from the local replica1
+ // - but what if Z's DBQ was dependent on the modified values of Y?
+ // - ie: what if Y was "add(id=mydocid,field_i={set=42})" and Z was "deleteByQuery(field_i:42)"
+ //
+ // nocommit: Shouldn't the current code paths where replicas will drop/ignore update Y when
+ // nocommit: fetchFullUpdateFromLeader returns null be changed so that instead replicas will explicitly
+ // nocommit: delete the docId involved???
+ // nocommit: or at the very least, fallback to handling this as a normal (non-inplace) "atomic update" ?
+
+ }
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ cmd.solrDoc = leaderDoc;
+ cmd.setVersion((long)leaderDoc.getFieldValue(VERSION_FIELD));
+ return cmd;
+ }
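+
+ // The request above is roughly equivalent to (sketch; URL shape illustrative):
+ //   GET <leaderUrl>/get?distrib=false&getInputDocument=<id>&onlyIfActive=true
+ // with the full SolrInputDocument returned under the "inputDocument" key of the response.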
+
// TODO: may want to switch to using optimistic locking in the future for better concurrency
// that's why this code is here... need to retry in a loop closely around/in versionAdd
boolean getUpdatedDocument(AddUpdateCommand cmd, long versionOnUpdate) throws IOException {
if (!AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) return false;
+ Set<String> inPlaceUpdatedFields = AtomicUpdateDocumentMerger.isInPlaceUpdate(cmd);
+ if (inPlaceUpdatedFields.size() > 0) { // non-empty means this is suitable for in-place updates
+ if (docMerger.doInPlaceUpdateMerge(cmd, inPlaceUpdatedFields)) {
+ return true;
+ } else {
+ // in-place update failed, so fall through and re-try the same with a full atomic update
+ }
+ }
+
+ // full (non-inplace) atomic update
SolrInputDocument sdoc = cmd.getSolrInputDocument();
BytesRef id = cmd.getIndexedId();
SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocument(cmd.getReq().getCore(), id);
@@ -1142,7 +1434,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
} else {
oldDoc.remove(VERSION_FIELD);
}
-
+
cmd.solrDoc = docMerger.merge(sdoc, oldDoc);
return true;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java
index c21ea76..1ac66e0 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java
@@ -261,7 +261,12 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
SolrInputDocument oldDoc = null;
if (useFieldCache) {
- oldDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId);
+ // nocommit: why is this passing false?
+ // nocommit: prior to this patch, there was no such thing as an inplace update...
+ // nocommit: any call to getInputDocument* would by definition return a "full document"
+ //
+ // (last patch fixed getInputDocument call, but not this getInputDocumentFromTlog; intentional?)
+ oldDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId, false);
if (oldDoc == RealTimeGetComponent.DELETED) {
return true;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/test-files/solr/collection1/conf/schema-inplace-updates.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/schema-inplace-updates.xml b/solr/core/src/test-files/solr/collection1/conf/schema-inplace-updates.xml
new file mode 100644
index 0000000..77b852d
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/schema-inplace-updates.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<schema name="inplace-updates" version="1.6">
+ <uniqueKey>id</uniqueKey>
+ <field name="id" type="string" indexed="true" stored="true" docValues="true"/>
+ <field name="_version_" type="long" indexed="false" stored="false" docValues="true" />
+
+ <!-- specific schema fields for dv in-place updates -->
+ <field name="inplace_updatable_float" type="float" indexed="false" stored="false" docValues="true" default="0"/>
+ <field name="inplace_updatable_int" type="int" indexed="false" stored="false" docValues="true" default="0"/>
+
+ <!-- dynamic fields which *ONLY* use docValues so they can be updated in place -->
+ <dynamicField name="*_i_dvo" multiValued="false" type="int" docValues="true" indexed="false" stored="false"/>
+ <dynamicField name="*_f_dvo" multiValued="false" type="float" docValues="true" indexed="false" stored="false"/>
+ <dynamicField name="*_l_dvo" multiValued="false" type="long" docValues="true" indexed="false" stored="false"/>
+
+ <!-- dynamic fields that must *NOT* support in place updates -->
+ <dynamicField name="*_s" type="string" indexed="true" stored="true"/>
+ <dynamicField name="*_i" type="int" indexed="true" stored="true"/>
+ <dynamicField name="*_l" type="long" indexed="true" stored="true"/>
+
+ <!-- Copy fields -->
+
+ <!-- The id field has a copy target that is not in-place updatable, but in-place updates should still work. -->
+ <copyField source="id" dest="id_field_copy_that_does_not_support_in_place_update_s"/>
+
+ <!-- copyfield1: src and dest are both updatable -->
+ <field name="copyfield1_src__both_updateable" type="int" indexed="false" stored="false" docValues="true" default="0"/>
+ <copyField source="copyfield1_src__both_updateable" dest="copyfield1_dest__both_updatable_i_dvo"/>
+
+ <!-- copyfield2: src is updatable but dest is not -->
+ <field name="copyfield2_src__only_src_updatable" type="int" indexed="false" stored="false" docValues="true" default="0"/>
+ <copyField source="copyfield2_src__only_src_updatable" dest="copyfield2_dest__only_src_updatable_i"/>
+
+
+ <!-- cruft needed by the solrconfig used in our tests for startup, but not used in the tests -->
+ <field name="signatureField" type="string" indexed="true" stored="false"/>
+ <dynamicField name="*_sS" type="string" indexed="true" stored="true"/>
+
+
+ <fieldType name="string" class="solr.StrField" multiValued="false" indexed="false" stored="false" docValues="false" />
+ <fieldType name="long" class="solr.TrieLongField" multiValued="false" indexed="false" stored="false" docValues="false"/>
+ <fieldType name="float" class="solr.TrieFloatField" multiValued="false" indexed="false" stored="false" docValues="false"/>
+ <fieldType name="int" class="solr.TrieIntField" multiValued="false" indexed="false" stored="false" docValues="false"/>
+
+</schema>
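
A quick sketch of how these fields might be exercised from SolrJ (hypothetical client setup;
values illustrative; assumes java.util.Collections and a SolrClient named "client"):

    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "1");
    doc.addField("inplace_updatable_float", Collections.singletonMap("inc", 1.0f));
    doc.addField("inplace_updatable_int", Collections.singletonMap("set", 10));
    client.add(doc);   // eligible for the in-place (docValues) update path if all checks pass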
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/test-files/solr/collection1/conf/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/schema.xml b/solr/core/src/test-files/solr/collection1/conf/schema.xml
index 35de166..1955759 100644
--- a/solr/core/src/test-files/solr/collection1/conf/schema.xml
+++ b/solr/core/src/test-files/solr/collection1/conf/schema.xml
@@ -574,7 +574,7 @@
<field name="tlong" type="tlong" indexed="true" stored="true"/>
- <field name="_version_" type="long" indexed="true" stored="true" multiValued="false"/>
+ <field name="_version_" type="long" indexed="false" stored="false" docValues="true" multiValued="false" useDocValuesAsStored="true"/>
<field name="title_stringNoNorms" type="string" omitNorms="true" indexed="true" stored="true"/>
@@ -679,15 +679,15 @@
<dynamicField name="*_f1_dv" type="float" indexed="true" stored="true" docValues="true" multiValued="false"/>
<!-- Non-stored, DocValues=true -->
- <dynamicField name="*_i_dvo" multiValued="false" type="int" docValues="true" indexed="true" stored="false"
+ <dynamicField name="*_i_dvo" multiValued="false" type="int" docValues="true" indexed="false" stored="false"
useDocValuesAsStored="true"/>
- <dynamicField name="*_d_dvo" multiValued="false" type="double" docValues="true" indexed="true" stored="false"
+ <dynamicField name="*_d_dvo" multiValued="false" type="double" docValues="true" indexed="false" stored="false"
useDocValuesAsStored="true"/>
- <dynamicField name="*_s_dvo" multiValued="false" type="string" docValues="true" indexed="true" stored="false"
+ <dynamicField name="*_s_dvo" multiValued="false" type="string" docValues="true" indexed="false" stored="false"
useDocValuesAsStored="true"/>
- <dynamicField name="*_ii_dvo" multiValued="true" type="int" docValues="true" indexed="true" stored="false"
+ <dynamicField name="*_ii_dvo" multiValued="true" type="int" docValues="true" indexed="false" stored="false"
useDocValuesAsStored="true"/>
- <dynamicField name="*_dd_dvo" multiValued="true" type="double" docValues="true" indexed="true" stored="false"
+ <dynamicField name="*_dd_dvo" multiValued="true" type="double" docValues="true" indexed="false" stored="false"
useDocValuesAsStored="true"/>
<!-- Non-stored, DocValues=true, useDocValuesAsStored=false -->
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/test-files/solr/collection1/conf/schema15.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/schema15.xml b/solr/core/src/test-files/solr/collection1/conf/schema15.xml
index d545149..e2c14f0 100644
--- a/solr/core/src/test-files/solr/collection1/conf/schema15.xml
+++ b/solr/core/src/test-files/solr/collection1/conf/schema15.xml
@@ -529,7 +529,7 @@
<field name="copyfield_source" type="string" indexed="true" stored="true" multiValued="true"/>
<!-- for versioning -->
- <field name="_version_" type="long" indexed="true" stored="true"/>
+ <field name="_version_" type="long" indexed="false" stored="false" docValues="true"/>
<!-- points to the root document of a block of nested documents -->
<field name="_root_" type="string" indexed="true" stored="true"/>
@@ -545,6 +545,11 @@
<dynamicField name="tv_mv_*" type="text" indexed="true" stored="true" multiValued="true"
termVectors="true" termPositions="true" termOffsets="true"/>
+ <!-- for in-place updates -->
+ <dynamicField name="*_i_dvo" multiValued="false" type="int" docValues="true" indexed="false" stored="false"/>
+ <dynamicField name="*_f_dvo" multiValued="false" type="float" docValues="true" indexed="false" stored="false"/>
+ <dynamicField name="*_l_dvo" multiValued="false" type="long" docValues="true" indexed="false" stored="false"/>
+
<dynamicField name="*_mfacet" type="string" indexed="true" stored="false" multiValued="true"/>
<dynamicField name="*_sw" type="text_sw" indexed="true" stored="true" multiValued="true"/>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/test/org/apache/solr/cloud/TestStressInPlaceUpdates.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestStressInPlaceUpdates.java b/solr/core/src/test/org/apache/solr/cloud/TestStressInPlaceUpdates.java
new file mode 100644
index 0000000..d028614
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestStressInPlaceUpdates.java
@@ -0,0 +1,495 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.math3.primes.Primes;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Slow
+public class TestStressInPlaceUpdates extends AbstractFullDistribZkTestBase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @BeforeClass
+ public static void beforeSuperClass() throws Exception {
+
+ // nocommit: does this test need to randomize between diff schema/fields used?
+ // nocommit: see nocommits/jira questions related to special dynamicField logic in AtomicUpdateDocumentMerger.isInPlaceUpdate
+
+ schemaString = "schema-inplace-updates.xml";
+ configString = "solrconfig-tlog.xml";
+
+ // sanity check that autocommits are disabled
+ initCore(configString, schemaString);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxTime);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxTime);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxDocs);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxDocs);
+ }
+
+ public TestStressInPlaceUpdates() {
+ super();
+ sliceCount = 1;
+ fixShardCount(3);
+ }
+
+ protected final ConcurrentHashMap<Integer, DocInfo> model = new ConcurrentHashMap<>();
+ protected Map<Integer, DocInfo> committedModel = new HashMap<>();
+ protected long snapshotCount;
+ protected long committedModelClock;
+ protected int clientIndexUsedForCommit;
+ protected volatile int lastId;
+ protected final String field = "val_l";
+
+ private void initModel(int ndocs) {
+ for (int i = 0; i < ndocs; i++) {
+ model.put(i, new DocInfo(0L, 0, 0));
+ }
+ committedModel.putAll(model);
+ }
+
+ SolrClient leaderClient = null;
+
+ @Test
+ @ShardsFixed(num = 3)
+ public void stressTest() throws Exception {
+ waitForRecoveriesToFinish(true);
+
+ this.leaderClient = getClientForLeader();
+ assertNotNull("Couldn't obtain client for the leader of the shard", this.leaderClient);
+
+ final int commitPercent = 5 + random().nextInt(20);
+ final int softCommitPercent = 30 + random().nextInt(75); // what percent of the commits are soft
+ final int deletePercent = 4 + random().nextInt(25);
+ final int deleteByQueryPercent = random().nextInt(8);
+ final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
+ int nWriteThreads = 5 + random().nextInt(25);
+ int fullUpdatePercent = 5 + random().nextInt(50);
+
+ // query variables
+ final int percentRealtimeQuery = 75;
+ // number of cumulative read/write operations by all threads
+ final AtomicLong operations = new AtomicLong(25000);
+ int nReadThreads = 5 + random().nextInt(25);
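+ // note: any reader/writer thread that hits an unexpected error sets "operations" to -1
+ // (see the catch blocks below) so the other threads' loops also exit promptly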
+
+
+ /** // testing
+ final int commitPercent = 5;
+ final int softCommitPercent = 100; // what percent of the commits are soft
+ final int deletePercent = 0;
+ final int deleteByQueryPercent = 50;
+ final int ndocs = 10;
+ int nWriteThreads = 10;
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 101;
+ final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
+ int nReadThreads = 10;
+
+ int fullUpdatePercent = 20;
+ **/
+
+ log.info("{}", Arrays.asList
+ ("commitPercent", commitPercent, "softCommitPercent", softCommitPercent,
+ "deletePercent", deletePercent, "deleteByQueryPercent", deleteByQueryPercent,
+ "ndocs", ndocs, "nWriteThreads", nWriteThreads, "percentRealtimeQuery", percentRealtimeQuery,
+ "operations", operations, "nReadThreads", nReadThreads));
+
+ initModel(ndocs);
+
+ List<Thread> threads = new ArrayList<>();
+
+ for (int i = 0; i < nWriteThreads; i++) {
+ Thread thread = new Thread("WRITER" + i) {
+ Random rand = new Random(random().nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.decrementAndGet() > 0) {
+ int oper = rand.nextInt(100);
+
+ if (oper < commitPercent) {
+ Map<Integer, DocInfo> newCommittedModel;
+ long version;
+
+ // nocommit: dual sync blocks w/non-synced commits in between is not thread safe
+ // nocommit: see jira comments for bad scenario
+ synchronized (TestStressInPlaceUpdates.this) {
+ // take a snapshot of the model
+ // this is safe to do w/o synchronizing on the model because it's a ConcurrentHashMap
+ newCommittedModel = new HashMap<>(model);
+ version = snapshotCount++;
+ }
+
+ int chosenClientIndex = rand.nextInt(clients.size());
+
+ if (rand.nextInt(100) < softCommitPercent) {
+ log.info("softCommit start");
+ clients.get(chosenClientIndex).commit(true, true, true);
+ log.info("softCommit end");
+ } else {
+ log.info("hardCommit start");
+ clients.get(chosenClientIndex).commit();
+ log.info("hardCommit end");
+ }
+
+ // nocommit: dual sync blocks w/non-synced commits in between is not thread safe
+ // nocommit: see jira comments for bad scenario
+ synchronized (TestStressInPlaceUpdates.this) {
+
+ // install this model snapshot only if it's newer than the current one
+ if (version >= committedModelClock) {
+ if (VERBOSE) {
+ log.info("installing new committedModel version={}", committedModelClock);
+ }
+ clientIndexUsedForCommit = chosenClientIndex;
+ committedModel = newCommittedModel;
+ committedModelClock = version;
+ }
+ }
+ continue;
+ }
+
+ int id;
+
+ if (rand.nextBoolean()) {
+ id = rand.nextInt(ndocs);
+ } else {
+ id = lastId; // reuse the last ID half of the time to force more race conditions
+ }
+
+ // set the lastId before we actually change it sometimes to try and
+ // uncover more race conditions between writing and reading
+ boolean before = rand.nextBoolean();
+ if (before) {
+ lastId = id;
+ }
+
+ DocInfo info = model.get(id);
+
+ // yield after getting the next version to increase the odds of updates happening out of order
+ if (rand.nextBoolean()) Thread.yield();
+
+ if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+ log.info("deleting id {}: {}",id,info);
+
+ long returnedVersion;
+
+ try {
+ final boolean dbq = (oper >= commitPercent + deletePercent);
+ returnedVersion = deleteDocAndGetVersion(Integer.toString(id), params("_version_", Long.toString(info.version)), dbq);
+ log.info((dbq? "DBI": "DBQ")+": Deleting id=" + id + "], version=" + info.version
+ + ". Returned version=" + returnedVersion);
+ } catch (RuntimeException e) {
+ if (e.getMessage() != null
+ && (e.getMessage().contains("version conflict") || e.getMessage().contains("Conflict"))) {
+ // It's okay for a leader to reject a concurrent request
+ log.warn("Conflict during delete, rejected id=" + id + ", " + e);
+ returnedVersion = -1;
+ } else throw e;
+ }
+
+ // only update model if the version is newer
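+ // (deletes return negative versions, so compare absolute values to pick the more recent one)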
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(returnedVersion) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(returnedVersion, 0, 0));
+ }
+ }
+
+ } else {
+ int val1 = info.intFieldValue;
+ long val2 = info.longFieldValue;
+ int nextVal1 = val1;
+ long nextVal2 = val2;
+
+ int addOper = rand.nextInt(100);
+ long returnedVersion;
+ if (addOper < fullUpdatePercent || info.version <= 0) { // if document was never indexed or was deleted
+ // FULL UPDATE
+ nextVal1 = Primes.nextPrime(val1 + 1);
+ nextVal2 = nextVal1 * 1000000000L;
+ try {
+ returnedVersion = addDocAndGetVersion("id", id, "title_s", "title" + id, "val1_i_dvo", nextVal1, "val2_l_dvo", nextVal2, "_version_", info.version);
+ log.info("FULL: Writing id=" + id + ", val=[" + nextVal1 + "," + nextVal2 + "], version=" + info.version + ", Prev was=[" + val1 + "," + val2 + "]. Returned version=" + returnedVersion);
+
+ } catch (RuntimeException e) {
+ if (e.getMessage() != null
+ && (e.getMessage().contains("version conflict") || e.getMessage().contains("Conflict"))) {
+ // It's okay for a leader to reject a concurrent request
+ log.warn("Conflict during full update, rejected id=" + id + ", " + e);
+ returnedVersion = Integer.MIN_VALUE;
+ } else throw e;
+ }
+ } else {
+ // PARTIAL
+ nextVal2 = val2 + val1;
+ try {
+ returnedVersion = addDocAndGetVersion("id", id, "val2_l_dvo", map("inc", String.valueOf(val1)), "_version_", info.version);
+ log.info("PARTIAL: Writing id=" + id + ", val=[" + nextVal1 + "," + nextVal2 + "], version=" + info.version + ", Prev was=[" + val1 + "," + val2 + "]. Returned version=" + returnedVersion);
+ } catch (RuntimeException e) {
+ if (e.getMessage() != null
+ && (e.getMessage().contains("version conflict") || e.getMessage().contains("Conflict"))) {
+ // It's okay for a leader to reject a concurrent request
+ log.warn("Conflict during partial update, rejected id=" + id + ", " + e);
+ returnedVersion = -1;
+ } else if (e.getMessage() != null && e.getMessage().contains("Document not found for update.")
+ && e.getMessage().contains("id="+id)) {
+ log.warn("Attempting a partial update for a recently deleted document, rejected id=" + id + ", " + e);
+ returnedVersion = Integer.MIN_VALUE;
+ } else throw e;
+ }
+ }
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (returnedVersion > currInfo.version) {
+ model.put(id, new DocInfo(returnedVersion, nextVal1, nextVal2));
+ }
+
+ }
+ }
+
+ if (!before) {
+ lastId = id;
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ log.error("", e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ threads.add(thread);
+
+ // nocommit: need a final pass over the model to check every doc
+ }
+
+ // Read threads
+ for (int i = 0; i < nReadThreads; i++) {
+ Thread thread = new Thread("READER" + i) {
+ Random rand = new Random(random().nextInt());
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ try {
+ while (operations.decrementAndGet() >= 0) {
+ // bias toward a recently changed doc
+ int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+ // when indexing, we update the index, then the model
+ // so when querying, we should first check the model, and then the index
+
+ boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+ DocInfo info;
+
+ if (realTime) {
+ info = model.get(id);
+ } else {
+ synchronized (TestStressInPlaceUpdates.this) {
+ info = committedModel.get(id);
+ }
+ }
+
+ if (VERBOSE) {
+ log.info("querying id {}", id);
+ }
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ if (realTime) {
+ params.set("wt", "json");
+ params.set("qt", "/get");
+ params.set("ids", Integer.toString(id));
+ } else {
+ params.set("wt", "json");
+ params.set("q", "id:" + Integer.toString(id));
+ params.set("omitHeader", "true");
+ }
+
+ int clientId = rand.nextInt(clients.size());
+ if (!realTime) clientId = clientIndexUsedForCommit;
+
+ QueryResponse response = clients.get(clientId).query(params);
+ if (response.getResults().size() == 0) {
+ // there's no info we can get back with a delete, so not much we can check without further synchronization
+ } else if (response.getResults().size() == 1) {
+ assertNotNull("Realtime=" + realTime + ", Response is: " + response + ", model: " + info,
+ response.getResults().get(0).get("val2_l_dvo"));
+ assertNotNull("Realtime=" + realTime + ", Response is: " + response.getResults().get(0) + ", model: " + info + ", client="+((HttpSolrClient)clients.get(clientId)).getBaseURL()+", leaderClient="+((HttpSolrClient)leaderClient).getBaseURL(),
+ response.getResults().get(0).get("val1_i_dvo"));
+
+ Object obj1 = response.getResults().get(0).getFirstValue("val1_i_dvo");
+ int val1 = (Integer) obj1;
+ Object obj2 = response.getResults().get(0).getFirstValue("val2_l_dvo");
+ long val2 = (Long) obj2;
+ Object objVer = response.getResults().get(0).getFirstValue("_version_");
+ long foundVer = (Long) objVer;
+
+
+ assertTrue("Vals are: " + val1 + ", " + val2 + ", id=" + id + ", clientId=" + clients.get(clientId)
+ + ", Doc retrieved is: " + response.toString(),
+ val1 == 0 && val2 == 0 || val2 % val1 == 0);
+
+ if (foundVer < Math.abs(info.version)
+ || (foundVer == info.version && (val1 != info.intFieldValue || val2 != info.longFieldValue))) { // if the version matches, the values must match too
+ log.error("Realtime=" + realTime + ", ERROR, id=" + id + " found=" + response + " model=" + info + ", client="+((HttpSolrClient)clients.get(clientId)).getBaseURL()+", leaderClient="+((HttpSolrClient)leaderClient).getBaseURL());
+ fail("Realtime=" + realTime + ", ERROR, id=" + id + " found=" + response + " model=" + info);
+ }
+ } else {
+ fail(String.format(Locale.ENGLISH, "There was more than one result: %s", response));
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ log.error("", e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+ // Start all threads
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ }
+
+ /**
+ * Used for storing the info for a document in an in-memory model.
+ */
+ private static class DocInfo {
+ long version;
+ int intFieldValue;
+ long longFieldValue;
+
+ public DocInfo(long version, int val1, long val2) {
+ this.version = version;
+ this.intFieldValue = val1;
+ this.longFieldValue = val2;
+ }
+
+ @Override
+ public String toString() {
+ return "[version=" + version + ", intValue=" + intFieldValue + ",longValue=" + longFieldValue + "]";
+ }
+ }
+
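+ /**
+ * Adds a document, given as alternating field name/value pairs, by sending it directly to the
+ * shard leader, and returns the version the leader assigned to the update. For example (with
+ * prevVersion standing in for a previously returned version):
+ * addDocAndGetVersion("id", 1, "val2_l_dvo", map("inc", "5"), "_version_", prevVersion)
+ */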
+ @SuppressWarnings("rawtypes")
+ protected long addDocAndGetVersion(Object... fields) throws Exception {
+ SolrInputDocument doc = new SolrInputDocument();
+ addFields(doc, fields);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add("versions", "true");
+
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParams(params);
+ ureq.add(doc);
+ UpdateResponse resp;
+
+ // send updates to leader, to avoid SOLR-8733
+ resp = ureq.process(leaderClient);
+
+ long returnedVersion = Long.parseLong(((NamedList) resp.getResponse().get("adds")).getVal(0).toString());
+ assertTrue("Due to SOLR-8733, sometimes returned version is 0. Let us assert that we have successfully"
+ + " worked around that problem here.", returnedVersion > 0);
+ return returnedVersion;
+ }
+
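+ /**
+ * Deletes the given id (by id, or via a query on the id field when deleteByQuery is true) by
+ * sending the request directly to the shard leader, and returns the version the leader assigned
+ * to the delete; delete versions are negative, as asserted below.
+ */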
+ @SuppressWarnings("rawtypes")
+ protected long deleteDocAndGetVersion(String id, ModifiableSolrParams params, boolean deleteByQuery) throws Exception {
+ params.add("versions", "true");
+
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParams(params);
+ if (deleteByQuery) {
+ ureq.deleteByQuery("id:"+id);
+ } else {
+ ureq.deleteById(id);
+ }
+ UpdateResponse resp;
+ // send updates to leader, to avoid SOLR-8733
+ resp = ureq.process(leaderClient);
+
+ String key = deleteByQuery? "deleteByQuery": "deletes";
+ long returnedVersion = Long.parseLong(((NamedList) resp.getResponse().get(key)).getVal(0).toString());
+ assertTrue("Due to SOLR-8733, sometimes returned version is 0. Let us assert that we have successfully"
+ + " worked around that problem here.", returnedVersion < 0);
+ return returnedVersion;
+ }
+
+ /**
+ * Gets the SolrClient for the leader replica of the shard. This is needed to work around SOLR-8733.
+ */
+ public SolrClient getClientForLeader() throws KeeperException, InterruptedException {
+ ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+ zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
+ ClusterState clusterState = zkStateReader.getClusterState();
+ Slice shard1 = clusterState.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1);
+ Replica leader = shard1.getLeader();
+ String leaderBaseUrl = zkStateReader.getBaseUrlForNodeName(leader.getNodeName());
+
+ for (int i = 0; i < clients.size(); i++) {
+ if (((HttpSolrClient) clients.get(i)).getBaseURL().startsWith(leaderBaseUrl))
+ return clients.get(i);
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/test/org/apache/solr/search/TestRecovery.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
index 12d3ec3..fa1e45f 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
@@ -22,6 +22,7 @@ import static org.apache.solr.update.processor.DistributingUpdateProcessorFactor
import org.noggit.ObjectBuilder;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.schema.IndexSchema;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateHandler;
@@ -61,6 +62,12 @@ public class TestRecovery extends SolrTestCaseJ4 {
savedFactory = System.getProperty("solr.DirectoryFactory");
System.setProperty("solr.directoryFactory", "org.apache.solr.core.MockFSDirectoryFactory");
initCore("solrconfig-tlog.xml","schema15.xml");
+
+ // validate that the schema was not changed to an unexpected state
+ IndexSchema schema = h.getCore().getLatestSchema();
+ assertTrue(schema.getFieldOrNull("_version_").hasDocValues() && !schema.getFieldOrNull("_version_").indexed()
+ && !schema.getFieldOrNull("_version_").stored());
+
}
@AfterClass
@@ -87,6 +94,10 @@ public class TestRecovery extends SolrTestCaseJ4 {
@Test
public void testLogReplay() throws Exception {
+ // nocommit: we also need testing that replay works correctly when mixing in-place updates + deletes
+ // nocommit: both deletes by id, and DBQ (against pre/post in-place updates values)
+ // nocommit: see longer jira comment about this method
+
try {
DirectUpdateHandler2.commitOnClose = false;
@@ -113,7 +124,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
versions.addFirst(addAndGetVersion(sdoc("id", "A12"), null));
versions.addFirst(deleteByQueryAndGetVersion("id:A11", null));
versions.addFirst(addAndGetVersion(sdoc("id", "A13"), null));
-
+ versions.addFirst(addAndGetVersion(sdoc("id", "A12", "val_i_dvo", map("set", 1)), null)); // atomic update
+ versions.addFirst(addAndGetVersion(sdoc("id", "A12", "val_i_dvo", map("set", 2)), null)); // in-place update
assertJQ(req("q","*:*"),"/response/numFound==0");
assertJQ(req("qt","/get", "getVersions",""+versions.size()) ,"/versions==" + versions);
@@ -138,6 +150,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
// wait until recovery has finished
assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+ assertJQ(req("q","val_i_dvo:2") ,"/response/numFound==1"); // assert that in-place update is retained
assertJQ(req("q","*:*") ,"/response/numFound==3");
@@ -150,6 +163,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertU(adoc("id","A4"));
assertJQ(req("q","*:*") ,"/response/numFound==3");
+ assertJQ(req("q","val_i_dvo:2") ,"/response/numFound==1"); // assert that in-place update is retained
h.close();
createCore();
@@ -169,6 +183,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
// h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
assertJQ(req("q","*:*") ,"/response/numFound==5");
+ assertJQ(req("q","val_i_dvo:2") ,"/response/numFound==1"); // assert that in-place update is retained
Thread.sleep(100);
assertEquals(permits, logReplay.availablePermits()); // no updates, so insure that recovery didn't run
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/test/org/apache/solr/update/HardAutoCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/HardAutoCommitTest.java b/solr/core/src/test/org/apache/solr/update/HardAutoCommitTest.java
index 3c652b2..e72d37d 100644
--- a/solr/core/src/test/org/apache/solr/update/HardAutoCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/update/HardAutoCommitTest.java
@@ -48,6 +48,10 @@ public class HardAutoCommitTest extends AbstractSolrTestCase {
clearIndex();
// reload the core to clear stats
h.getCoreContainer().reload(h.getCore().getName());
+
+ // nocommit: why was this line added in the last patch? .. seems fine, but also unrelated to the issue?
+ // nocommit: addition seems fishy: was this a cut/paste mistake meant for one/all of the other tests modified by this patch?
+ assertFalse(h.getCore().getSolrConfig().getUpdateHandlerInfo().commitWithinSoftCommit);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
index 8f3a89a..624cea1 100644
--- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
+++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
@@ -16,21 +16,29 @@
*/
package org.apache.solr.update;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
-import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+import static org.junit.internal.matchers.StringContains.containsString;
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
public class PeerSyncTest extends BaseDistributedSearchTestCase {
@@ -46,11 +54,24 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
// TODO: a better way to do this?
configString = "solrconfig-tlog.xml";
schemaString = "schema.xml";
+
+ // validate that the schema was not changed to an unexpected state
+ try {
+ initCore(configString, schemaString);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ IndexSchema schema = h.getCore().getLatestSchema();
+ assertTrue(schema.getFieldOrNull("_version_").hasDocValues() && !schema.getFieldOrNull("_version_").indexed()
+ && !schema.getFieldOrNull("_version_").stored());
+ assertTrue(!schema.getFieldOrNull("val_i_dvo").indexed() && !schema.getFieldOrNull("val_i_dvo").stored() &&
+ schema.getFieldOrNull("val_i_dvo").hasDocValues());
}
@Test
@ShardsFixed(num = 3)
public void test() throws Exception {
+ Set<Integer> docsAdded = new LinkedHashSet<>();
handle.clear();
handle.put("timestamp", SKIPVAL);
handle.put("score", SKIPVAL);
@@ -91,14 +112,17 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
add(client0, seenLeader, addRandFields(sdoc("id","8","_version_",++v)));
add(client0, seenLeader, addRandFields(sdoc("id","9","_version_",++v)));
add(client0, seenLeader, addRandFields(sdoc("id","10","_version_",++v)));
-
+ for (int i=0; i<10; i++) docsAdded.add(i+1);
assertSync(client1, numVersions, true, shardsArr[0]);
- client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*"), client0, client1);
+ client0.commit(); client1.commit();
+ QueryResponse qacResponse = queryAndCompare(params("q", "*:*", "rows", "10000"), client0, client1);
+ validateQACResponse(docsAdded, qacResponse);
int toAdd = (int)(numVersions *.95);
for (int i=0; i<toAdd; i++) {
add(client0, seenLeader, sdoc("id",Integer.toString(i+11),"_version_",v+i+1));
+ docsAdded.add(i+11);
}
// sync should fail since there's not enough overlap to give us confidence
@@ -111,19 +135,24 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
}
assertSync(client1, numVersions, true, shardsArr[0]);
- client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+ client0.commit(); client1.commit();
+ qacResponse = queryAndCompare(params("q", "*:*", "rows", "10000", "sort","_version_ desc"), client0, client1);
+ validateQACResponse(docsAdded, qacResponse);
// test delete and deleteByQuery
v=1000;
- add(client0, seenLeader, sdoc("id","1000","_version_",++v));
+ SolrInputDocument doc = sdoc("id","1000","_version_",++v);
+ add(client0, seenLeader, doc);
add(client0, seenLeader, sdoc("id","1001","_version_",++v));
delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "id:1001 OR id:1002");
add(client0, seenLeader, sdoc("id","1002","_version_",++v));
del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "1000");
+ docsAdded.add(1002); // 1002 added
assertSync(client1, numVersions, true, shardsArr[0]);
- client0.commit(); client1.commit();
- queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+ client0.commit(); client1.commit();
+ qacResponse = queryAndCompare(params("q", "*:*", "rows", "10000", "sort","_version_ desc"), client0, client1);
+ validateQACResponse(docsAdded, qacResponse);
// test that delete by query is returned even if not requested, and that it doesn't delete newer stuff than it should
v=2000;
@@ -133,6 +162,7 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
delQ(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "id:2001 OR id:2002");
add(client, seenLeader, sdoc("id","2002","_version_",++v));
del(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "2000");
+ docsAdded.add(2002); // 2002 added
v=2000;
client = client1;
@@ -144,7 +174,9 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
del(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "2000");
assertSync(client1, numVersions, true, shardsArr[0]);
- client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+ client0.commit(); client1.commit();
+ qacResponse = queryAndCompare(params("q", "*:*", "rows", "10000", "sort","_version_ desc"), client0, client1);
+ validateQACResponse(docsAdded, qacResponse);
//
// Test that handling reorders work when applying docs retrieved from peer
@@ -155,6 +187,7 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
add(client0, seenLeader, sdoc("id","3000","_version_",3001));
add(client1, seenLeader, sdoc("id","3000","_version_",3001));
del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","3000"), "3000");
+ docsAdded.add(3000);
+ // this should cause us to retrieve an add that was previously deleted
add(client0, seenLeader, sdoc("id","3001","_version_",3003));
@@ -165,17 +198,23 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
add(client0, seenLeader, sdoc("id","3002","_version_",3004));
add(client0, seenLeader, sdoc("id","3002","_version_",3005));
add(client1, seenLeader, sdoc("id","3002","_version_",3005));
-
+ docsAdded.add(3001); // 3001 added
+ docsAdded.add(3002); // 3002 added
+
assertSync(client1, numVersions, true, shardsArr[0]);
- client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+ client0.commit(); client1.commit();
+ qacResponse = queryAndCompare(params("q", "*:*", "rows", "10000", "sort","_version_ desc"), client0, client1);
+ validateQACResponse(docsAdded, qacResponse);
// now lets check fingerprinting causes appropriate fails
v = 4000;
add(client0, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
+ docsAdded.add(4000);
toAdd = numVersions+10;
for (int i=0; i<toAdd; i++) {
add(client0, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
add(client1, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
+ docsAdded.add((int)v+i+1);
}
// client0 now has an additional add beyond our window and the fingerprint should cause this to fail
@@ -198,14 +237,96 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i + 1), "_version_", v + i + 1));
}
assertSync(client1, numVersions, true, shardsArr[0]);
+
+ client0.commit(); client1.commit();
+ qacResponse = queryAndCompare(params("q", "*:*", "rows", "10000", "sort","_version_ desc"), client0, client1);
+ validateQACResponse(docsAdded, qacResponse);
+
+ // lets add some in-place updates
+ // v = 5000; // nocommit: dead code?
+ add(client0, seenLeader, sdoc("id", "5000", "val_i_dvo", 0, "title", "mytitle", "_version_", 5000)); // full update
+ docsAdded.add(5000);
+ assertSync(client1, numVersions, true, shardsArr[0]);
+ // verify the in-place updated document (id=5000) has correct fields
+ assertEquals(0, client1.getById("5000").get("val_i_dvo"));
+ assertEquals(client0.getById("5000")+" and "+client1.getById("5000"),
+ "mytitle", client1.getById("5000").getFirstValue("title"));
+
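+ // DISTRIB_INPLACE_PREVVERSION tells the replica which document version this in-place update
+ // was computed against, so a reordered replica can wait for (or fetch) that version first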
+ ModifiableSolrParams inPlaceParams = new ModifiableSolrParams(seenLeader);
+ inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "5000");
+ add(client0, inPlaceParams, sdoc("id", "5000", "val_i_dvo", 1, "_version_", 5001)); // in-place update
+ assertSync(client1, numVersions, true, shardsArr[0]);
+ // verify the in-place updated document (id=5000) has correct fields
+ assertEquals(1, client1.getById("5000").get("val_i_dvo"));
+ assertEquals(client0.getById("5000")+" and "+client1.getById("5000"),
+ "mytitle", client1.getById("5000").getFirstValue("title"));
+
+ // interleave the in-place updates with a few deletes to other documents
+ del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","5002"), 4001);
+ delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","5003"), "id:4002");
+ docsAdded.remove(4001);
+ docsAdded.remove(4002);
+
+ inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "5001");
+ add(client0, inPlaceParams, sdoc("id", 5000, "val_i_dvo", 2, "_version_", 5004)); // in-place update
+ assertSync(client1, numVersions, true, shardsArr[0]);
+ // verify the in-place updated document (id=5000) has correct fields
+ assertEquals(2, client1.getById("5000").get("val_i_dvo"));
+ assertEquals(client0.getById("5000")+" and "+client1.getById("5000"),
+ "mytitle", client1.getById("5000").getFirstValue("title"));
- }
+ // a DBQ with value
+ delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","5005"), "val_i_dvo:1"); // current val is 2, so this should not delete anything
+ assertSync(client1, numVersions, true, shardsArr[0]);
+ boolean deleteTheUpdatedDocument = random().nextBoolean();
+ if (deleteTheUpdatedDocument) { // if doc with id=5000 is deleted, further in-place-updates should fail
+ delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","5006"), "val_i_dvo:2"); // current val is 2, this will delete id=5000
+ assertSync(client1, numVersions, true, shardsArr[0]);
+ SolrException ex = expectThrows(SolrException.class, () -> {
+ inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "5004");
+ add(client0, inPlaceParams, sdoc("id", 5000, "val_i_dvo", 3, "_version_", 5007));
+ });
+ assertEquals(ex.toString(), SolrException.ErrorCode.SERVER_ERROR.code, ex.code());
+ assertThat(ex.getMessage(), containsString("Can't find document with id=5000"));
+ } else {
+ inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "5004");
+ add(client0, inPlaceParams, sdoc("id", 5000, "val_i_dvo", 3, "_version_", 5006));
+ assertSync(client1, numVersions, true, shardsArr[0]);
+
+ // verify the in-place updated document (id=5000) has correct fields
+ assertEquals(3, client1.getById("5000").get("val_i_dvo"));
+ assertEquals(client0.getById("5000")+" and "+client1.getById("5000"),
+ "mytitle", client1.getById("5000").getFirstValue("title"));
+
+ if (random().nextBoolean()) {
+ client0.commit(); client1.commit();
+ qacResponse = queryAndCompare(params("q", "*:*", "rows", "10000", "sort","_version_ desc"), client0, client1);
+ validateQACResponse(docsAdded, qacResponse);
+ }
+ del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","5007"), 5000);
+ docsAdded.remove(5000);
+ assertSync(client1, numVersions, true, shardsArr[0]);
+
+ client0.commit(); client1.commit();
+ qacResponse = queryAndCompare(params("q", "*:*", "rows", "10000", "sort","_version_ desc"), client0, client1);
+ validateQACResponse(docsAdded, qacResponse);
+ }
+ }
void assertSync(SolrClient client, int numVersions, boolean expectedResult, String... syncWith) throws IOException, SolrServerException {
QueryRequest qr = new QueryRequest(params("qt","/get", "getVersions",Integer.toString(numVersions), "sync", StrUtils.join(Arrays.asList(syncWith), ',')));
NamedList rsp = client.request(qr);
assertEquals(expectedResult, (Boolean) rsp.get("sync"));
}
+
+ void validateQACResponse(Set<Integer> docsAdded, QueryResponse qacResponse) {
+ Set<Integer> qacDocs = new LinkedHashSet<>();
+ for (int i=0; i<qacResponse.getResults().size(); i++) {
+ qacDocs.add(Integer.parseInt(qacResponse.getResults().get(i).getFieldValue("id").toString()));
+ }
+ assertEquals(docsAdded, qacDocs);
+ assertEquals(docsAdded.size(), qacResponse.getResults().getNumFound());
+ }
}
[2/4] lucene-solr:jira/solr-5944: SOLR-5944 Initial import into branch
Posted by is...@apache.org.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ae359a6/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
new file mode 100644
index 0000000..0f5f876
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
@@ -0,0 +1,998 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest.Field;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse.FieldResponse;
+import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.RefCounted;
+import org.apache.zookeeper.KeeperException;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the in-place updates (docValues updates) for a one-shard, three-replica cluster.
+ */
+@Slow
+public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @BeforeClass
+ public static void beforeSuperClass() throws Exception {
+
+ // nocommit: does this test need to randomize between diff schema/fields used?
+ // nocommit: see nocommits/jira questions related to special dynamicField logic in AtomicUpdateDocumentMerger.isInPlaceUpdate
+
+ schemaString = "schema-inplace-updates.xml";
+ configString = "solrconfig-tlog.xml";
+
+ // sanity check that autocommits are disabled
+ initCore(configString, schemaString);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxTime);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxTime);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxDocs);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxDocs);
+ }
+
+ public TestInPlaceUpdatesDistrib() throws Exception {
+ super();
+ sliceCount = 1;
+ fixShardCount(3);
+ }
+
+ private SolrClient LEADER = null;
+ private List<SolrClient> NONLEADERS = null;
+
+ @Test
+ @ShardsFixed(num = 3)
+ @SuppressWarnings("unchecked")
+ public void test() throws Exception {
+ waitForRecoveriesToFinish(true);
+ mapReplicasToClients();
+
+ // sanity check no one broke the assumptions we make about our schema
+ checkExpectedSchemaField(map("name", "inplace_updatable_int",
+ "type","int",
+ "stored",Boolean.FALSE,
+ "indexed",Boolean.FALSE,
+ "docValues",Boolean.TRUE,
+ "default","0"));
+ checkExpectedSchemaField(map("name", "inplace_updatable_float",
+ "type","float",
+ "stored",Boolean.FALSE,
+ "indexed",Boolean.FALSE,
+ "docValues",Boolean.TRUE,
+ "default","0"));
+ checkExpectedSchemaField(map("name", "_version_",
+ "type","long",
+ "stored",Boolean.FALSE,
+ "indexed",Boolean.FALSE,
+ "docValues",Boolean.TRUE));
+
+ // Do the tests now:
+ outOfOrderDBQsTest();
+ docValuesUpdateTest();
+ ensureRtgWorksWithPartialUpdatesTest();
+ delayedReorderingFetchesMissingUpdateFromLeaderTest();
+ outOfOrderUpdatesIndividualReplicaTest();
+ outOfOrderDeleteUpdatesIndividualReplicaTest();
+ reorderedDBQsWithInPlaceUpdatesShouldNotThrowReplicaInLIRTest();
+ }
+
+ private void mapReplicasToClients() throws KeeperException, InterruptedException {
+ ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+ cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
+ ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+ Replica leader = null;
+ Slice shard1 = clusterState.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1);
+ leader = shard1.getLeader();
+
+ String leaderBaseUrl = zkStateReader.getBaseUrlForNodeName(leader.getNodeName());
+ for (int i=0; i<clients.size(); i++) {
+ if (((HttpSolrClient)clients.get(i)).getBaseURL().startsWith(leaderBaseUrl))
+ LEADER = clients.get(i);
+ }
+
+ NONLEADERS = new ArrayList<>();
+ for (Replica rep: shard1.getReplicas()) {
+ if (rep.equals(leader)) {
+ continue;
+ }
+ String baseUrl = zkStateReader.getBaseUrlForNodeName(rep.getNodeName());
+ for (int i=0; i<clients.size(); i++) {
+ if (((HttpSolrClient)clients.get(i)).getBaseURL().startsWith(baseUrl))
+ NONLEADERS.add(clients.get(i));
+ }
+ }
+
+ assertNotNull(LEADER);
+ assertEquals(2, NONLEADERS.size());
+ }
+
+ final int NUM_RETRIES = 100, WAIT_TIME = 10;
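+ // the retry loops below poll up to NUM_RETRIES times, sleeping WAIT_TIME ms between attempts,
+ // while waiting for a commit to become visible on the queried replica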
+
+ // The following should work: full update to doc 0, in-place update for doc 0, delete doc 0
+ private void outOfOrderDBQsTest() throws Exception {
+ // nocommit: need to build randomized index with doc we care about mixed into the index randomly
+ // nocommit: see jira comments for in-depth suggestions
+
+ del("*:*");
+ commit();
+
+ index("id", 0, "title_s", "title0", "id_i", 0);
+ commit();
+
+ float inplace_updatable_float = 1;
+
+ // update doc, set
+ index("id", 0, "inplace_updatable_float", map("set", inplace_updatable_float));
+
+ LEADER.commit();
+ SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index
+ assertEquals(inplace_updatable_float, sdoc.get("inplace_updatable_float"));
+ assertEquals("title0", sdoc.get("title_s"));
+ long version0 = (long) sdoc.get("_version_");
+
+ // put replica out of sync
+ float newinplace_updatable_float = 100;
+ List<UpdateRequest> updates = new ArrayList<>();
+ updates.add(simulatedUpdateRequest(null, "id", 0, "title_s", "title0_new", "inplace_updatable_float", newinplace_updatable_float, "_version_", version0 + 1)); // full update
+ updates.add(simulatedUpdateRequest(version0 + 1, "id", 0, "inplace_updatable_float", newinplace_updatable_float + 1, "_version_", version0 + 2)); // inplace_updatable_float=101
+ updates.add(simulatedDeleteRequest(0, version0 + 3));
+
+ // order the updates correctly for NONLEADER 1
+ for (UpdateRequest update : updates) {
+ log.info("Issuing well ordered update: " + update.getDocuments());
+ NONLEADERS.get(1).request(update);
+ }
+
+ // Reordering needs to happen using parallel threads
+ ExecutorService threadpool =
+ ExecutorUtil.newMDCAwareFixedThreadPool(updates.size() + 1, new DefaultSolrThreadFactory(getTestName()));
+
+ // re-order the updates for NONLEADER 0
+ List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates);
+ Collections.shuffle(reorderedUpdates, random());
+ List<Future<UpdateResponse>> updateResponses = new ArrayList<>();
+ for (UpdateRequest update : reorderedUpdates) {
+ AsyncUpdateWithRandomCommit task = new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong());
+ updateResponses.add(threadpool.submit(task));
+ // we can't guarantee/trust what order the updates are executed in, since multiple threads
+ // are involved, but sleeping between submissions biases the scheduler toward running them in the order submitted
+ Thread.sleep(10);
+ }
+
+ threadpool.shutdown();
+ assertTrue("Thread pool didn't terminate within 10 secs", threadpool.awaitTermination(10, TimeUnit.SECONDS));
+
+ // assert all requests were successful
+ for (Future<UpdateResponse> resp: updateResponses) {
+ assertEquals(0, resp.get().getStatus());
+ }
+
+ // assert both replicas have same effect
+ for (SolrClient client : NONLEADERS) { // 0th is re-ordered replica, 1st is well-ordered replica
+ SolrDocument doc = client.getById(String.valueOf(0), params("distrib", "false"));
+ assertNull("This doc was supposed to have been deleted, but was: " + doc, doc);
+ }
+
+ log.info("outOfOrderDeleteUpdatesIndividualReplicaTest: This test passed fine...");
+ del("*:*");
+ commit();
+ }
+
+ private void docValuesUpdateTest() throws Exception {
+ del("*:*");
+ commit();
+
+ int numDocs = atLeast(100);
+ log.info("Trying num docs = " + numDocs);
+
+ // nocommit: reuse randomized index building code refactored out of ensureRtgWorksWithPartialUpdatesTest ?
+ //
+ // nocommit: doing that and changing the rest of the method to only use a subset of docs would help
+ // nocommit: ensure we don't have any edge case bugs where this test only passes because
+ // nocommit: all docs are updated (or all in a segment, etc...)
+ for (int i = 0; i < numDocs; i++) {
+ index("id", i, "title_s", "title" + i, "id_i", i, "inplace_updatable_float", "101.0");
+ }
+ commit();
+
+ List<Integer> luceneDocids = new ArrayList<>();
+ ModifiableSolrParams params = params("q", "*:*", "fl", "*,[docid]", "rows", String.valueOf(numDocs), "sort", "id_i asc");
+ SolrDocumentList results = LEADER.query(params).getResults();
+ assertEquals(numDocs, results.size());
+ for (SolrDocument doc : results) {
+ luceneDocids.add((int) doc.get("[docid]"));
+ }
+ log.info("Initial results: "+results);
+ List<Float> valuesList = new ArrayList<>();
+ for (int i = 0; i < numDocs; i++) {
+ valuesList.add(r.nextFloat()*5.0f);
+ }
+ log.info("inplace_updatable_float: "+valuesList);
+ // update doc, set
+ for (int i = numDocs - 1; i >= 0; i--) {
+ index("id", i, "inplace_updatable_float", map("set", valuesList.get(i)));
+ }
+
+ commit();
+
+ // Keep querying until the commit is distributed and expected number of results are obtained
+ boolean expectedResults = false;
+ long numFound = 0;
+ // nocommit: need to loop to check all (non-leader) clients -- see jira for elaboration
+ SolrClient clientToTest = clients.get(random().nextInt(clients.size()));
+ for (int retries=0; retries < NUM_RETRIES; retries++) {
+ log.info("Attempt: "+retries+", Results: "+results);
+
+ // nocommit: useless validation query since every doc starts off with a value when index is built
+ // nocommit: see jira comment for more detailed suggestion on fix
+ numFound = clientToTest.query(
+ params("q", "inplace_updatable_float:[* TO *]")).getResults().getNumFound();
+ if (numFound != numDocs) {
+ Thread.sleep(WAIT_TIME);
+ } else {
+ expectedResults = true;
+ break;
+ }
+ }
+ if (!expectedResults) {
+ fail("Waited " + (NUM_RETRIES*WAIT_TIME/1000) + " secs, but not obtained expected results. numFound: "
+ + numFound + ", numDocs: " + numDocs);
+ }
+
+ // nocommit: picking random from "clients" means LEADER is included, making tests uselss 1/3 the time
+ // nocommit: need to loop to check all (non-leader) clients -- see jira for elabroration
+ results = clients.get(random().nextInt(clients.size())).query(params).getResults();
+ assertTrue(matchResults(results, luceneDocids, valuesList));
+
+ // update doc, increment
+ // ensure all current values satisfy abs(value) <= X,
+ // then the second update can use an increment such that abs(inc) > X*3
+ // and we can use -inplace_updatable_float:[-X TO X] as the exclusion clause in a retry loop
+ log.info("Updating the documents...");
+ // nocommit: see jira comment for longer discussion about eliminating need to find X dynamically
+ float X = Math.max(Math.abs(Collections.max(valuesList)), Math.abs(Collections.min(valuesList)));
+ for (int i = 0; i < numDocs; i++) {
+ int inc = r.nextBoolean()? (int)(Math.ceil(X*3)) + r.nextInt(1000): (int)(-Math.ceil(X*3)) - r.nextInt(1000);
+ valuesList.set(i, valuesList.get(i) + inc);
+ index("id", i, "inplace_updatable_float", map("inc", inc));
+ }
+ commit();
+
+ // Keep querying until the commit is distributed and expected results are obtained
+ expectedResults = false;
+ for (int retries=0; retries < NUM_RETRIES; retries++) {
+ log.info("Attempt: "+retries+", Results: "+results);
+ numFound = clientToTest.query(
+ params("q", "+inplace_updatable_float:[* TO *] -inplace_updatable_float:[-"+X+" TO "+X+"]"))
+ .getResults().getNumFound();
+ if (numFound != numDocs) {
+ Thread.sleep(WAIT_TIME);
+ } else {
+ expectedResults = true;
+ break;
+ }
+ }
+ if (!expectedResults) {
+ fail("Waited " + (NUM_RETRIES*WAIT_TIME/1000) + " secs, but not obtained expected results. numFound: "
+ + numFound + ", numDocs: " + numDocs);
+ }
+ // nocommit: picking random from "clients" means LEADER is included, making tests uselss 1/3 the time
+ // nocommit: need to loop to check all (non-leader) clients -- see jira for elabroration
+ results = clients.get(random().nextInt(clients.size())).query(params).getResults();
+ assertTrue(matchResults(results, luceneDocids, valuesList));
+ }
+
+ /**
+ * Return true if the (same ordered) luceneDocids and values match the results
+ */
+ private boolean matchResults(SolrDocumentList results, List<Integer> luceneDocids, List<Float> valuesList) {
+ // nocommit: method should just assert expected values, not return boolean
+ // nocommit: simplifies caller code, and protects against risk of misuse in future (tests forgetting to check return value)
+ // nocommit: should be renamed to something like "assertDocIdsAndValuesInResults"
+
+ int counter = 0;
+ if (luceneDocids.size() == results.size()) {
+ for (SolrDocument doc : results) {
+ float r = (float) doc.get("inplace_updatable_float");
+ int id = (int) doc.get("[docid]");
+
+ if (!(luceneDocids.get(counter).intValue() == id &&
+ Math.abs(valuesList.get(counter).floatValue() - r) <= 0.001)) {
+ return false;
+ }
+ counter++;
+ }
+ } else {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ private void ensureRtgWorksWithPartialUpdatesTest() throws Exception {
+ del("*:*");
+ commit();
+
+ float inplace_updatable_float = 1;
+ String title = "title100";
+ long version = 0, currentVersion;
+
+ // nocommit: BEGIN: refactor this "build random index" code into a helper method for use by all test methods
+ // Adding random number of docs before adding the doc to be tested
+ int numDocsBefore = random().nextInt(1000);
+ for (int i=0; i<numDocsBefore; i++) {
+ index("id", 1000 + i, "title_s", "title" + (1000+i), "id_i", 1000 + i);
+ }
+
+ // Document to be tested
+ currentVersion = addDocAndGetVersion("id", 100, "title_s", "title100", "id_i", 100);
+ assertTrue(currentVersion > version);
+ version = currentVersion;
+
+ // Adding random number of docs after adding the doc to be tested
+ int numDocsAfter = random().nextInt(1000);
+ for (int i=0; i<numDocsAfter; i++) {
+ index("id", 5000 + i, "title_s", "title" + (5000+i), "id_i", 5000 + i);
+ }
+
+ LEADER.commit();
+ // nocommit: END: refactor this "build random index" code into a helper method for use by all test methods
+
+ // get the internal docids of id=100 document from the three replicas
+ List<Integer> docids = getInternalDocIds("100");
+
+ // update doc, set
+ currentVersion = addDocAndGetVersion("id", 100, "inplace_updatable_float", map("set", inplace_updatable_float));
+ assertTrue(currentVersion > version);
+ version = currentVersion;
+ LEADER.commit();
+ assertTrue("Earlier: "+docids+", now: "+getInternalDocIds("100"), docids.equals(getInternalDocIds("100")));
+
+ SolrDocument sdoc = LEADER.getById("100"); // RTG straight from the index
+ assertEquals(sdoc.toString(), (float) inplace_updatable_float, sdoc.get("inplace_updatable_float"));
+ assertEquals(sdoc.toString(), "title100", sdoc.get("title_s"));
+ assertEquals(sdoc.toString(), version, sdoc.get("_version_"));
+
+ if (random().nextBoolean()) {
+ title = "newtitle100";
+ currentVersion = addDocAndGetVersion("id", 100, "title_s", title, "inplace_updatable_float", inplace_updatable_float); // full indexing
+ assertTrue(currentVersion > version);
+ version = currentVersion;
+ docids = getInternalDocIds("100");
+ }
+
+ inplace_updatable_float++;
+ currentVersion = addDocAndGetVersion("id", 100, "inplace_updatable_float", map("inc", 1));
+ assertTrue(currentVersion > version);
+ version = currentVersion;
+ LEADER.commit();
+ assertTrue("Earlier: "+docids+", now: "+getInternalDocIds("100"), docids.equals(getInternalDocIds("100")));
+
+ currentVersion = addDocAndGetVersion("id", 100, "inplace_updatable_int", map("set", "100"));
+ assertTrue(currentVersion > version);
+ version = currentVersion;
+
+ inplace_updatable_float++;
+ currentVersion = addDocAndGetVersion("id", 100, "inplace_updatable_float", map("inc", 1));
+ assertTrue(currentVersion > version);
+ version = currentVersion;
+
+ // RTG from tlog
+ // nocommit: picking random from "clients" means LEADER is included, making tests uselss 1/3 the time
+ sdoc = clients.get(random().nextInt(clients.size())).getById("100");
+
+ assertEquals(sdoc.toString(), (int) 100, sdoc.get("inplace_updatable_int"));
+ assertEquals(sdoc.toString(), (float) inplace_updatable_float, sdoc.get("inplace_updatable_float"));
+ assertEquals(sdoc.toString(), title, sdoc.get("title_s"));
+ assertEquals(sdoc.toString(), version, sdoc.get("_version_"));
+
+ // assert that the internal docid for id=100 document remains same, in each replica, as before
+ LEADER.commit();
+ assertTrue("Earlier: "+docids+", now: "+getInternalDocIds("100"), docids.equals(getInternalDocIds("100")));
+ }
+
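+ /**
+ * Returns the internal lucene [docid] of the document with the given uniqueKey, one entry per
+ * replica of SHARD1; in-place updates must not cause a document to be re-indexed, so callers
+ * compare this list before and after an update to verify the docids did not change.
+ */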
+ private List<Integer> getInternalDocIds(String id) throws IOException {
+ // nocommit: no reason for this method to use SolrIndexSearcher directly -- will make converting to cloud test later hard
+ // nocommit: should just use RTG w/fl=[docid] to check values
+ // nocommit: seems like method created because of SOLR-9289, but that's been fixed for a while.
+
+ List<Integer> ret = new ArrayList<>();
+ for (CloudJettyRunner jetty: shardToJetty.get(SHARD1)) {
+ try (SolrCore core = jetty.jetty.getCoreContainer().getCore(DEFAULT_TEST_COLLECTION_NAME)) {
+ RefCounted<SolrIndexSearcher> holder = core.openNewSearcher(false, true);
+ try {
+ SolrIndexSearcher searcher = holder.get();
+ int docId = searcher.search(new TermQuery(new Term("id", id)), 1).scoreDocs[0].doc;
+ ret.add(docId);
+ // debug: System.out.println("gIDI: "+searcher.doc(docId));
+ } finally {
+ holder.decref();
+ }
+ }
+ }
+ return ret;
+ }
+
+ private void outOfOrderUpdatesIndividualReplicaTest() throws Exception {
+ // nocommit: adding doc we're going to update before doing index randomization defeats the point
+ // nocommit: see jira comments for in-depth suggestions
+
+ del("*:*");
+ commit();
+
+ index("id", 0, "title_s", "title0", "id_i", 0);
+ commit();
+
+ float inplace_updatable_float = 1;
+
+ // Adding random number of docs before adding the doc to be tested
+ int numDocsBefore = random().nextInt(1000);
+ for (int i=0; i<numDocsBefore; i++) {
+ index("id", 1000 + i, "title_s", "title" + (1000+i), "id_i", 1000 + i);
+ }
+
+ // update doc, set
+ index("id", 0, "inplace_updatable_float", map("set", inplace_updatable_float));
+
+ // Adding random number of docs after adding the doc to be tested
+ int numDocsAfter = random().nextInt(1000);
+ for (int i=0; i<numDocsAfter; i++) {
+ index("id", 5000 + i, "title_s", "title" + (5000+i), "id_i", 5000 + i);
+ }
+
+ LEADER.commit();
+ SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index
+ assertEquals(inplace_updatable_float, sdoc.get("inplace_updatable_float"));
+ assertEquals("title0", sdoc.get("title_s"));
+ long version0 = (long) sdoc.get("_version_");
+
+ // put replica out of sync
+ float newinplace_updatable_float = 100;
+ List<UpdateRequest> updates = new ArrayList<>();
+ updates.add(simulatedUpdateRequest(null, "id", 0, "title_s", "title0_new", "inplace_updatable_float", newinplace_updatable_float, "_version_", version0 + 1)); // full update
+ final int numInPlaceUpdates = atLeast(3); // evaluate once; calling atLeast() in the loop condition re-randomizes it every iteration
+ for (int i=1; i<numInPlaceUpdates; i++) {
+ updates.add(simulatedUpdateRequest(version0 + i, "id", 0, "inplace_updatable_float", newinplace_updatable_float + i, "_version_", version0 + i + 1));
+ }
+
+ // order the updates correctly for NONLEADER 1
+ for (UpdateRequest update : updates) {
+ log.info("Issuing well ordered update: " + update.getDocuments());
+ NONLEADERS.get(1).request(update);
+ }
+
+ // Reordering needs to happen using parallel threads, since some of these updates will
+ // be blocking calls, waiting for some previous updates to arrive on which it depends.
+ ExecutorService threadpool =
+ ExecutorUtil.newMDCAwareFixedThreadPool(updates.size() + 1, new DefaultSolrThreadFactory(getTestName()));
+
+ // re-order the updates for NONLEADER 0
+ List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates);
+ Collections.shuffle(reorderedUpdates, random());
+ List<Future<UpdateResponse>> updateResponses = new ArrayList<>();
+ for (UpdateRequest update : reorderedUpdates) {
+ AsyncUpdateWithRandomCommit task = new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong());
+ updateResponses.add(threadpool.submit(task));
+ // we can't guarantee/trust what order the updates are executed in, since multiple threads
+ // are involved, but sleeping between submissions biases the scheduler toward running them in the order submitted
+ Thread.sleep(10);
+ }
+
+ threadpool.shutdown();
+ assertTrue("Thread pool didn't terminate within 10 secs", threadpool.awaitTermination(10, TimeUnit.SECONDS));
+
+ // assert all requests were successful
+ for (Future<UpdateResponse> resp: updateResponses) {
+ assertEquals(0, resp.get().getStatus());
+ }
+
+ // assert both replicas have same effect
+ for (SolrClient client : NONLEADERS) { // 0th is re-ordered replica, 1st is well-ordered replica
+ log.info("Testing client: " + ((HttpSolrClient)client).getBaseURL());
+ assertReplicaValue(client, 0, "inplace_updatable_float", (newinplace_updatable_float + (float)(updates.size() - 1)),
+ "inplace_updatable_float didn't match for replica at client: " + ((HttpSolrClient)client).getBaseURL());
+ assertReplicaValue(client, 0, "title_s", "title0_new",
+ "Title didn't match for replica at client: " + ((HttpSolrClient)client).getBaseURL());
+ assertEquals(version0 + updates.size(), getReplicaValue(client, 0, "_version_"));
+ }
+
+ log.info("outOfOrderUpdatesIndividualReplicaTest: This test passed fine...");
+ del("*:*");
+ commit();
+ }
+
+ // The following should work: full update to doc 0, in-place update for doc 0, delete doc 0
+ private void outOfOrderDeleteUpdatesIndividualReplicaTest() throws Exception {
+ // nocommit: need to build randomized index with doc we care about mixed into the index randomly
+ // nocommit: see jira comments for in-depth suggestions
+
+ del("*:*");
+ commit();
+
+ index("id", 0, "title_s", "title0", "id_i", 0);
+ commit();
+
+ float inplace_updatable_float = 1;
+
+ // update doc, set
+ index("id", 0, "inplace_updatable_float", map("set", inplace_updatable_float));
+
+ LEADER.commit();
+ SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index
+ assertEquals(inplace_updatable_float, sdoc.get("inplace_updatable_float"));
+ assertEquals("title0", sdoc.get("title_s"));
+ long version0 = (long) sdoc.get("_version_");
+
+ // put replica out of sync
+ float newinplace_updatable_float = 100;
+ List<UpdateRequest> updates = new ArrayList<>();
+ updates.add(simulatedUpdateRequest(null, "id", 0, "title_s", "title0_new", "inplace_updatable_float", newinplace_updatable_float, "_version_", version0 + 1)); // full update
+ updates.add(simulatedUpdateRequest(version0 + 1, "id", 0, "inplace_updatable_float", newinplace_updatable_float + 1, "_version_", version0 + 2)); // inplace_updatable_float=101
+ updates.add(simulatedDeleteRequest(0, version0 + 3));
+
+ // order the updates correctly for NONLEADER 1
+ for (UpdateRequest update : updates) {
+ log.info("Issuing well ordered update: " + update.getDocuments());
+ NONLEADERS.get(1).request(update);
+ }
+
+ // Reordering needs to happen using parallel threads
+ ExecutorService threadpool =
+ ExecutorUtil.newMDCAwareFixedThreadPool(updates.size() + 1, new DefaultSolrThreadFactory(getTestName()));
+
+ // re-order the updates for NONLEADER 0
+ List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates);
+ Collections.shuffle(reorderedUpdates, random());
+ List<Future<UpdateResponse>> updateResponses = new ArrayList<>();
+ for (UpdateRequest update : reorderedUpdates) {
+ AsyncUpdateWithRandomCommit task = new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong());
+ updateResponses.add(threadpool.submit(task));
+ // we can't guarantee/trust what order the updates are executed in, since multiple threads
+ // are involved, but sleeping between submissions biases the thread scheduling to run them
+ // in the order submitted
+ Thread.sleep(10);
+ }
+
+ threadpool.shutdown();
+ assertTrue("Thread pool didn't terminate within 10 secs", threadpool.awaitTermination(10, TimeUnit.SECONDS));
+
+ // assert all requests were successful
+ for (Future<UpdateResponse> resp: updateResponses) {
+ assertEquals(0, resp.get().getStatus());
+ }
+
+ // assert both replicas have the same effect
+ for (SolrClient client : NONLEADERS) { // 0th is re-ordered replica, 1st is well-ordered replica
+ SolrDocument doc = client.getById(String.valueOf(0), params("distrib", "false"));
+ assertNull("This doc was supposed to have been deleted, but was: " + doc, doc);
+ }
+
+ log.info("outOfOrderDeleteUpdatesIndividualReplicaTest: This test passed fine...");
+ del("*:*");
+ commit();
+ }
+
+ /* Test for a situation where a document requiring an in-place update cannot be "resurrected"
+ * because the original full indexed document has been deleted by an out-of-order DBQ.
+ * As the method name says, the expected behaviour is that the replica is NOT thrown into
+ * LIR: the reordered DBQ must not delete the document, and both replicas must end up in the
+ * same state. Here's an example of the situation:
+ ADD(id=x, val=5, ver=1)
+ UPD(id=x, val=10, ver=2)
+ DBQ(q=val:10, v=4)
+ DV(id=x, val=5, ver=3)
+ */
+ private void reorderedDBQsWithInPlaceUpdatesShouldNotThrowReplicaInLIRTest() throws Exception {
+ // nocommit: need to build randomized index with doc we care about mixed into the index randomly
+ // nocommit: see jira comments for in-depth suggestions
+
+ del("*:*");
+ commit();
+
+ index("id", 0, "title_s", "title0", "id_i", 0);
+ commit();
+
+ LEADER.commit();
+ SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index
+ assertEquals("title0", sdoc.get("title_s"));
+ long version0 = (long) sdoc.get("_version_");
+
+ String field = "inplace_updatable_int";
+
+ // put replica out of sync
+ List<UpdateRequest> updates = new ArrayList<>();
+ updates.add(simulatedUpdateRequest(null, "id", 0, "title_s", "title0_new", field, 5, "_version_", version0 + 1)); // full update
+ updates.add(simulatedUpdateRequest(version0 + 1, "id", 0, field, 10, "_version_", version0 + 2)); // inplace_updatable_int=10
+ updates.add(simulatedUpdateRequest(version0 + 2, "id", 0, field, 5, "_version_", version0 + 3)); // inplace_updatable_int=5
+ updates.add(simulatedDeleteRequest(field+":10", version0 + 4)); // supposed to not delete anything
+
+ // order the updates correctly for NONLEADER 1
+ for (UpdateRequest update : updates) {
+ log.info("Issuing well ordered update: " + update.getDocuments());
+ NONLEADERS.get(1).request(update);
+ }
+
+ // Reordering needs to happen using parallel threads
+ ExecutorService threadpool =
+ ExecutorUtil.newMDCAwareFixedThreadPool(updates.size() + 1, new DefaultSolrThreadFactory(getTestName()));
+ long seed = random().nextLong(); // seed for randomization within the threads
+
+ // re-order the last two updates for NONLEADER 0
+ List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates);
+ Collections.swap(reorderedUpdates, 2, 3);
+
+ List<Future<UpdateResponse>> updateResponses = new ArrayList<>();
+ for (UpdateRequest update : reorderedUpdates) {
+ // pretend this update is coming from the other non-leader, so that
+ // the resurrection can happen from there (instead of the leader)
+ update.setParam(DistributedUpdateProcessor.DISTRIB_FROM, ((HttpSolrClient)NONLEADERS.get(1)).getBaseURL());
+ AsyncUpdateWithRandomCommit task = new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), seed);
+ updateResponses.add(threadpool.submit(task));
+ // we can't guarantee/trust what order the updates are executed in, since multiple threads
+ // are involved, but sleeping between submissions biases the thread scheduling to run them
+ // in the order submitted
+ Thread.sleep(10);
+ }
+
+ threadpool.shutdown();
+ assertTrue("Thread pool didn't terminate within 10 secs", threadpool.awaitTermination(10, TimeUnit.SECONDS));
+
+ int successful = 0;
+ for (Future<UpdateResponse> resp: updateResponses) {
+ try {
+ UpdateResponse r = resp.get();
+ if (r.getStatus() == 0) {
+ successful++;
+ }
+ } catch (Exception ex) {
+ // A reordered DBQ can trigger this error: the full document was deleted by mistake due to the
+ // out-of-order DBQ, and the in-place update that arrives after the DBQ (but was supposed to
+ // arrive before) cannot be applied, since the full document can't now be "resurrected".
+ // Tolerate only that specific error here; the assertion below still fails if it occurred.
+
+ if (!ex.getMessage().contains("Tried to fetch missing update"
+ + " from the leader, but missing wasn't present at leader.")) {
+ throw ex;
+ }
+ }
+ }
+ // All should succeed, i.e. no LIR
+ assertEquals(updateResponses.size(), successful);
+
+ log.info("Non leader 0: "+((HttpSolrClient)NONLEADERS.get(0)).getBaseURL());
+ log.info("Non leader 1: "+((HttpSolrClient)NONLEADERS.get(1)).getBaseURL());
+
+ SolrDocument doc0 = NONLEADERS.get(0).getById(String.valueOf(0), params("distrib", "false"));
+ SolrDocument doc1 = NONLEADERS.get(1).getById(String.valueOf(0), params("distrib", "false"));
+
+ log.info("Doc in both replica 0: "+doc0);
+ log.info("Doc in both replica 1: "+doc1);
+ // assert both replicas have the same effect
+ for (int i=0; i<NONLEADERS.size(); i++) { // 0th is re-ordered replica, 1st is well-ordered replica
+ SolrClient client = NONLEADERS.get(i);
+ SolrDocument doc = client.getById(String.valueOf(0), params("distrib", "false"));
+ assertNotNull("Client: "+((HttpSolrClient)client).getBaseURL(), doc);
+ assertEquals("Client: "+((HttpSolrClient)client).getBaseURL(), 5, doc.getFieldValue(field));
+ }
+
+ log.info("reorderedDBQsWithInPlaceUpdatesShouldNotThrowReplicaInLIRTest: This test passed fine...");
+ del("*:*");
+ commit();
+ }
+
+ private void delayedReorderingFetchesMissingUpdateFromLeaderTest() throws Exception {
+ // nocommit: need to build randomized index with doc we care about mixed into the index randomly
+ // nocommit: see jira comments for in-depth suggestions
+
+ del("*:*");
+ commit();
+
+ float inplace_updatable_float = 1;
+
+ index("id", 1, "title_s", "title1", "id_i", 1, "inplace_updatable_float", inplace_updatable_float);
+ commit();
+
+ float newinplace_updatable_float = 100;
+ List<UpdateRequest> updates = new ArrayList<>();
+ updates.add(regularUpdateRequest("id", 1, "title_s", "title1_new", "id_i", 1, "inplace_updatable_float", newinplace_updatable_float));
+ updates.add(regularUpdateRequest("id", 1, "inplace_updatable_float", map("inc", 1)));
+ updates.add(regularUpdateRequest("id", 1, "inplace_updatable_float", map("inc", 1)));
+
+ // The next request to replica2 will be delayed by 6 secs (timeout is 5s)
+ shardToJetty.get(SHARD1).get(1).jetty.getDebugFilter().addDelay(
+ "Waiting for dependant update to timeout", 1, 6000);
+
+ long seed = random().nextLong(); // seed for randomization within the threads
+ ExecutorService threadpool =
+ ExecutorUtil.newMDCAwareFixedThreadPool(updates.size() + 1, new DefaultSolrThreadFactory(getTestName()));
+ for (UpdateRequest update : updates) {
+ AsyncUpdateWithRandomCommit task = new AsyncUpdateWithRandomCommit(update, cloudClient, seed);
+ threadpool.submit(task);
+
+ // we can't guarantee/trust what order the updates are executed in, since multiple threads
+ // are involved, but sleeping between submissions biases the thread scheduling to run them
+ // in the order submitted
+ Thread.sleep(100);
+ }
+
+ threadpool.shutdown();
+ assertTrue("Thread pool didn't terminate within 10 secs", threadpool.awaitTermination(10, TimeUnit.SECONDS));
+
+ commit();
+
+ // TODO: Could try checking ZK for LIR flags to ensure LIR has not kicked in
+ // Check every 10ms, 100 times, for a replica to go down (& assert that it doesn't)
+ for (int i=0; i<100; i++) {
+ Thread.sleep(10);
+ cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
+ ClusterState state = cloudClient.getZkStateReader().getClusterState();
+
+ int numActiveReplicas = 0;
+ for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas())
+ if (rep.getState().equals(Replica.State.ACTIVE))
+ numActiveReplicas++;
+
+ assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
+ }
+
+ // nocommit: what's the point of this ad-hoc array?
+ // nocommit: if we want the leader and all non leaders, why not just loop over "clients" ?
+ for (SolrClient client: new SolrClient[] {LEADER, NONLEADERS.get(0),
+ NONLEADERS.get(1)}) { // nonleader 0 re-ordered replica, nonleader 1 well-ordered replica
+ log.info("Testing client (Fetch missing test): " + ((HttpSolrClient)client).getBaseURL());
+ log.info("Version at " + ((HttpSolrClient)client).getBaseURL() + " is: " + getReplicaValue(client, 1, "_version_"));
+
+ assertReplicaValue(client, 1, "inplace_updatable_float", (newinplace_updatable_float + 2.0f),
+ "inplace_updatable_float didn't match for replica at client: " + ((HttpSolrClient)client).getBaseURL());
+ assertReplicaValue(client, 1, "title_s", "title1_new",
+ "Title didn't match for replica at client: " + ((HttpSolrClient)client).getBaseURL());
+ }
+
+ // Try another round of these updates, this time with a delete request at the end.
+ // This is to ensure that fetching a missing update from the leader doesn't bomb out if the
+ // document has since been deleted on the leader
+ {
+ del("*:*");
+ commit();
+ shardToJetty.get(SHARD1).get(1).jetty.getDebugFilter().unsetDelay();
+
+ updates.add(regularDeleteRequest(1));
+
+ shardToJetty.get(SHARD1).get(1).jetty.getDebugFilter().addDelay("Waiting for dependent update to timeout", 1, 5999); // the first update
+ shardToJetty.get(SHARD1).get(1).jetty.getDebugFilter().addDelay("Waiting for dependent update to timeout", 4, 5998); // the delete update
+
+ threadpool =
+ ExecutorUtil.newMDCAwareFixedThreadPool(updates.size() + 1, new DefaultSolrThreadFactory(getTestName()));
+ for (UpdateRequest update : updates) {
+ AsyncUpdateWithRandomCommit task = new AsyncUpdateWithRandomCommit(update, cloudClient, seed);
+ threadpool.submit(task);
+
+ // we can't guarantee/trust what order the updates are executed in, since multiple threads
+ // are involved, but sleeping between submissions biases the thread scheduling to run them
+ // in the order submitted
+ Thread.sleep(100);
+ }
+
+ threadpool.shutdown();
+ assertTrue("Thread pool didn't terminate within 15 secs", threadpool.awaitTermination(15, TimeUnit.SECONDS));
+
+ commit();
+
+ // TODO: Could try checking ZK for LIR flags to ensure LIR has not kicked in
+ // Check every 10ms, 100 times, for a replica to go down (& assert that it doesn't)
+ ZkController zkController = shardToLeaderJetty.get(SHARD1).jetty.getCoreContainer().getZkController();
+ String lirPath = zkController.getLeaderInitiatedRecoveryZnodePath(DEFAULT_TEST_COLLECTION_NAME, SHARD1);
+ assertFalse(zkController.getZkClient().exists(lirPath, true));
+
+ for (int i=0; i<100; i++) {
+ Thread.sleep(10);
+ cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
+ ClusterState state = cloudClient.getZkStateReader().getClusterState();
+
+ int numActiveReplicas = 0;
+ for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas())
+ if (rep.getState().equals(Replica.State.ACTIVE))
+ numActiveReplicas++;
+
+ assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
+ }
+
+ for (SolrClient client: new SolrClient[] {LEADER, NONLEADERS.get(0),
+ NONLEADERS.get(1)}) { // nonleader 0 re-ordered replica, nonleader 1 well-ordered replica
+ SolrDocument doc = client.getById(String.valueOf(1), params("distrib", "false"));
+ assertNull("This doc was supposed to have been deleted, but was: " + doc, doc);
+ }
+
+ }
+ log.info("delayedReorderingFetchesMissingUpdateFromLeaderTest: This test passed fine...");
+ }
+
+ /**
+ * Use the schema API to verify that the specified expected Field exists with those exact attributes.
+ */
+ public void checkExpectedSchemaField(Map<String,Object> expected) throws Exception {
+ String fieldName = (String) expected.get("name");
+ assertNotNull("expected contains no name: " + expected, fieldName);
+ FieldResponse rsp = new Field(fieldName).process(this.cloudClient);
+ assertNotNull("Field Null Response: " + fieldName, rsp);
+ assertEquals("Field Status: " + fieldName + " => " + rsp.toString(), 0, rsp.getStatus());
+ assertEquals("Field: " + fieldName, expected, rsp.getField());
+ }
+
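+ /**
+ * Sends the given update to the given client and then, roughly one time in three, follows it
+ * with a commit. The 1-in-3 choice is driven by the seeded Random, so runs that share a seed
+ * make the same commit decisions (useful when replaying the same updates against two replicas).
+ */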
+ private static class AsyncUpdateWithRandomCommit implements Callable<UpdateResponse> {
+ UpdateRequest update;
+ SolrClient solrClient;
+ final Random rnd;
+
+ public AsyncUpdateWithRandomCommit(UpdateRequest update, SolrClient solrClient, long seed) {
+ this.update = update;
+ this.solrClient = solrClient;
+ this.rnd = new Random(seed);
+ }
+
+ @Override
+ public UpdateResponse call() throws Exception {
+ UpdateResponse resp = update.process(solrClient);
+ if (rnd.nextInt(3) == 0)
+ solrClient.commit();
+ return resp;
+ }
+ }
+
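+ /** Fetches the given field of the given doc id directly from the given replica (distrib=false). */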
+ Object getReplicaValue(SolrClient client, int doc, String field) throws SolrServerException, IOException {
+ // nocommit: all other usages of SolrClient.getById in this class should be replaced by calls to this method
+ // nocommit: that way we ensure distrib=false
+ SolrDocument sdoc = client.getById(String.valueOf(doc), params("distrib", "false"));
+ return sdoc.get(field);
+ }
+
+ void assertReplicaValue(SolrClient client, int doc, String field, Object expected,
+ String message) throws SolrServerException, IOException {
+ assertEquals(message, expected, getReplicaValue(client, doc, field));
+ }
+
+ // Returns an UpdateRequest containing the given fields as a document, constructed to
+ // simulate a request forwarded from a leader to a replica.
+ UpdateRequest simulatedUpdateRequest(Long prevVersion, Object... fields) throws SolrServerException, IOException {
+ SolrInputDocument doc = sdoc(fields);
+
+ // get baseUrl of the leader
+ String baseUrl = getBaseUrl(doc.get("id").toString());
+
+ UpdateRequest ur = new UpdateRequest();
+ ur.add(doc);
+ ur.setParam("update.distrib", "FROMLEADER");
+ if (prevVersion != null) {
+ ur.setParam("distrib.inplace.prevversion", String.valueOf(prevVersion));
+ ur.setParam("distrib.inplace.update", "true");
+ }
+ ur.setParam("distrib.from", baseUrl);
+ return ur;
+ }
+
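+ /**
+ * Simulates a delete of the given doc id forwarded from the leader with the given version,
+ * randomly expressed either as a delete-by-id or as an equivalent delete-by-query on the id.
+ */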
+ UpdateRequest simulatedDeleteRequest(int id, long version) throws SolrServerException, IOException {
+ String baseUrl = getBaseUrl(""+id);
+
+ UpdateRequest ur = new UpdateRequest();
+ if (random().nextBoolean()) {
+ ur.deleteById(""+id);
+ } else {
+ ur.deleteByQuery("id:"+id);
+ }
+ ur.setParam("_version_", ""+version);
+ ur.setParam("update.distrib", "FROMLEADER");
+ ur.setParam("distrib.from", baseUrl);
+ return ur;
+ }
+
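+ /** Simulates a delete-by-query forwarded from the leader with the given version. */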
+ UpdateRequest simulatedDeleteRequest(String query, long version) throws SolrServerException, IOException {
+ // nocommit: why clients.get(0)? ... if we want LEADER why not just use LEADER ?
+ String baseUrl = getBaseUrl((HttpSolrClient)clients.get(0));
+
+ UpdateRequest ur = new UpdateRequest();
+ ur.deleteByQuery(query);
+ ur.setParam("_version_", ""+version);
+ ur.setParam("update.distrib", "FROMLEADER");
+ ur.setParam("distrib.from", baseUrl + DEFAULT_COLLECTION + "/");
+ return ur;
+ }
+
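+ /** Resolves the core URL of the shard leader that the given doc id routes to. */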
+ private String getBaseUrl(String id) {
+ DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION);
+ Slice slice = collection.getRouter().getTargetSlice(id, null, null, null, collection);
+ return slice.getLeader().getCoreUrl();
+ }
+
+ UpdateRequest regularUpdateRequest(Object... fields) throws SolrServerException, IOException {
+ UpdateRequest ur = new UpdateRequest();
+ SolrInputDocument doc = sdoc(fields);
+ ur.add(doc);
+ return ur;
+ }
+
+ UpdateRequest regularDeleteRequest(int id) throws SolrServerException, IOException {
+ UpdateRequest ur = new UpdateRequest();
+ ur.deleteById(""+id);
+ return ur;
+ }
+
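+ /**
+ * Adds a document built from the given fields, sending it directly to the leader with
+ * versions=true (to work around SOLR-8733), and returns the version assigned to the add.
+ */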
+ @SuppressWarnings("rawtypes")
+ protected long addDocAndGetVersion(Object... fields) throws Exception {
+ SolrInputDocument doc = new SolrInputDocument();
+ addFields(doc, fields);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add("versions", "true");
+
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParams(params);
+ ureq.add(doc);
+ UpdateResponse resp;
+
+ synchronized (cloudClient) { // nocommit: WTF? do we need sync or not? if so why cloudClient?
+ // send updates to leader, to avoid SOLR-8733
+ resp = ureq.process(LEADER);
+ }
+
+ long returnedVersion = Long.parseLong(((NamedList)resp.getResponse().get("adds")).getVal(0).toString());
+ assertTrue("Due to SOLR-8733, sometimes returned version is 0. Let us assert that we have successfully"
+ + " worked around that problem here.", returnedVersion > 0);
+ return returnedVersion;
+ }
+}