You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2014/11/21 07:04:47 UTC
svn commit: r1640857 - in /lucene/dev/trunk/solr/core/src:
java/org/apache/solr/cloud/ java/org/apache/solr/core/
java/org/apache/solr/handler/ java/org/apache/solr/util/
test/org/apache/solr/handler/
Author: noble
Date: Fri Nov 21 06:04:47 2014
New Revision: 1640857
URL: http://svn.apache.org/r1640857
Log:
SOLR-6533 Added a testcase for config reload, hardened watching for changes
Added:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestConfigReload.java (with props)
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrConfig.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/CommandOperation.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSolrConfigHandlerConcurrent.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1640857&r1=1640856&r2=1640857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Nov 21 06:04:47 2014
@@ -41,7 +41,6 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
@@ -1222,9 +1221,9 @@ public final class ZkController {
overseerJobQueue.offer(ZkStateReader.toJSON(m));
if(configLocation != null) {
- synchronized (confDirectoryWatchers) {
+ synchronized (confDirectoryListeners) {
log.info("This conf directory is no more watched {0}",configLocation);
- confDirectoryWatchers.remove(configLocation);
+ confDirectoryListeners.remove(configLocation);
}
}
}
@@ -2115,29 +2114,42 @@ public final class ZkController {
*
* @return true on success
*/
- public static boolean persistConfigResourceToZooKeeper( SolrResourceLoader loader, int znodeVersion , String resourceName, byte[] content, boolean createIfNotExists) {
+ public static boolean persistConfigResourceToZooKeeper( SolrResourceLoader loader, int znodeVersion ,
+ String resourceName, byte[] content,
+ boolean createIfNotExists) {
final ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)loader;
final ZkController zkController = zkLoader.getZkController();
final SolrZkClient zkClient = zkController.getZkClient();
final String resourceLocation = zkLoader.getConfigSetZkPath() + "/" + resourceName;
- String errMsg = "Failed to persist resource at {0} - version mismatch";
+ String errMsg = "Failed to persist resource at {0} - version mismatch {1}";
try {
try {
zkClient.setData(resourceLocation , content,znodeVersion, true);
+ zkClient.setData(zkLoader.getConfigSetZkPath(),new byte[]{0},true);
} catch (NoNodeException e) {
if(createIfNotExists){
try {
zkClient.create(resourceLocation,content, CreateMode.PERSISTENT,true);
+ zkClient.setData(zkLoader.getConfigSetZkPath(), new byte[]{0}, true);
} catch (KeeperException.NodeExistsException nee) {
- log.info(MessageFormat.format(errMsg,resourceLocation));
- throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg,resourceLocation) + ", retry.");
+ try {
+ Stat stat = zkClient.exists(resourceLocation, null, true);
+ log.info("failed to set data version in zk is {} and expected version is {} ", stat.getVersion(),znodeVersion);
+ } catch (Exception e1) {
+ log.warn("could not get stat");
+ }
+
+ log.info(MessageFormat.format(errMsg,resourceLocation,znodeVersion));
+ throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg,resourceLocation,znodeVersion) + ", retry.");
}
}
}
} catch (KeeperException.BadVersionException bve){
log.info(MessageFormat.format(errMsg,resourceLocation));
- throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg,resourceLocation) + ", retry.");
+ throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg,resourceLocation,znodeVersion) + ", retry.");
+ }catch (ResourceModifiedInZkException e){
+ throw e;
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt(); // Restore the interrupted status
@@ -2157,9 +2169,13 @@ public final class ZkController {
public void unRegisterConfListener(Runnable listener) {
if(listener == null) return;
- synchronized (confDirectoryWatchers){
- for (Set<Runnable> runnables : confDirectoryWatchers.values()) {
- if(runnables != null) runnables.remove(listener);
+ synchronized (confDirectoryListeners){
+ for (Set<Runnable> listeners : confDirectoryListeners.values()) {
+ if(listeners != null) {
+ if(listeners.remove(listener)) {
+ log.info(" a listener was removed because of core close");
+ }
+ }
}
}
@@ -2172,9 +2188,9 @@ public final class ZkController {
*/
public void registerConfListenerForCore(String confDir,SolrCore core, final Runnable listener){
if(listener==null) throw new NullPointerException("listener cannot be null");
- synchronized (confDirectoryWatchers){
- if(confDirectoryWatchers.containsKey(confDir)){
- confDirectoryWatchers.get(confDir).add(listener);
+ synchronized (confDirectoryListeners){
+ if(confDirectoryListeners.containsKey(confDir)){
+ confDirectoryListeners.get(confDir).add(listener);
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
@@ -2184,69 +2200,76 @@ public final class ZkController {
@Override
public void postClose(SolrCore core) { }
});
-
-
} else {
throw new SolrException(ErrorCode.SERVER_ERROR,"This conf directory is not valid");
}
}
}
- private Map<String , Set<Runnable>> confDirectoryWatchers = new HashMap<>();
- void watchZKConfDir(final String zkDir) {
+ private final Map<String , Set<Runnable>> confDirectoryListeners = new HashMap<>();
- if(!confDirectoryWatchers.containsKey(zkDir)){
- confDirectoryWatchers.put(zkDir,new HashSet<Runnable>());
- }else{
- //it's already watched
- return;
- }
+ void watchZKConfDir(final String zkDir) {
+ log.info("watch zkdir " + zkDir);
+ if (!confDirectoryListeners.containsKey(zkDir)) {
+ confDirectoryListeners.put(zkDir, new HashSet<Runnable>());
+ setConfWatcher(zkDir, new WatcherImpl(zkDir));
- Watcher watcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- try {
- synchronized (confDirectoryWatchers) {
- // if this is not among directories to be watched then don't set the watcher anymore
- if(!confDirectoryWatchers.containsKey(zkDir)) return;
- }
+ }
- if (event.getType() == Event.EventType.NodeChildrenChanged) {
- synchronized (confDirectoryWatchers) {
- final Set<Runnable> listeners = confDirectoryWatchers.get(zkDir);
- if (listeners != null) {
- new Thread() {
- @Override
- public synchronized void run() {
- //running in a separate thread so that the zk event thread is not
- // unnecessarily held up
- for (Runnable listener : listeners) listener.run();
- }
- }.start();
- }
- }
- }
- } finally {
- if (Event.EventType.None.equals(event.getType())) {
+ }
+ private class WatcherImpl implements Watcher{
+ private final String zkDir ;
+
+ private WatcherImpl(String dir) {
+ this.zkDir = dir;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ try {
+
+ synchronized (confDirectoryListeners) {
+ // if this is not among directories to be watched then don't set the watcher anymore
+ if( !confDirectoryListeners.containsKey(zkDir)) {
+ log.info("Watcher on {} is removed ", zkDir);
return;
- } else {
- setConfWatcher(zkDir,this);
}
+ final Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
+ if (listeners != null && !listeners.isEmpty()) {
+ new Thread() {
+ //run these in a separate thread because this can be long running
+ public void run() {
+ for (final Runnable listener : listeners)
+ try {
+ listener.run();
+ } catch (Exception e) {
+ log.warn("listener throws error", e);
+ }
+ }
+ }.start();
+ }
+
}
- }
- };
- setConfWatcher(zkDir,watcher);
+ } finally {
+ if (Event.EventType.None.equals(event.getType())) {
+ log.info("A node got unwatched for {}", zkDir);
+ return;
+ } else {
+ setConfWatcher(zkDir,this);
+ }
+ }
+ }
}
private void setConfWatcher(String zkDir, Watcher watcher) {
try {
- zkClient.getChildren(zkDir,watcher,true);
+ zkClient.exists(zkDir,watcher,true);
} catch (KeeperException e) {
log.error("failed to set watcher for conf dir {} ", zkDir);
} catch (InterruptedException e) {
- Thread.interrupted();
+ Thread.currentThread().interrupt();
log.error("failed to set watcher for conf dir {} ", zkDir);
}
}
@@ -2255,11 +2278,10 @@ public final class ZkController {
return new OnReconnect() {
@Override
public void command() {
- synchronized (confDirectoryWatchers){
- for (String s : confDirectoryWatchers.keySet()) {
+ synchronized (confDirectoryListeners){
+ for (String s : confDirectoryListeners.keySet()) {
watchZKConfDir(s);
}
-
}
}
};
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java?rev=1640857&r1=1640856&r2=1640857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java Fri Nov 21 06:04:47 2014
@@ -111,10 +111,6 @@ public class ZkSolrResourceLoader extend
}
- public ZkByteArrayInputStream(byte[] buf, int offset, int length, Stat stat) {
- super(buf, offset, length);
- this.stat = stat;
- }
public Stat getStat(){
return stat;
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java?rev=1640857&r1=1640856&r2=1640857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java Fri Nov 21 06:04:47 2014
@@ -18,6 +18,7 @@
package org.apache.solr.core;
import org.apache.lucene.util.Version;
+import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
import org.apache.solr.util.DOMUtil;
import org.apache.solr.util.SystemIdResolver;
@@ -48,6 +49,7 @@ import javax.xml.xpath.XPathExpressionEx
import javax.xml.xpath.XPathFactory;
import java.io.IOException;
+import java.io.InputStream;
import java.text.ParseException;
import java.util.Arrays;
import java.util.HashSet;
@@ -74,6 +76,7 @@ public class Config {
private final String prefix;
private final String name;
private final SolrResourceLoader loader;
+ private int zkVersion = -1;
/**
* Builds a config from a resource name with no xpath prefix.
@@ -113,9 +116,14 @@ public class Config {
this.prefix = (prefix != null && !prefix.endsWith("/"))? prefix + '/' : prefix;
try {
javax.xml.parsers.DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
-
+
if (is == null) {
- is = new InputSource(loader.openConfig(name));
+ InputStream in = loader.openConfig(name);
+ if (in instanceof ZkSolrResourceLoader.ZkByteArrayInputStream) {
+ zkVersion = ((ZkSolrResourceLoader.ZkByteArrayInputStream) in).getStat().getVersion();
+ log.info("loaded config {} with version {} ",name,zkVersion);
+ }
+ is = new InputSource(in);
is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(name));
}
@@ -464,6 +472,12 @@ public class Config {
return version;
}
+ /**If this config is loaded from zk, the znode version is returned; otherwise -1 is returned
+ */
+ public int getZnodeVersion(){
+ return zkVersion;
+ }
+
public Config getOriginalConfig() {
return new Config(loader, null, origDoc);
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java?rev=1640857&r1=1640856&r2=1640857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java Fri Nov 21 06:04:47 2014
@@ -34,7 +34,7 @@ import org.noggit.JSONParser;
import org.noggit.JSONWriter;
import org.noggit.ObjectBuilder;
-public class ConfigOverlay {
+public class ConfigOverlay implements MapSerializable{
private final int znodeVersion ;
private Map<String, Object> data;
private Map<String,Object> props;
@@ -177,13 +177,6 @@ public class ConfigOverlay {
return out.toString();
}
- public Map toOutputFormat() {
- Map result = new LinkedHashMap();
- result.put("version",znodeVersion);
- result.putAll(data);
- return result;
- }
-
public static final String RESOURCE_NAME = "configoverlay.json";
@@ -254,4 +247,12 @@ public class ConfigOverlay {
public Map<String, Object> getUserProps() {
return userProps;
}
+
+ @Override
+ public Map<String, Object> toMap() {
+ Map result = new LinkedHashMap();
+ result.put("znodeVersion",znodeVersion);
+ result.putAll(data);
+ return result;
+ }
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrConfig.java?rev=1640857&r1=1640856&r2=1640857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrConfig.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrConfig.java Fri Nov 21 06:04:47 2014
@@ -328,7 +328,7 @@ public class SolrConfig extends Config i
}
}
- private static ConfigOverlay getConfigOverlay(SolrResourceLoader loader) {
+ public static ConfigOverlay getConfigOverlay(SolrResourceLoader loader) {
InputStream in = null;
try {
in = loader.openResource(ConfigOverlay.RESOURCE_NAME);
@@ -712,6 +712,7 @@ public class SolrConfig extends Config i
@Override
public Map<String, Object> toMap() {
LinkedHashMap result = new LinkedHashMap();
+ if(getZnodeVersion() > -1) result.put("znodeVersion",getZnodeVersion());
result.put("luceneMatchVersion",luceneMatchVersion);
result.put("updateHandler", getUpdateHandlerInfo().toMap());
Map m = new LinkedHashMap();
@@ -780,6 +781,7 @@ public class SolrConfig extends Config i
public ConfigOverlay getOverlay() {
if(overlay ==null) {
overlay = getConfigOverlay(getResourceLoader());
+ log.info("$$$overlay_version "+ overlay.getZnodeVersion());
}
return overlay;
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java?rev=1640857&r1=1640856&r2=1640857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java Fri Nov 21 06:04:47 2014
@@ -19,8 +19,7 @@ package org.apache.solr.handler;
import java.io.IOException;
-import java.net.URL;
-import java.text.MessageFormat;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -28,30 +27,23 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.lucene.analysis.util.ResourceLoader;
-import org.apache.lucene.analysis.util.ResourceLoaderAware;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.core.CloseHook;
import org.apache.solr.core.ConfigOverlay;
+import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
-import org.apache.solr.core.SolrInfoMBean;
import org.apache.solr.core.SolrResourceLoader;
-import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.response.SolrQueryResponse;
@@ -94,30 +86,60 @@ public class SolrConfigHandler extends R
public void inform(final SolrCore core) {
if( ! (core.getResourceLoader() instanceof ZkSolrResourceLoader)) return;
final ZkSolrResourceLoader zkSolrResourceLoader = (ZkSolrResourceLoader) core.getResourceLoader();
- if(zkSolrResourceLoader != null){
- Runnable listener = new Runnable() {
- @Override
- public void run() {
- try {
- if(core.isClosed()) return;
- Stat stat = zkSolrResourceLoader.getZkController().getZkClient().exists((zkSolrResourceLoader).getConfigSetZkPath() + "/" + ConfigOverlay.RESOURCE_NAME, null, true);
- if(stat == null) return;
- if (stat.getVersion() > core.getSolrConfig().getOverlay().getZnodeVersion()) {
- core.getCoreDescriptor().getCoreContainer().reload(core.getName());
+ if(zkSolrResourceLoader != null)
+ zkSolrResourceLoader.getZkController().registerConfListenerForCore(
+ zkSolrResourceLoader.getConfigSetZkPath(),
+ core,
+ getListener(core, zkSolrResourceLoader));
+
+ }
+
+ private static Runnable getListener(SolrCore core, ZkSolrResourceLoader zkSolrResourceLoader) {
+ final String coreName = core.getName();
+ final CoreContainer cc = core.getCoreDescriptor().getCoreContainer();
+ final String overlayPath = (zkSolrResourceLoader).getConfigSetZkPath() + "/" + ConfigOverlay.RESOURCE_NAME;
+ final String solrConfigPath = (zkSolrResourceLoader).getConfigSetZkPath() + "/" + core.getSolrConfig().getName();
+ return new Runnable() {
+ @Override
+ public void run() {
+ log.info("config update_listener called");
+ SolrZkClient zkClient = cc.getZkController().getZkClient();
+ int solrConfigversion,overlayVersion;
+ try (SolrCore core = cc.getCore(coreName)) {
+ if (core.isClosed()) return;
+ solrConfigversion = core.getSolrConfig().getOverlay().getZnodeVersion();
+ overlayVersion = core.getSolrConfig().getZnodeVersion();
+ }
+
+ if (checkStale(zkClient, overlayPath, solrConfigversion) ||
+ checkStale(zkClient, solrConfigPath, overlayVersion)) {
+ log.info("core reload");
+ cc.reload(coreName);
}
- } catch (KeeperException.NoNodeException nne){
- //no problem
- } catch (KeeperException e) {
- log.error("error refreshing solrconfig ", e);
- } catch (InterruptedException e) {
- Thread.currentThread().isInterrupted();
}
- }
- };
+ };
+ }
- zkSolrResourceLoader.getZkController().registerConfListenerForCore(zkSolrResourceLoader.getConfigSetZkPath(), core,listener);
+ private static boolean checkStale(SolrZkClient zkClient, String zkPath, int currentVersion) {
+ try {
+ Stat stat = zkClient.exists(zkPath, null, true);
+ if(stat == null){
+ if(currentVersion>0) return true;
+ return false;
+ }
+ if (stat.getVersion() > currentVersion) {
+ log.info(zkPath+" is stale will need an update from {} to {}", currentVersion,stat.getVersion());
+ return true;
+ }
+ return false;
+ } catch (KeeperException.NoNodeException nne){
+ //no problem
+ } catch (KeeperException e) {
+ log.error("error refreshing solrconfig ", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().isInterrupted();
}
-
+ return false;
}
@@ -136,8 +158,7 @@ public class SolrConfigHandler extends R
String path = (String) req.getContext().get("path");
if(path == null) path="/config";
if("/config/overlay".equals(path)){
- resp.add("overlay", req.getCore().getSolrConfig().getOverlay().toOutputFormat());
- return;
+ resp.add("overlay", req.getCore().getSolrConfig().getOverlay().toMap());
} else {
List<String> parts =StrUtils.splitSmart(path, '/');
if(parts.get(0).isEmpty()) parts.remove(0);
@@ -152,13 +173,32 @@ public class SolrConfigHandler extends R
private void handlePOST() throws IOException {
- Iterable<ContentStream> streams = req.getContentStreams();
- if(streams == null ){
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
- }
+ Iterable<ContentStream> streams = req.getContentStreams();
+ if (streams == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
+ }
+ ArrayList<CommandOperation> ops = new ArrayList<>();
+
+ for (ContentStream stream : streams)
+ ops.addAll(CommandOperation.parse(stream.getReader()));
+ List<Map> errList = CommandOperation.captureErrors(ops);
+ if(!errList.isEmpty()) {
+ resp.add(CommandOperation.ERR_MSGS,errList);
+ return;
+ }
+
try {
- for (ContentStream stream : streams) {
- runCommandsTillSuccess(stream);
+ for (;;) {
+ ArrayList<CommandOperation> opsCopy = new ArrayList<>(ops.size());
+ ConfigOverlay overlay = SolrConfig.getConfigOverlay(req.getCore().getResourceLoader());
+ for (CommandOperation op : ops) opsCopy.add(op.getCopy());
+ try {
+ handleCommands(opsCopy, overlay);
+ break;
+ } catch (ZkController.ResourceModifiedInZkException e) {
+ //retry
+ log.info("Race condition, the node is modified in ZK by someone else " +e.getMessage());
+ }
}
} catch (Exception e) {
resp.setException(e);
@@ -167,30 +207,21 @@ public class SolrConfigHandler extends R
}
- private void runCommandsTillSuccess(ContentStream stream) throws IOException {
- for (;;) {
- try {
- handleCommands(stream);
- break;
- } catch (ZkController.ResourceModifiedInZkException e) {
- log.info(e.getMessage());
-
- }
- }
- }
-
- private void handleCommands( ContentStream stream) throws IOException {
- ConfigOverlay overlay = req.getCore().getSolrConfig().getOverlay();
- List<CommandOperation> ops = CommandOperation.parse(stream.getReader());
+ private void handleCommands(List<CommandOperation> ops, ConfigOverlay overlay ) throws IOException {
for (CommandOperation op : ops) {
- if(SET_PROPERTY.equals( op.name) ){
- overlay = applySetProp(op, overlay);
- }else if(UNSET_PROPERTY.equals(op.name)){
- overlay = applyUnset(op,overlay);
- }else if(SET_USER_PROPERTY.equals(op.name)){
- overlay = applySetUserProp(op ,overlay);
- }else if(UNSET_USER_PROPERTY.equals(op.name)){
- overlay = applyUnsetUserProp(op, overlay);
+ switch (op.name) {
+ case SET_PROPERTY:
+ overlay = applySetProp(op, overlay);
+ break;
+ case UNSET_PROPERTY:
+ overlay = applyUnset(op, overlay);
+ break;
+ case SET_USER_PROPERTY:
+ overlay = applySetUserProp(op, overlay);
+ break;
+ case UNSET_USER_PROPERTY:
+ overlay = applyUnsetUserProp(op, overlay);
+ break;
}
}
List errs = CommandOperation.captureErrors(ops);
@@ -204,21 +235,6 @@ public class SolrConfigHandler extends R
ZkController.persistConfigResourceToZooKeeper(loader,overlay.getZnodeVersion(),
ConfigOverlay.RESOURCE_NAME,overlay.toByteArray(),true);
- String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
- Map map = ZkNodeProps.makeMap(CoreAdminParams.ACTION, CollectionParams.CollectionAction.RELOAD.toString() ,
- CollectionParams.NAME, collectionName);
-
- SolrQueryRequest solrQueryRequest = new LocalSolrQueryRequest(req.getCore(), new MapSolrParams(map));
- SolrQueryResponse tmpResp = new SolrQueryResponse();
- try {
- //doing a collection reload
- req.getCore().getCoreDescriptor().getCoreContainer().getCollectionsHandler().handleRequestBody(solrQueryRequest,tmpResp);
- } catch (Exception e) {
- String msg = MessageFormat.format("Unable to reload collection {0}", collectionName);
- log.error(msg);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg);
- }
-
} else {
SolrResourceLoader.persistConfLocally(loader, ConfigOverlay.RESOURCE_NAME, overlay.toByteArray());
req.getCore().getCoreDescriptor().getCoreContainer().reload(req.getCore().getName());
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/CommandOperation.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/CommandOperation.java?rev=1640857&r1=1640856&r2=1640857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/CommandOperation.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/CommandOperation.java Fri Nov 21 06:04:47 2014
@@ -210,5 +210,8 @@ public class CommandOperation {
}
}
+ public CommandOperation getCopy(){
+ return new CommandOperation(name,commandData);
+ }
}
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestConfigReload.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestConfigReload.java?rev=1640857&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestConfigReload.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestConfigReload.java Fri Nov 21 06:04:47 2014
@@ -0,0 +1,142 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.util.EntityUtils;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.core.SolrConfig;
+import org.apache.solr.util.RESTfulServerProvider;
+import org.apache.solr.util.RestTestHarness;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.noggit.JSONParser;
+import org.noggit.ObjectBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Arrays.asList;
+import static org.apache.solr.core.ConfigOverlay.getObjectByPath;
+
+public class TestConfigReload extends AbstractFullDistribZkTestBase {
+
+
+ static final Logger log = LoggerFactory.getLogger(TestConfigReload.class);
+ private List<RestTestHarness> restTestHarnesses = new ArrayList<>();
+
+ private void setupHarnesses() {
+ for (final SolrServer client : clients) {
+ RestTestHarness harness = new RestTestHarness(new RESTfulServerProvider() {
+ @Override
+ public String getBaseURL() {
+ return ((HttpSolrServer)client).getBaseURL();
+ }
+ });
+ restTestHarnesses.add(harness);
+ }
+ }
+
+ @Override
+ public void doTest() throws Exception {
+ setupHarnesses();
+ reloadTest();
+ }
+
+ private void reloadTest() throws Exception {
+ SolrZkClient client = cloudClient.getZkStateReader().getZkClient();
+ log.info("live_nodes_count : " + cloudClient.getZkStateReader().getClusterState().getLiveNodes());
+ String confPath = ZkController.CONFIGS_ZKNODE+"/conf1/";
+// checkConfReload(client, confPath + ConfigOverlay.RESOURCE_NAME, "overlay");
+ checkConfReload(client, confPath + SolrConfig.DEFAULT_CONF_FILE,"solrConfig", "/config");
+
+ }
+
+ private void checkConfReload(SolrZkClient client, String resPath, String name, String uri) throws Exception {
+ Stat stat = new Stat();
+ byte[] data = null;
+ try {
+ data = client.getData(resPath, null, stat, true);
+ } catch (KeeperException.NoNodeException e) {
+ data = "{}".getBytes(StandardCharsets.UTF_8);
+ log.info("creating_node {}",resPath);
+ client.create(resPath,data, CreateMode.PERSISTENT,true);
+ }
+ long startTime = System.nanoTime();
+ Stat newStat = client.setData(resPath, data, true);
+ client.setData("/configs/conf1", new byte[]{1}, true);
+ assertTrue(newStat.getVersion() > stat.getVersion());
+ log.info("new_version "+ newStat.getVersion());
+ Integer newVersion = newStat.getVersion();
+ long maxTimeoutSeconds = 20;
+ DocCollection coll = cloudClient.getZkStateReader().getClusterState().getCollection("collection1");
+ List<String> urls = new ArrayList<>();
+ for (Slice slice : coll.getSlices()) {
+ for (Replica replica : slice.getReplicas())
+ urls.add(""+replica.get(ZkStateReader.BASE_URL_PROP) + "/"+replica.get(ZkStateReader.CORE_NAME_PROP));
+ }
+ HashSet<String> succeeded = new HashSet<>();
+
+ while ( TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutSeconds){
+ Thread.sleep(50);
+ for (String url : urls) {
+ Map respMap = getAsMap(url+uri+"?wt=json");
+ if(String.valueOf(newVersion).equals(String.valueOf( getObjectByPath(respMap, true, asList(name, "znodeVersion"))))){
+ succeeded.add(url);
+ }
+ }
+ if(succeeded.size() == urls.size()) break;
+ succeeded.clear();
+ }
+ assertEquals(MessageFormat.format("tried these servers {0} succeeded only in {1} ", urls,succeeded) , urls.size(), succeeded.size());
+ }
+
+ private Map getAsMap(String uri) throws Exception {
+ HttpGet get = new HttpGet(uri) ;
+ HttpEntity entity = null;
+ try {
+ entity = cloudClient.getLbServer().getHttpClient().execute(get).getEntity();
+ String response = EntityUtils.toString(entity, StandardCharsets.UTF_8);
+ return (Map) ObjectBuilder.getVal(new JSONParser(new StringReader(response)));
+ } finally {
+ EntityUtils.consumeQuietly(entity);
+ }
+ }
+
+
+
+
+}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSolrConfigHandlerConcurrent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSolrConfigHandlerConcurrent.java?rev=1640857&r1=1640856&r2=1640857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSolrConfigHandlerConcurrent.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestSolrConfigHandlerConcurrent.java Fri Nov 21 06:04:47 2014
@@ -67,7 +67,6 @@ public class TestSolrConfigHandlerConcur
@Override
public void doTest() throws Exception {
-
Map editable_prop_map = (Map) new ObjectBuilder(new JSONParser(new StringReader(
ConfigOverlay.MAPPING))).getObject();
Map caches = (Map) editable_prop_map.get("query");
@@ -147,7 +146,7 @@ public class TestSolrConfigHandlerConcur
RestTestHarness harness = restTestHarnesses.get(r.nextInt(restTestHarnesses.size()));
long startTime = System.nanoTime();
boolean success = false;
- long maxTimeoutSeconds = 10;
+ long maxTimeoutSeconds = 20;
while ( TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutSeconds) {
Thread.sleep(100);
errmessages.clear();
@@ -155,19 +154,19 @@ public class TestSolrConfigHandlerConcur
Map m = (Map) respMap.get("overlay");
if(m!= null) m = (Map) m.get("props");
if(m == null) {
- errmessages.add(MessageFormat.format( "overlay does not exist for cache: {} , iteration: {} response {} ", cacheName, i, respMap.toString()));
+ errmessages.add(MessageFormat.format( "overlay does not exist for cache: {0} , iteration: {1} response {2} ", cacheName, i, respMap.toString()));
continue;
}
Object o = getObjectByPath(m, true, asList("query", cacheName, "size"));
- if(!val1.equals(o)) errmessages.add(MessageFormat.format("'size' property not set, expected = {}, actual {}", val1,o));
+ if(!val1.equals(o)) errmessages.add(MessageFormat.format("'size' property not set, expected = {0}, actual {1}", val1,o));
o = getObjectByPath(m, true, asList("query", cacheName, "initialSize"));
- if(!val2.equals(o)) errmessages.add(MessageFormat.format("'initialSize' property not set, expected = {}, actual {}", val2,o));
+ if(!val2.equals(o)) errmessages.add(MessageFormat.format("'initialSize' property not set, expected = {0}, actual {1}", val2,o));
o = getObjectByPath(m, true, asList("query", cacheName, "autowarmCount"));
- if(!val3.equals(o)) errmessages.add(MessageFormat.format("'autowarmCount' property not set, expected = {}, actual {}", val3,o));
+ if(!val3.equals(o)) errmessages.add(MessageFormat.format("'autowarmCount' property not set, expected = {0}, actual {1}", val3,o));
if(errmessages.isEmpty()) break;
}
if(!errmessages.isEmpty()) {