You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cp...@apache.org on 2017/05/08 12:06:37 UTC

[05/33] lucene-solr:jira/solr-8668: SOLR-9530: An Update Processor to convert normal update operations to atomic operations such as add, set, inc, remove, removeregex

SOLR-9530: An Update Processor to convert normal update operations to atomic operations such as add, set, inc, remove, removeregex


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/faa74ec7
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/faa74ec7
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/faa74ec7

Branch: refs/heads/jira/solr-8668
Commit: faa74ec7dcddb9fccf240e81a36b21f90336726c
Parents: c6ebee6
Author: Noble Paul <no...@apache.org>
Authored: Fri May 5 10:47:57 2017 +0930
Committer: Noble Paul <no...@apache.org>
Committed: Fri May 5 10:49:11 2017 +0930

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   4 +
 .../java/org/apache/solr/core/PluginBag.java    |   2 +-
 .../processor/AtomicUpdateProcessorFactory.java | 183 ++++++++++++
 .../processor/UpdateRequestProcessorChain.java  |  12 +-
 .../AtomicUpdateProcessorFactoryTest.java       | 280 +++++++++++++++++++
 5 files changed, 477 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/faa74ec7/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f9576c2..ae1ed1f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -206,6 +206,10 @@ New Features
 
 * SOLR-10583: JSON Faceting now supports a query time 'join' domain change option (hossman)
 
+* SOLR-9530: An Update Processor to convert normal update operations to atomic operations such as
+  add, set, inc, remove, removeregex (Amrit Sarkar, noble)
+
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/faa74ec7/solr/core/src/java/org/apache/solr/core/PluginBag.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/PluginBag.java b/solr/core/src/java/org/apache/solr/core/PluginBag.java
index 4c0858e..e03fc06 100644
--- a/solr/core/src/java/org/apache/solr/core/PluginBag.java
+++ b/solr/core/src/java/org/apache/solr/core/PluginBag.java
@@ -122,7 +122,7 @@ public class PluginBag<T> implements AutoCloseable {
     return result;
   }
 
-  PluginHolder<T> createPlugin(PluginInfo info) {
+  public PluginHolder<T> createPlugin(PluginInfo info) {
     if ("true".equals(String.valueOf(info.attributes.get("runtimeLib")))) {
       log.debug(" {} : '{}'  created with runtimeLib=true ", meta.getCleanTag(), info.name);
       return new LazyPluginHolder<>(meta, info, core, core.getMemClassLoader());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/faa74ec7/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateProcessorFactory.java
new file mode 100644
index 0000000..2292dc8
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateProcessorFactory.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.VersionInfo;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An update processor that converts a conventional field-value document into an atomic update document.
+ * <p>
+ * Sample request (the operation for each field is given by an "Atomic.&lt;field&gt;" request parameter):
+ * curl -X POST -H Content-Type: application/json
+ * http://localhost:8983/solr/test/update/json/docs?processor=Atomic&amp;Atomic.my_newfield=add&amp;Atomic.subject=set&amp;Atomic.count_i=inc&amp;commit=true
+ * --data-binary {"id": 1,"title": "titleA"}
+ * </p>
+ * Supported operations: add, set, inc, remove and removeregex.
+ */
+
+public class AtomicUpdateProcessorFactory extends UpdateRequestProcessorFactory implements SolrCoreAware {
+
+  private final static String ADD = "add";
+  private final static String INC = "inc";
+  private final static String REMOVE = "remove";
+  private final static String SET = "set";
+  private final static String REMOVEREGEX = "removeregex";
+  private final static Set<String> VALID_OPS = new HashSet<>(Arrays.asList(ADD, INC, REMOVE, SET, REMOVEREGEX));
+
+  private final static String VERSION = "_version_";
+  private final static String ATOMIC_FIELD_PREFIX = "Atomic.";
+  private final static int MAX_ATTEMPTS = 5;
+
+  private VersionInfo vinfo;
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+  @SuppressWarnings({"static-access", "rawtypes", "null"})
+  @Override
+  public void init(final NamedList args) {
+    // Intentionally empty: no init args are read; the operations come from request parameters.
+  }
+
+  @Override
+  public void inform(SolrCore core) {
+    // vinfo stays null when the core has no update log; getInstance() rejects requests in that case
+    this.vinfo = core.getUpdateHandler().getUpdateLog() == null ? null : core.getUpdateHandler().getUpdateLog().getVersionInfo();
+  }
+
+  /** Creates the per-request processor; throws BAD_REQUEST when inform() found no update log (vinfo is null). */
+  @Override
+  public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp,
+                                            UpdateRequestProcessor next) {
+    if (vinfo != null) {
+      return new AtomicUpdateProcessor(req, next);
+    }
+    throw new SolrException(ErrorCode.BAD_REQUEST,
+        "Atomic document updates are not supported unless <updateLog/> is configured");
+  }
+
+  private class AtomicUpdateProcessor extends UpdateRequestProcessor {
+
+    // Request whose "Atomic.<field>" parameters name the operation for each field.
+    // processAdd reads them on every add command.
+    private final SolrQueryRequest req;
+
+    /** Passes next to the superclass constructor; no second copy of it is kept in this class. */
+    private AtomicUpdateProcessor(SolrQueryRequest req,
+                                  UpdateRequestProcessor next) {
+      super(next);
+      this.req = req;
+    }
+
+    /**
+     * Converts plain field values into atomic-update operations, then forwards the command.
+     * <ul>
+     * <li>Each request parameter "Atomic.&lt;field&gt;" names the operation for that field; an
+     * operation outside VALID_OPS is rejected with a SolrException.</li>
+     * <li>A field missing from the document, or whose value is already a Map (already an atomic
+     * operation), is skipped.</li>
+     * <li>Fields without an "Atomic." parameter stay conventional values.</li>
+     * <li>When at least one field was converted, the version returned by
+     * vinfo.lookupVersion (if any) is set as _version_ for optimistic concurrency and the add
+     * goes through processAddWithRetry; otherwise the command is forwarded unchanged.</li>
+     * </ul>
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void processAdd(AddUpdateCommand cmd) throws IOException {
+
+      final SolrInputDocument doc = cmd.getSolrInputDocument();
+      boolean converted = false;
+
+      for (Iterator<String> names = req.getParams().getParameterNamesIterator(); names.hasNext(); ) {
+        final String name = names.next();
+        if (!name.startsWith(ATOMIC_FIELD_PREFIX)) {
+          continue;
+        }
+        final String field = name.substring(ATOMIC_FIELD_PREFIX.length());
+        final String op = req.getParams().get(name);
+        if (!VALID_OPS.contains(op)) {
+          throw new SolrException(SERVER_ERROR,
+              "Unexpected param(s) for AtomicUpdateProcessor, invalid atomic op passed: '" +
+                  op + "'");
+        }
+
+        // convert only fields that are present and not already expressed as an atomic operation
+        final boolean present = doc.get(field) != null;
+        if (present && !(doc.get(field).getValue() instanceof Map)) {
+          doc.setField(field, singletonMap(op, doc.get(field).getValue()));
+          converted = true;
+        }
+      }
+
+      if (!converted) {
+        // nothing was converted: forward the document as a conventional add
+        super.processAdd(cmd);
+        return;
+      }
+
+      // attach the last known version, if there is one, for optimistic concurrency
+      final Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+      if (lastVersion != null) {
+        doc.setField(VERSION, lastVersion);
+      }
+      processAddWithRetry(cmd, 0);
+    }
+
+    /** Runs the add, retrying with a refreshed _version_ on version conflicts; other failures propagate. */
+    private void processAddWithRetry(AddUpdateCommand cmd, int attempts) throws IOException {
+      try {
+        super.processAdd(cmd);
+      } catch (SolrException e) {
+        if (e.code() != ErrorCode.CONFLICT.code) {
+          throw e; // not a version conflict: retrying cannot help, and it must not be swallowed
+        }
+        if (attempts++ >= MAX_ATTEMPTS) {//maximum number of attempts allowed: 5
+          throw new SolrException(SERVER_ERROR,
+              "Atomic update failed after multiple attempts due to " + e.getMessage(), e);
+        }
+        log.warn("Atomic update failed due to " + e.getMessage() +
+            " Retrying with new version .... (" + attempts + ")");
+        Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+        if (lastVersion != null) cmd.solrDoc.setField(VERSION, lastVersion);
+        processAddWithRetry(cmd, attempts);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/faa74ec7/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java
index 0ed626c..05d1a5a 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java
@@ -28,6 +28,8 @@ import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.PluginBag;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
@@ -271,9 +273,13 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized
       UpdateRequestProcessorFactory p = core.getUpdateProcessors().get(s);
       if (p == null) {
         try {
-          p = core.createInstance(s + "UpdateProcessorFactory", UpdateRequestProcessorFactory.class,
-              "updateProcessor", null, core.getMemClassLoader());
-          core.getUpdateProcessors().put(s, p);
+          PluginInfo pluginInfo = new PluginInfo("updateProcessor",
+              Utils.makeMap("name", s,
+                  "class", s + "UpdateProcessorFactory",
+                  "runtimeLib", "true"));
+
+          PluginBag.PluginHolder<UpdateRequestProcessorFactory> pluginHolder = core.getUpdateProcessors().createPlugin(pluginInfo);
+          core.getUpdateProcessors().put(s, p = pluginHolder.get());
         } catch (SolrException e) {
         }
         if (p == null)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/faa74ec7/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java b/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java
new file mode 100644
index 0000000..780e2ba
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringJoiner;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.junit.BeforeClass;
+
+/**
+ * Tests for {@link AtomicUpdateProcessorFactory}.
+ */
+public class AtomicUpdateProcessorFactoryTest extends SolrTestCaseJ4 {
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    initCore("solrconfig.xml", "schema.xml"); // core reached through h.getCore() in the tests below
+  }
+
+  /** An operation outside the supported set must be rejected with a descriptive SolrException. */
+  public void testWrongAtomicOpPassed() throws Exception {
+    AddUpdateCommand cmd = new AddUpdateCommand(new LocalSolrQueryRequest(h.getCore(),
+        new ModifiableSolrParams()
+            .add("processor", "Atomic")
+            .add("Atomic.cat", "delete")
+            .add("commit","true")
+    ));
+    try {
+      AtomicUpdateProcessorFactory factory = new AtomicUpdateProcessorFactory();
+      factory.inform(h.getCore());
+      factory.getInstance(cmd.getReq(), new SolrQueryResponse(), null).processAdd(cmd);
+      fail("expected a SolrException for the invalid atomic op 'delete'");
+    } catch (SolrException e) {
+      assertEquals("Unexpected param(s) for AtomicUpdateProcessor, invalid atomic op passed: 'delete'",
+          e.getMessage());
+    }
+  }
+
+  public void testNoUniqueIdPassed() throws Exception { //TODO
+    AddUpdateCommand cmd = new AddUpdateCommand(new LocalSolrQueryRequest(h.getCore(),
+        new ModifiableSolrParams()
+            .add("processor", "Atomic")
+            .add("Atomic.cat", "add")
+            .add("commit","true")
+    ));
+    // The document below has neither an "id" nor the "cat" field named by the Atomic.cat parameter.
+    cmd.solrDoc = new SolrInputDocument();
+    cmd.solrDoc.addField("title", 1);
+    // NOTE(review): nothing fails the test when no SolrException is thrown, so it may pass vacuously (see TODO).
+    try {
+      AtomicUpdateProcessorFactory factory = new AtomicUpdateProcessorFactory();
+      factory.inform(h.getCore());
+      factory.getInstance(cmd.getReq(), new SolrQueryResponse(),
+          null).processAdd(cmd);
+    } catch (SolrException e) {
+      assertEquals("Document passed with no unique field: 'id'", e.getMessage());
+    }
+  }
+
+  public void testBasics() throws Exception {
+    // First update: doc 1 is sent through the processor with only add/set operations.
+    AddUpdateCommand cmd = new AddUpdateCommand(new LocalSolrQueryRequest(h.getCore(),
+        new ModifiableSolrParams()
+            .add("processor", "Atomic")
+            .add("Atomic.cat", "add")
+            .add("Atomic.title", "set")
+            .add("Atomic.count_i", "set")
+            .add("Atomic.name_s", "set")
+            .add("Atomic.multiDefault", "set")
+            .add("commit","true")
+    ));
+
+    cmd.solrDoc = new SolrInputDocument();
+    cmd.solrDoc.addField("id", 1);
+    cmd.solrDoc.addField("cat", "human");
+    cmd.solrDoc.addField("title", "Mr");
+    cmd.solrDoc.addField("count_i", 20);
+    cmd.solrDoc.addField("name_s", "Virat");
+    cmd.solrDoc.addField("multiDefault", "Delhi");
+
+    AtomicUpdateProcessorFactory factory = new AtomicUpdateProcessorFactory();
+    factory.inform(h.getCore());
+    factory.getInstance(cmd.getReq(), new SolrQueryResponse(),
+        new DistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(),
+            new RunUpdateProcessor(cmd.getReq(), null))).processAdd(cmd);
+
+    assertU(commit());
+
+    assertQ("Check the total number of docs",
+        req("q", "id:1")
+        , "//result[@numFound=1]");
+
+    assertQ("Check the total number of docs",
+        req("q", "cat:human")
+        , "//result[@numFound=1]");
+
+    assertQ("Check the total number of docs",
+        req("q", "title:Mr")
+        , "//result[@numFound=1]");
+
+    assertQ("Check the total number of docs",
+        req("q", "count_i:20")
+        , "//result[@numFound=1]");
+
+    assertQ("Check the total number of docs",
+        req("q", "name_s:Virat")
+        , "//result[@numFound=1]");
+
+    assertQ("Check the total number of docs",
+        req("q", "multiDefault:Delhi")
+        , "//result[@numFound=1]");
+    // Second update of doc 1: add to cat, set title, inc count_i, remove from name_s, removeregex on multiDefault.
+    cmd = new AddUpdateCommand(new LocalSolrQueryRequest(h.getCore(),
+        new ModifiableSolrParams()
+            .add("processor", "Atomic")
+            .add("Atomic.cat", "add")
+            .add("Atomic.title", "set")
+            .add("Atomic.count_i", "inc")
+            .add("Atomic.name_s", "remove")
+            .add("Atomic.multiDefault", "removeregex")
+            .add("commit","true")
+    ));
+
+    cmd.solrDoc = new SolrInputDocument();
+    cmd.solrDoc.addField("id", 1);
+    cmd.solrDoc.addField("cat", "animal");
+    cmd.solrDoc.addField("title", "Dr");
+    cmd.solrDoc.addField("count_i", 20);
+    cmd.solrDoc.addField("name_s", "Virat");
+    cmd.solrDoc.addField("multiDefault", ".elh.");
+
+    factory = new AtomicUpdateProcessorFactory();
+    factory.inform(h.getCore());
+    factory.getInstance(cmd.getReq(), new SolrQueryResponse(),
+        new DistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(),
+            new RunUpdateProcessor(cmd.getReq(), null))).processAdd(cmd);
+
+    assertU(commit());
+    // Expected: cat holds both values, title replaced, count_i 20 -> 40, name_s and multiDefault values gone.
+    assertQ("Check the total number of docs",
+        req("q", "id:1")
+        , "//result[@numFound=1]");
+
+    assertQ("Check the total number of docs",
+        req("q", "cat:human")
+        , "//result[@numFound=1]");
+
+    assertQ("Check the total number of docs",
+        req("q", "cat:animal")
+        , "//result[@numFound=1]");
+
+    assertQ("Check the total number of docs",
+        req("q", "title:Mr")
+        , "//result[@numFound=0]");
+
+    assertQ("Check the total number of docs",
+        req("q", "title:Dr")
+        , "//result[@numFound=1]");
+
+    assertQ("Check the total number of docs",
+        req("q", "count_i:20")
+        , "//result[@numFound=0]");
+
+    assertQ("Check the total number of docs",
+        req("q", "count_i:40")
+        , "//result[@numFound=1]");
+
+    assertQ("Check the total number of docs",
+        req("q", "name_s:Virat")
+        , "//result[@numFound=0]");
+
+    assertQ("Check the total number of docs",
+        req("q", "multiDefault:Delhi")
+        , "//result[@numFound=0]");
+
+  }
+
+  public void testMultipleThreads() throws Exception {
+    clearIndex();
+    String[] strings = new String[5];
+    for (int i=0; i<5; i++) {
+      strings[i] = generateRandomString();
+    }
+
+    List<Thread> threads = new ArrayList<>(100);
+    int finalCount = 0; //expected int_i: sum of every increment sent
+    // Each of the 100 iterations adds one value to cat and increments int_i on the same document.
+    for (int i = 0; i < 100; i++) {
+      int index = random().nextInt(5);
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          AddUpdateCommand cmd = new AddUpdateCommand(new LocalSolrQueryRequest(h.getCore(),
+              new ModifiableSolrParams()
+                  .add("processor", "Atomic")
+                  .add("Atomic.cat", "add")
+                  .add("Atomic.int_i", "inc")
+                  .add("commit","true")
+
+          ));
+
+          cmd.solrDoc = new SolrInputDocument();
+          cmd.solrDoc.addField("id", 10); //every iteration updates the same document, id=10
+          cmd.solrDoc.addField("cat", strings[index]);
+          cmd.solrDoc.addField("int_i", index);
+
+          try {
+            AtomicUpdateProcessorFactory factory = new AtomicUpdateProcessorFactory();
+            factory.inform(h.getCore());
+            factory.getInstance(cmd.getReq(), new SolrQueryResponse(),
+                new DistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(),
+                    new RunUpdateProcessor(cmd.getReq(), null))).processAdd(cmd);
+          } catch (IOException e) { // NOTE(review): swallowed; a failed update only surfaces in the final checks
+          }
+        }
+      };
+      t.run(); // NOTE(review): run() executes on the calling thread, so updates are sequential; start() would be concurrent
+      threads.add(t);
+      finalCount += index; //int_i
+    }
+
+    for (Thread thread: threads) {
+      thread.join();
+    }
+
+    assertU(commit());
+
+    assertQ("Check the total number of docs",
+        req("q", "id:10"), "//result[@numFound=1]");
+
+
+    StringJoiner queryString = new StringJoiner(" ");
+    for(String string: strings) {
+      queryString.add(string);
+    }
+
+    assertQ("Check the total number of docs",
+        req("q", "cat:" + queryString.toString())
+        , "//result[@numFound=1]");
+
+    assertQ("Check the total number of docs",
+        req("q", "int_i:" + finalCount)
+        , "//result[@numFound=1]");
+
+  }
+
+  /** Returns 20 characters, each picked from [a-z0-9] with one random().nextInt call. */
+  private String generateRandomString() {
+    final String alphabet = "abcdefghijklmnopqrstuvwxyz0123456789";
+    final char[] out = new char[20];
+    for (int pos = 0; pos < out.length; pos++) {
+      out[pos] = alphabet.charAt(random().nextInt(alphabet.length()));
+    }
+    return new String(out);
+  }
+
+}
\ No newline at end of file