You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cp...@apache.org on 2017/05/08 12:06:37 UTC
[05/33] lucene-solr:jira/solr-8668: SOLR-9530: An Update Processor to
convert normal update operations to atomic update operations such as add, set,
inc, remove, removeregex
SOLR-9530: An Update Processor to convert normal update operations to atomic update operations such as add, set, inc, remove, removeregex
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/faa74ec7
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/faa74ec7
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/faa74ec7
Branch: refs/heads/jira/solr-8668
Commit: faa74ec7dcddb9fccf240e81a36b21f90336726c
Parents: c6ebee6
Author: Noble Paul <no...@apache.org>
Authored: Fri May 5 10:47:57 2017 +0930
Committer: Noble Paul <no...@apache.org>
Committed: Fri May 5 10:49:11 2017 +0930
----------------------------------------------------------------------
solr/CHANGES.txt | 4 +
.../java/org/apache/solr/core/PluginBag.java | 2 +-
.../processor/AtomicUpdateProcessorFactory.java | 183 ++++++++++++
.../processor/UpdateRequestProcessorChain.java | 12 +-
.../AtomicUpdateProcessorFactoryTest.java | 280 +++++++++++++++++++
5 files changed, 477 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/faa74ec7/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f9576c2..ae1ed1f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -206,6 +206,10 @@ New Features
* SOLR-10583: JSON Faceting now supports a query time 'join' domain change option (hossman)
+* SOLR-9530: An Update Processor to convert normal update operations to atomic update operations such as
+ add, set, inc, remove, removeregex (Amrit Sarkar, noble)
+
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/faa74ec7/solr/core/src/java/org/apache/solr/core/PluginBag.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/PluginBag.java b/solr/core/src/java/org/apache/solr/core/PluginBag.java
index 4c0858e..e03fc06 100644
--- a/solr/core/src/java/org/apache/solr/core/PluginBag.java
+++ b/solr/core/src/java/org/apache/solr/core/PluginBag.java
@@ -122,7 +122,7 @@ public class PluginBag<T> implements AutoCloseable {
return result;
}
- PluginHolder<T> createPlugin(PluginInfo info) {
+ public PluginHolder<T> createPlugin(PluginInfo info) {
if ("true".equals(String.valueOf(info.attributes.get("runtimeLib")))) {
log.debug(" {} : '{}' created with runtimeLib=true ", meta.getCleanTag(), info.name);
return new LazyPluginHolder<>(meta, info, core, core.getMemClassLoader());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/faa74ec7/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateProcessorFactory.java
new file mode 100644
index 0000000..2292dc8
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateProcessorFactory.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.VersionInfo;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An update processor factory whose processors convert a conventional field-value document
+ * into an atomic update document.
+ * <p>
+ * For every request parameter named <code>Atomic.&lt;field&gt;</code>, the value of that field in the
+ * incoming document is wrapped in the requested atomic operation. The accepted operations are
+ * <code>add</code>, <code>inc</code>, <code>remove</code>, <code>set</code> and <code>removeregex</code>.
+ * </p>
+ * <p>
+ * sample request:
+ * </p>
+ * <pre>
+ * curl -X POST -H 'Content-Type: application/json'
+ * 'http://localhost:8983/solr/test/update/json/docs?processor=Atomic&amp;Atomic.my_newfield=add&amp;Atomic.subject=set&amp;Atomic.count_i=inc&amp;commit=true'
+ * --data-binary '{"id": 1,"title": "titleA"}'
+ * </pre>
+ */
+
+public class AtomicUpdateProcessorFactory extends UpdateRequestProcessorFactory implements SolrCoreAware {
+
+  private static final String ADD = "add";
+  private static final String INC = "inc";
+  private static final String REMOVE = "remove";
+  private static final String SET = "set";
+  private static final String REMOVEREGEX = "removeregex";
+  private static final Set<String> VALID_OPS = new HashSet<>(Arrays.asList(ADD, INC, REMOVE, SET, REMOVEREGEX));
+
+  private static final String VERSION = "_version_";
+  private static final String ATOMIC_FIELD_PREFIX = "Atomic.";
+  // Number of retries allowed after the initial attempt hits a version conflict.
+  private static final int MAX_ATTEMPTS = 5;
+
+  // Version lookup taken from the core's update log; null when no update log is configured.
+  private VersionInfo vinfo;
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+  /**
+   * No initialization arguments are read by this factory.
+   */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void init(final NamedList args) {
+
+  }
+
+  /**
+   * Captures the {@link VersionInfo} of the core's update log, or null if the core has no update log.
+   */
+  @Override
+  public void inform(SolrCore core) {
+    this.vinfo = core.getUpdateHandler().getUpdateLog() == null ? null : core.getUpdateHandler().getUpdateLog().getVersionInfo();
+
+  }
+
+  /**
+   * Creates a processor for the given request.
+   *
+   * @throws SolrException with code BAD_REQUEST when no version info (update log) is available
+   */
+  @Override
+  public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp,
+                                            UpdateRequestProcessor next) {
+    if (vinfo == null) {
+      throw new SolrException
+          (SolrException.ErrorCode.BAD_REQUEST,
+              "Atomic document updates are not supported unless <updateLog/> is configured");
+    }
+    return new AtomicUpdateProcessor(req, next);
+  }
+
+  // Not static: the processor reads the enclosing factory's vinfo.
+  private class AtomicUpdateProcessor extends UpdateRequestProcessor {
+
+    private final SolrQueryRequest req;
+
+    private AtomicUpdateProcessor(SolrQueryRequest req,
+                                  UpdateRequestProcessor next) {
+      super(next);
+      this.req = req;
+    }
+
+    /*
+     * 1. convert the incoming update document to an atomic-type update document
+     *    for the fields named by "Atomic.<field>" request parameters.
+     * 2. if a named field is absent from the document, or its value is already a Map
+     *    (i.e. already an atomic-type update), leave it untouched.
+     * 3. fields not named by an "Atomic." parameter are left as conventional values.
+     * 4. if at least one field was converted, attach the last known _version_ (when the
+     *    document is known) and retry on version conflict.
+     */
+    @Override
+    public void processAdd(AddUpdateCommand cmd)
+        throws IOException {
+
+      SolrInputDocument orgdoc = cmd.getSolrInputDocument();
+      boolean isAtomicUpdateAddedByMe = false;
+
+      Iterator<String> paramsIterator = req.getParams().getParameterNamesIterator();
+
+      while (paramsIterator.hasNext()) {
+
+        String param = paramsIterator.next();
+
+        if (!param.startsWith(ATOMIC_FIELD_PREFIX)) continue;
+
+        String field = param.substring(ATOMIC_FIELD_PREFIX.length());
+        String operation = req.getParams().get(param);
+
+        if (!VALID_OPS.contains(operation)) {
+          throw new SolrException(SERVER_ERROR,
+              "Unexpected param(s) for AtomicUpdateProcessor, invalid atomic op passed: '" +
+                  operation + "'");
+        }
+        if (orgdoc.get(field) == null || orgdoc.get(field).getValue() instanceof Map) {
+          // no value for the field or it's already an atomic update operation
+          // continue processing other fields
+          continue;
+        }
+
+        orgdoc.setField(field, singletonMap(operation, orgdoc.get(field).getValue()));
+        isAtomicUpdateAddedByMe = true;
+      }
+
+      if (isAtomicUpdateAddedByMe) {
+        // put _version_ for optimistic concurrency if a version is known for this id
+        Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+        if (lastVersion != null) {
+          orgdoc.setField(VERSION, lastVersion);
+        }
+        processAddWithRetry(cmd, 0);
+      } else {
+        // nothing was converted: pass the document down the chain unchanged
+        super.processAdd(cmd);
+      }
+    }
+
+    /**
+     * Sends the command down the chain, retrying when the chain reports a version conflict.
+     * Before each retry the document's _version_ is refreshed from the version info, when a
+     * version is found.
+     *
+     * @param cmd      the add command whose document has been converted to atomic operations
+     * @param attempts number of retries already consumed; callers start at 0
+     * @throws SolrException the original exception if it is not a version conflict, or a
+     *                       SERVER_ERROR wrapping the last conflict once MAX_ATTEMPTS retries
+     *                       have been used
+     */
+    private void processAddWithRetry(AddUpdateCommand cmd, int attempts) throws IOException {
+      while (true) {
+        try {
+          super.processAdd(cmd);
+          return;
+        } catch (SolrException e) {
+          if (e.code() != ErrorCode.CONFLICT.code) {
+            // not a version conflict: retrying cannot help and the failure must not be hidden
+            throw e;
+          }
+          if (attempts++ >= MAX_ATTEMPTS) {
+            throw new SolrException(SERVER_ERROR,
+                "Atomic update failed after multiple attempts due to " + e.getMessage(), e);
+          }
+          log.warn("Atomic update failed due to {}. Retrying with new version .... ({})",
+              e.getMessage(), attempts);
+          Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+          if (lastVersion != null) {
+            cmd.solrDoc.setField(VERSION, lastVersion);
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/faa74ec7/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java
index 0ed626c..05d1a5a 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java
@@ -28,6 +28,8 @@ import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.PluginBag;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
@@ -271,9 +273,13 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized
UpdateRequestProcessorFactory p = core.getUpdateProcessors().get(s);
if (p == null) {
try {
- p = core.createInstance(s + "UpdateProcessorFactory", UpdateRequestProcessorFactory.class,
- "updateProcessor", null, core.getMemClassLoader());
- core.getUpdateProcessors().put(s, p);
+ PluginInfo pluginInfo = new PluginInfo("updateProcessor",
+ Utils.makeMap("name", s,
+ "class", s + "UpdateProcessorFactory",
+ "runtimeLib", "true"));
+
+ PluginBag.PluginHolder<UpdateRequestProcessorFactory> pluginHolder = core.getUpdateProcessors().createPlugin(pluginInfo);
+ core.getUpdateProcessors().put(s, p = pluginHolder.get());
} catch (SolrException e) {
}
if (p == null)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/faa74ec7/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java b/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java
new file mode 100644
index 0000000..780e2ba
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringJoiner;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.junit.BeforeClass;
+
+/**
+ * test class for @see AtomicUpdateProcessorFactory
+ */
+public class AtomicUpdateProcessorFactoryTest extends SolrTestCaseJ4 {
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ initCore("solrconfig.xml", "schema.xml");
+ }
+
+ public void testWrongAtomicOpPassed() throws Exception {
+ AddUpdateCommand cmd = new AddUpdateCommand(new LocalSolrQueryRequest(h.getCore(),
+ new ModifiableSolrParams()
+ .add("processor", "Atomic")
+ .add("Atomic.cat", "delete")
+ .add("commit","true")
+ ));
+
+ try {
+ AtomicUpdateProcessorFactory factory = new AtomicUpdateProcessorFactory();
+ factory.inform(h.getCore());
+ factory.getInstance(cmd.getReq(), new SolrQueryResponse(),
+ null).processAdd(cmd);
+ } catch (SolrException e) {
+ assertEquals("Unexpected param(s) for AtomicUpdateProcessor, invalid atomic op passed: 'delete'",
+ e.getMessage());
+ }
+ }
+
+ public void testNoUniqueIdPassed() throws Exception { //TODO
+ AddUpdateCommand cmd = new AddUpdateCommand(new LocalSolrQueryRequest(h.getCore(),
+ new ModifiableSolrParams()
+ .add("processor", "Atomic")
+ .add("Atomic.cat", "add")
+ .add("commit","true")
+ ));
+
+ cmd.solrDoc = new SolrInputDocument();
+ cmd.solrDoc.addField("title", 1);
+
+ try {
+ AtomicUpdateProcessorFactory factory = new AtomicUpdateProcessorFactory();
+ factory.inform(h.getCore());
+ factory.getInstance(cmd.getReq(), new SolrQueryResponse(),
+ null).processAdd(cmd);
+ } catch (SolrException e) {
+ assertEquals("Document passed with no unique field: 'id'", e.getMessage());
+ }
+ }
+
+ public void testBasics() throws Exception {
+
+ AddUpdateCommand cmd = new AddUpdateCommand(new LocalSolrQueryRequest(h.getCore(),
+ new ModifiableSolrParams()
+ .add("processor", "Atomic")
+ .add("Atomic.cat", "add")
+ .add("Atomic.title", "set")
+ .add("Atomic.count_i", "set")
+ .add("Atomic.name_s", "set")
+ .add("Atomic.multiDefault", "set")
+ .add("commit","true")
+ ));
+
+ cmd.solrDoc = new SolrInputDocument();
+ cmd.solrDoc.addField("id", 1);
+ cmd.solrDoc.addField("cat", "human");
+ cmd.solrDoc.addField("title", "Mr");
+ cmd.solrDoc.addField("count_i", 20);
+ cmd.solrDoc.addField("name_s", "Virat");
+ cmd.solrDoc.addField("multiDefault", "Delhi");
+
+ AtomicUpdateProcessorFactory factory = new AtomicUpdateProcessorFactory();
+ factory.inform(h.getCore());
+ factory.getInstance(cmd.getReq(), new SolrQueryResponse(),
+ new DistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(),
+ new RunUpdateProcessor(cmd.getReq(), null))).processAdd(cmd);
+
+ assertU(commit());
+
+ assertQ("Check the total number of docs",
+ req("q", "id:1")
+ , "//result[@numFound=1]");
+
+ assertQ("Check the total number of docs",
+ req("q", "cat:human")
+ , "//result[@numFound=1]");
+
+ assertQ("Check the total number of docs",
+ req("q", "title:Mr")
+ , "//result[@numFound=1]");
+
+ assertQ("Check the total number of docs",
+ req("q", "count_i:20")
+ , "//result[@numFound=1]");
+
+ assertQ("Check the total number of docs",
+ req("q", "name_s:Virat")
+ , "//result[@numFound=1]");
+
+ assertQ("Check the total number of docs",
+ req("q", "multiDefault:Delhi")
+ , "//result[@numFound=1]");
+
+ cmd = new AddUpdateCommand(new LocalSolrQueryRequest(h.getCore(),
+ new ModifiableSolrParams()
+ .add("processor", "Atomic")
+ .add("Atomic.cat", "add")
+ .add("Atomic.title", "set")
+ .add("Atomic.count_i", "inc")
+ .add("Atomic.name_s", "remove")
+ .add("Atomic.multiDefault", "removeregex")
+ .add("commit","true")
+ ));
+
+ cmd.solrDoc = new SolrInputDocument();
+ cmd.solrDoc.addField("id", 1);
+ cmd.solrDoc.addField("cat", "animal");
+ cmd.solrDoc.addField("title", "Dr");
+ cmd.solrDoc.addField("count_i", 20);
+ cmd.solrDoc.addField("name_s", "Virat");
+ cmd.solrDoc.addField("multiDefault", ".elh.");
+
+ factory = new AtomicUpdateProcessorFactory();
+ factory.inform(h.getCore());
+ factory.getInstance(cmd.getReq(), new SolrQueryResponse(),
+ new DistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(),
+ new RunUpdateProcessor(cmd.getReq(), null))).processAdd(cmd);
+
+ assertU(commit());
+
+ assertQ("Check the total number of docs",
+ req("q", "id:1")
+ , "//result[@numFound=1]");
+
+ assertQ("Check the total number of docs",
+ req("q", "cat:human")
+ , "//result[@numFound=1]");
+
+ assertQ("Check the total number of docs",
+ req("q", "cat:animal")
+ , "//result[@numFound=1]");
+
+ assertQ("Check the total number of docs",
+ req("q", "title:Mr")
+ , "//result[@numFound=0]");
+
+ assertQ("Check the total number of docs",
+ req("q", "title:Dr")
+ , "//result[@numFound=1]");
+
+ assertQ("Check the total number of docs",
+ req("q", "count_i:20")
+ , "//result[@numFound=0]");
+
+ assertQ("Check the total number of docs",
+ req("q", "count_i:40")
+ , "//result[@numFound=1]");
+
+ assertQ("Check the total number of docs",
+ req("q", "name_s:Virat")
+ , "//result[@numFound=0]");
+
+ assertQ("Check the total number of docs",
+ req("q", "multiDefault:Delhi")
+ , "//result[@numFound=0]");
+
+ }
+
+ public void testMultipleThreads() throws Exception {
+ clearIndex();
+ String[] strings = new String[5];
+ for (int i=0; i<5; i++) {
+ strings[i] = generateRandomString();
+ }
+
+ List<Thread> threads = new ArrayList<>(100);
+ int finalCount = 0; //int_i
+
+ for (int i = 0; i < 100; i++) {
+ int index = random().nextInt(5);
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ AddUpdateCommand cmd = new AddUpdateCommand(new LocalSolrQueryRequest(h.getCore(),
+ new ModifiableSolrParams()
+ .add("processor", "Atomic")
+ .add("Atomic.cat", "add")
+ .add("Atomic.int_i", "inc")
+ .add("commit","true")
+
+ ));
+
+ cmd.solrDoc = new SolrInputDocument();
+ cmd.solrDoc.addField("id", 10); //hardcoded id=2
+ cmd.solrDoc.addField("cat", strings[index]);
+ cmd.solrDoc.addField("int_i", index);
+
+ try {
+ AtomicUpdateProcessorFactory factory = new AtomicUpdateProcessorFactory();
+ factory.inform(h.getCore());
+ factory.getInstance(cmd.getReq(), new SolrQueryResponse(),
+ new DistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(),
+ new RunUpdateProcessor(cmd.getReq(), null))).processAdd(cmd);
+ } catch (IOException e) {
+ }
+ }
+ };
+ t.run();
+ threads.add(t);
+ finalCount += index; //int_i
+ }
+
+ for (Thread thread: threads) {
+ thread.join();
+ }
+
+ assertU(commit());
+
+ assertQ("Check the total number of docs",
+ req("q", "id:10"), "//result[@numFound=1]");
+
+
+ StringJoiner queryString = new StringJoiner(" ");
+ for(String string: strings) {
+ queryString.add(string);
+ }
+
+ assertQ("Check the total number of docs",
+ req("q", "cat:" + queryString.toString())
+ , "//result[@numFound=1]");
+
+ assertQ("Check the total number of docs",
+ req("q", "int_i:" + finalCount)
+ , "//result[@numFound=1]");
+
+ }
+
+ private String generateRandomString() {
+ char[] chars = "abcdefghijklmnopqrstuvwxyz0123456789".toCharArray();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 20; i++) {
+ char c = chars[random().nextInt(chars.length)];
+ sb.append(c);
+ }
+ return sb.toString();
+ }
+
+}
\ No newline at end of file