You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2015/03/31 18:18:19 UTC
svn commit: r1670381 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/
core/src/java/org/apache/solr/handler/ core/src/test/org/apache/solr/core/
core/src/test/org/apache/solr/handler/ solrj/src/ja...
Author: noble
Date: Tue Mar 31 16:18:18 2015
New Revision: 1670381
URL: http://svn.apache.org/r1670381
Log:
SOLR-6924: The config API forcefully refreshes all replicas in the collection to ensure all are
updated
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestParams.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrConfig.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReqParamsAPI.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1670381&r1=1670380&r2=1670381&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Tue Mar 31 16:18:18 2015
@@ -344,6 +344,9 @@ Bug Fixes
* SOLR-7309: Make bin/solr, bin/post work when Solr installation directory contains spaces
(Ramkumar Aiyengar, Martijn Koster)
+* SOLR-6924: The config API forcefully refreshes all replicas in the collection to ensure all are
+ updated (Noble Paul)
+
Optimizations
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1670381&r1=1670380&r2=1670381&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Tue Mar 31 16:18:18 2015
@@ -2175,9 +2175,10 @@ public final class ZkController {
*
* @return true on success
*/
- public static boolean persistConfigResourceToZooKeeper(ZkSolrResourceLoader zkLoader, int znodeVersion,
+ public static int persistConfigResourceToZooKeeper(ZkSolrResourceLoader zkLoader, int znodeVersion,
String resourceName, byte[] content,
boolean createIfNotExists) {
+ int latestVersion = znodeVersion;
final ZkController zkController = zkLoader.getZkController();
final SolrZkClient zkClient = zkController.getZkClient();
final String resourceLocation = zkLoader.getConfigSetZkPath() + "/" + resourceName;
@@ -2185,17 +2186,19 @@ public final class ZkController {
try {
try {
zkClient.setData(resourceLocation, content, znodeVersion, true);
+ latestVersion = znodeVersion + 1;// if the set succeeded , it should have incremented the version by one always
log.info("Persisted config data to node {} ", resourceLocation);
touchConfDir(zkLoader);
} catch (NoNodeException e) {
if (createIfNotExists) {
try {
zkClient.create(resourceLocation, content, CreateMode.PERSISTENT, true);
+ latestVersion = 0;//just created so version must be zero
touchConfDir(zkLoader);
} catch (KeeperException.NodeExistsException nee) {
try {
Stat stat = zkClient.exists(resourceLocation, null, true);
- log.info("failed to set data version in zk is {0} and expected version is {1} ", stat.getVersion(), znodeVersion);
+ log.info("failed to set data version in zk is {} and expected version is {} ", stat.getVersion(), znodeVersion);
} catch (Exception e1) {
log.warn("could not get stat");
}
@@ -2227,7 +2230,7 @@ public final class ZkController {
log.error(msg, e);
throw new SolrException(ErrorCode.SERVER_ERROR, msg, e);
}
- return true;
+ return latestVersion;
}
public static void touchConfDir(ZkSolrResourceLoader zkLoader) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java?rev=1670381&r1=1670380&r2=1670381&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/ConfigOverlay.java Tue Mar 31 16:18:18 2015
@@ -187,14 +187,14 @@ public class ConfigOverlay implements Ma
public static final String RESOURCE_NAME = "configoverlay.json";
- private static final Long STR_ATTR = 0L;
+ /*private static final Long STR_ATTR = 0L;
private static final Long STR_NODE = 1L;
private static final Long BOOL_ATTR = 10L;
private static final Long BOOL_NODE = 11L;
private static final Long INT_ATTR = 20L;
private static final Long INT_NODE = 21L;
private static final Long FLOAT_ATTR = 30L;
- private static final Long FLOAT_NODE = 31L;
+ private static final Long FLOAT_NODE = 31L;*/
private static Map editable_prop_map;
//The path maps to the xml xpath and value of 1 means it is a tag with a string value and value
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestParams.java?rev=1670381&r1=1670380&r2=1670381&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestParams.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestParams.java Tue Mar 31 16:18:18 2015
@@ -148,6 +148,7 @@ public class RequestParams implements Ma
ZkSolrResourceLoader resourceLoader = (ZkSolrResourceLoader) loader;
try {
Stat stat = resourceLoader.getZkController().getZkClient().exists(resourceLoader.getConfigSetZkPath() + "/" + RequestParams.RESOURCE, null, true);
+ log.debug("latest version of {} in ZK is : {}", resourceLoader.getConfigSetZkPath() + "/" + RequestParams.RESOURCE, stat == null ? "": stat.getVersion());
if (stat == null) {
requestParams = new RequestParams(Collections.EMPTY_MAP, -1);
} else if (requestParams == null || stat.getVersion() > requestParams.getZnodeVersion()) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrConfig.java?rev=1670381&r1=1670380&r2=1670381&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrConfig.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrConfig.java Tue Mar 31 16:18:18 2015
@@ -77,6 +77,7 @@ import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static org.apache.solr.core.ConfigOverlay.ZNODEVER;
import static org.apache.solr.core.SolrConfig.PluginOpts.LAZY;
import static org.apache.solr.core.SolrConfig.PluginOpts.MULTI_OK;
import static org.apache.solr.core.SolrConfig.PluginOpts.NOOP;
@@ -819,7 +820,7 @@ public class SolrConfig extends Config i
@Override
public Map<String, Object> toMap() {
LinkedHashMap result = new LinkedHashMap();
- if (getZnodeVersion() > -1) result.put("znodeVersion", getZnodeVersion());
+ if (getZnodeVersion() > -1) result.put(ZNODEVER, getZnodeVersion());
result.put("luceneMatchVersion", luceneMatchVersion);
result.put("updateHandler", getUpdateHandlerInfo().toMap());
Map m = new LinkedHashMap();
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1670381&r1=1670380&r2=1670381&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java Tue Mar 31 16:18:18 2015
@@ -66,6 +66,7 @@ import org.apache.lucene.search.BooleanQ
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.cloud.CloudDescriptor;
@@ -132,7 +133,6 @@ import org.apache.solr.update.processor.
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.update.processor.UpdateRequestProcessorChain.ProcessorInfo;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
-import org.apache.solr.util.ConcurrentLRUCache;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.PropertiesInputStream;
import org.apache.solr.util.RefCounted;
@@ -2077,13 +2077,13 @@ public final class SolrCore implements S
HashMap<String, QueryResponseWriter> m= new HashMap<>();
m.put("xml", new XMLResponseWriter());
m.put("standard", m.get("xml"));
- m.put("json", new JSONResponseWriter());
+ m.put(CommonParams.JSON, new JSONResponseWriter());
m.put("python", new PythonResponseWriter());
m.put("php", new PHPResponseWriter());
m.put("phps", new PHPSerializedResponseWriter());
m.put("ruby", new RubyResponseWriter());
m.put("raw", new RawResponseWriter());
- m.put("javabin", new BinaryResponseWriter());
+ m.put(CommonParams.JAVABIN, new BinaryResponseWriter());
m.put("csv", new CSVResponseWriter());
m.put("xsort", new SortingResponseWriter());
m.put("schema.xml", new SchemaXmlResponseWriter());
@@ -2463,12 +2463,12 @@ public final class SolrCore implements S
zkSolrResourceLoader.getZkController().registerConfListenerForCore(
zkSolrResourceLoader.getConfigSetZkPath(),
this,
- getListener(this, zkSolrResourceLoader));
+ getConfListener(this, zkSolrResourceLoader));
}
- private static Runnable getListener(SolrCore core, ZkSolrResourceLoader zkSolrResourceLoader) {
+ public static Runnable getConfListener(SolrCore core, ZkSolrResourceLoader zkSolrResourceLoader) {
final String coreName = core.getName();
final CoreContainer cc = core.getCoreDescriptor().getCoreContainer();
final String overlayPath = zkSolrResourceLoader.getConfigSetZkPath() + "/" + ConfigOverlay.RESOURCE_NAME;
@@ -2506,9 +2506,7 @@ public final class SolrCore implements S
cc.reload(coreName);
return;
}
- //some files in conf directoy has changed other than schema.xml,
- // solrconfig.xml. so fire event listeners
-
+ //some files in conf directory may have other than managedschema, overlay, params
try (SolrCore core = cc.solrCores.getCoreFromAnyList(coreName, true)) {
if (core == null || core.isClosed()) return;
for (Runnable listener : core.confListeners) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java?rev=1670381&r1=1670380&r2=1670381&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java Tue Mar 31 16:18:18 2015
@@ -21,6 +21,7 @@ package org.apache.solr.handler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,35 +30,63 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import com.google.common.collect.ImmutableSet;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.cloud.ZkCLI;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.ConfigOverlay;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.ImplicitPlugins;
import org.apache.solr.core.RequestParams;
import org.apache.solr.core.SolrConfig;
+import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.response.BinaryResponseWriter;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.SchemaManager;
import org.apache.solr.util.CommandOperation;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonList;
+import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import static org.apache.solr.common.params.CoreAdminParams.NAME;
import static org.apache.solr.common.util.StrUtils.formatString;
import static org.apache.solr.core.ConfigOverlay.NOT_EDITABLE;
+import static org.apache.solr.core.ConfigOverlay.ZNODEVER;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_CLASS;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_NAME;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_NAME_IN_OVERLAY;
@@ -67,6 +96,7 @@ public class SolrConfigHandler extends R
public static final Logger log = LoggerFactory.getLogger(SolrConfigHandler.class);
public static final boolean configEditing_disabled = Boolean.getBoolean("disable.configEdit");
private static final Map<String, SolrConfig.SolrPluginInfo> namedPlugins;
+ private Lock reloadLock = new ReentrantLock(true);
static {
Map<String, SolrConfig.SolrPluginInfo> map = new HashMap<>();
@@ -99,7 +129,7 @@ public class SolrConfigHandler extends R
}
- private static class Command {
+ private class Command {
private final SolrQueryRequest req;
private final SolrQueryResponse resp;
private final String method;
@@ -122,6 +152,7 @@ public class SolrConfigHandler extends R
private void handleGET() {
if (parts.size() == 1) {
+ //this is the whole config. sent out the whole payload
resp.add("config", getConfigDetails());
} else {
if (ConfigOverlay.NAME.equals(parts.get(1))) {
@@ -131,9 +162,9 @@ public class SolrConfigHandler extends R
RequestParams params = req.getCore().getSolrConfig().getRequestParams();
MapSolrParams p = params.getParams(parts.get(2));
Map m = new LinkedHashMap<>();
- m.put(ConfigOverlay.ZNODEVER, params.getZnodeVersion());
+ m.put(ZNODEVER, params.getZnodeVersion());
if (p != null) {
- m.put(RequestParams.NAME, ZkNodeProps.makeMap(parts.get(2), p.getMap()));
+ m.put(RequestParams.NAME, makeMap(parts.get(2), p.getMap()));
}
resp.add(SolrQueryResponse.NAME, m);
} else {
@@ -141,8 +172,53 @@ public class SolrConfigHandler extends R
}
} else {
- Map<String, Object> m = getConfigDetails();
- resp.add("config", ZkNodeProps.makeMap(parts.get(1), m.get(parts.get(1))));
+ if (ZNODEVER.equals(parts.get(1))) {
+ resp.add(ZNODEVER, ZkNodeProps.makeMap(
+ ConfigOverlay.NAME, req.getCore().getSolrConfig().getOverlay().getZnodeVersion(),
+ RequestParams.NAME, req.getCore().getSolrConfig().getRequestParams().getZnodeVersion()));
+ boolean checkStale = false;
+ int expectedVersion = req.getParams().getInt(ConfigOverlay.NAME, -1);
+ int actualVersion = req.getCore().getSolrConfig().getOverlay().getZnodeVersion();
+ if (expectedVersion > actualVersion) {
+ log.info("expecting overlay version {} but my version is {}", expectedVersion, actualVersion);
+ checkStale = true;
+ } else if (expectedVersion != -1) {
+ log.info("I already have the expected version {} of config", expectedVersion);
+ }
+ expectedVersion = req.getParams().getInt(RequestParams.NAME, -1);
+ actualVersion = req.getCore().getSolrConfig().getRequestParams().getZnodeVersion();
+ if (expectedVersion > actualVersion) {
+ log.info("expecting params version {} but my version is {}", expectedVersion, actualVersion);
+ checkStale = true;
+ } else if (expectedVersion != -1) {
+ log.info("I already have the expected version {} of params", expectedVersion);
+ }
+ if (checkStale && req.getCore().getResourceLoader() instanceof ZkSolrResourceLoader) {
+ new Thread(SolrConfigHandler.class.getSimpleName() + "-refreshconf") {
+ @Override
+ public void run() {
+ if (!reloadLock.tryLock()) {
+ log.info("Another reload is in progress . Not doing anything");
+ return;
+ }
+ try {
+ log.info("Trying to update my configs");
+ SolrCore.getConfListener(req.getCore(), (ZkSolrResourceLoader) req.getCore().getResourceLoader()).run();
+ } catch (Exception e) {
+ log.error("Unable to refresh conf ", e);
+ } finally {
+ reloadLock.unlock();
+ }
+ }
+ }.start();
+ } else {
+ log.info("checkStale {} , resourceloader {}", checkStale, req.getCore().getResourceLoader().getClass().getName());
+ }
+
+ } else {
+ Map<String, Object> m = getConfigDetails();
+ resp.add("config", makeMap(parts.get(1), m.get(parts.get(1))));
+ }
}
}
}
@@ -277,8 +353,15 @@ public class SolrConfigHandler extends R
if (ops.isEmpty()) {
ZkController.touchConfDir(zkLoader);
} else {
- ZkController.persistConfigResourceToZooKeeper(zkLoader, params.getZnodeVersion(),
- RequestParams.RESOURCE, params.toByteArray(), true);
+ log.info("persisting params version : {}", params.toMap());
+ int latestVersion = ZkController.persistConfigResourceToZooKeeper(zkLoader,
+ params.getZnodeVersion(),
+ RequestParams.RESOURCE,
+ params.toByteArray(), true);
+ waitForAllReplicasState(req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName(),
+ req.getCore().getCoreDescriptor().getCoreContainer().getZkController(),
+ RequestParams.NAME,
+ latestVersion, 30);
}
} else {
@@ -326,17 +409,20 @@ public class SolrConfigHandler extends R
}
List errs = CommandOperation.captureErrors(ops);
if (!errs.isEmpty()) {
- log.info("Failed to run commands errors are {}", StrUtils.join(errs, ','));
+ log.info("Failed to run commands. errors are {}", StrUtils.join(errs, ','));
resp.add(CommandOperation.ERR_MSGS, errs);
return;
}
SolrResourceLoader loader = req.getCore().getResourceLoader();
if (loader instanceof ZkSolrResourceLoader) {
- ZkController.persistConfigResourceToZooKeeper((ZkSolrResourceLoader) loader, overlay.getZnodeVersion(),
+ int latestVersion = ZkController.persistConfigResourceToZooKeeper((ZkSolrResourceLoader) loader, overlay.getZnodeVersion(),
ConfigOverlay.RESOURCE_NAME, overlay.toByteArray(), true);
-
- log.info("Executed config commands successfully and persited to ZK {}", ops);
+ log.info("Executed config commands successfully and persisted to ZK {}", ops);
+ waitForAllReplicasState(req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName(),
+ req.getCore().getCoreDescriptor().getCoreContainer().getZkController(),
+ ConfigOverlay.NAME,
+ latestVersion, 30);
} else {
SolrResourceLoader.persistConfLocally(loader, ConfigOverlay.RESOURCE_NAME, overlay.toByteArray());
req.getCore().getCoreDescriptor().getCoreContainer().reload(req.getCore().getName());
@@ -519,7 +605,7 @@ public class SolrConfigHandler extends R
private static Set<String> subPaths = new HashSet<>(Arrays.asList("/overlay", "/params",
- "/query", "/jmx", "/requestDispatcher"));
+ "/query", "/jmx", "/requestDispatcher", "/znodeVersion"));
static {
for (SolrConfig.SolrPluginInfo solrPluginInfo : SolrConfig.plugins)
@@ -556,4 +642,170 @@ public class SolrConfigHandler extends R
public static final String CREATE = "create";
private static Set<String> cmdPrefixes = ImmutableSet.of(CREATE, UPDATE, "delete", "add");
+ /**
+ * Block up to a specified maximum time until we see agreement on the schema
+ * version in ZooKeeper across all replicas for a collection.
+ */
+ private static void waitForAllReplicasState(String collection,
+ ZkController zkController,
+ String prop,
+ int expectedVersion,
+ int maxWaitSecs) {
+ long startMs = System.currentTimeMillis();
+ // get a list of active replica cores to query for the schema zk version (skipping this core of course)
+ List<PerReplicaCallable> concurrentTasks = new ArrayList<>();
+
+ for (String coreUrl : getActiveReplicaCoreUrls(zkController, collection)) {
+ PerReplicaCallable e = new PerReplicaCallable(coreUrl, prop, expectedVersion, maxWaitSecs);
+ concurrentTasks.add(e);
+ }
+ if (concurrentTasks.isEmpty()) return; // nothing to wait for ...
+
+ log.info(formatString("Waiting up to {0} secs for {1} replicas to set the property {2} to be of version {3} for collection {4}",
+ maxWaitSecs, concurrentTasks.size(), prop, expectedVersion, collection));
+
+ // use an executor service to invoke schema zk version requests in parallel with a max wait time
+ int poolSize = Math.min(concurrentTasks.size(), 10);
+ ExecutorService parallelExecutor =
+ Executors.newFixedThreadPool(poolSize, new DefaultSolrThreadFactory("solrHandlerExecutor"));
+ try {
+ List<Future<Boolean>> results =
+ parallelExecutor.invokeAll(concurrentTasks, maxWaitSecs, TimeUnit.SECONDS);
+
+ // determine whether all replicas have the update
+ List<String> failedList = null; // lazily init'd
+ for (int f = 0; f < results.size(); f++) {
+ Boolean success = false;
+ Future<Boolean> next = results.get(f);
+ if (next.isDone() && !next.isCancelled()) {
+ // looks to have finished, but need to check if it succeeded
+ try {
+ success = next.get();
+ } catch (ExecutionException e) {
+ // shouldn't happen since we checked isCancelled
+ }
+ }
+
+ if (!success) {
+ String coreUrl = concurrentTasks.get(f).coreUrl;
+ log.warn("Core " + coreUrl + "could not get the expected version " + expectedVersion);
+ if (failedList == null) failedList = new ArrayList<>();
+ failedList.add(coreUrl);
+ }
+ }
+
+ // if any tasks haven't completed within the specified timeout, it's an error
+ if (failedList != null)
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ formatString("{0} out of {1} the property {2} to be of version {3} within {4} seconds! Failed cores: {5}",
+ failedList.size(), concurrentTasks.size() + 1, prop, expectedVersion, maxWaitSecs, failedList));
+
+ } catch (InterruptedException ie) {
+ log.warn(formatString(
+ "Core was interrupted . trying to set the property {1} to version {2} to propagate to {3} replicas for collection {4}",
+ prop, expectedVersion, concurrentTasks.size(), collection));
+ Thread.currentThread().interrupt();
+ } finally {
+ if (!parallelExecutor.isShutdown())
+ parallelExecutor.shutdownNow();
+ }
+
+ long diffMs = (System.currentTimeMillis() - startMs);
+ log.info(formatString(
+ "Took {0} secs to set the property {1} to be of version {2} for collection {3}",
+ Math.round(diffMs / 1000d), prop, expectedVersion, collection));
+ }
+
+ public static List<String> getActiveReplicaCoreUrls(ZkController zkController,
+ String collection) {
+ List<String> activeReplicaCoreUrls = new ArrayList<>();
+ ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+ Set<String> liveNodes = clusterState.getLiveNodes();
+ Collection<Slice> activeSlices = clusterState.getActiveSlices(collection);
+ if (activeSlices != null && activeSlices.size() > 0) {
+ for (Slice next : activeSlices) {
+ Map<String, Replica> replicasMap = next.getReplicasMap();
+ if (replicasMap != null) {
+ for (Map.Entry<String, Replica> entry : replicasMap.entrySet()) {
+ Replica replica = entry.getValue();
+ if (ZkStateReader.ACTIVE.equals(replica.getStr(ZkStateReader.STATE_PROP)) &&
+ liveNodes.contains(replica.getNodeName())) {
+ activeReplicaCoreUrls.add(replica.getCoreUrl());
+ }
+ }
+ }
+ }
+ }
+ return activeReplicaCoreUrls;
+ }
+
+ private static class PerReplicaCallable extends SolrRequest implements Callable<Boolean> {
+ String coreUrl;
+ String prop;
+ int expectedZkVersion;
+ Number remoteVersion = null;
+ int maxWait;
+
+ PerReplicaCallable(String coreUrl, String prop, int expectedZkVersion, int maxWait) {
+ super(METHOD.GET, "/config/" + ZNODEVER);
+ this.coreUrl = coreUrl;
+ this.expectedZkVersion = expectedZkVersion;
+ this.prop = prop;
+ this.maxWait = maxWait;
+ }
+
+ @Override
+ public SolrParams getParams() {
+ return new ModifiableSolrParams()
+ .set(prop, expectedZkVersion)
+ .set(CommonParams.WT, CommonParams.JAVABIN);
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ long startTime = System.currentTimeMillis();
+ int attempts = 0;
+ try (HttpSolrClient solr = new HttpSolrClient(coreUrl)) {
+ // eventually, this loop will get killed by the ExecutorService's timeout
+ while (true) {
+ try {
+ long timeElapsed = (System.currentTimeMillis() - startTime) / 1000;
+ if (timeElapsed >= maxWait) {
+ return false;
+ }
+ log.info("Time elapsed : {} secs, maxWait {}", timeElapsed, maxWait);
+ Thread.sleep(100);
+ NamedList<Object> resp = solr.httpUriRequest(this).future.get();
+ if (resp != null) {
+ Map m = (Map) resp.get(ZNODEVER);
+ if (m != null) {
+ remoteVersion = (Number) m.get(prop);
+ if (remoteVersion != null && remoteVersion.intValue() >= expectedZkVersion) break;
+ }
+ }
+
+ attempts++;
+ log.info(formatString("Could not get expectedVersion {0} from {1} for prop {2} after {3} attempts", expectedZkVersion, coreUrl, prop, attempts));
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ break; // stop looping
+ } else {
+ log.warn("Failed to get /schema/zkversion from " + coreUrl + " due to: " + e);
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public Collection<ContentStream> getContentStreams() throws IOException {
+ return null;
+ }
+
+ @Override
+ protected SolrResponse createResponse(SolrClient client) {
+ return null;
+ }
+ }
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java?rev=1670381&r1=1670380&r2=1670381&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java Tue Mar 31 16:18:18 2015
@@ -21,7 +21,6 @@ package org.apache.solr.core;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
-import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReqParamsAPI.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReqParamsAPI.java?rev=1670381&r1=1670380&r2=1670381&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReqParamsAPI.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReqParamsAPI.java Tue Mar 31 16:18:18 2015
@@ -41,7 +41,6 @@ import static org.apache.solr.handler.Te
* limitations under the License.
*/
-@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6924")
public class TestReqParamsAPI extends AbstractFullDistribZkTestBase {
static final Logger log = LoggerFactory.getLogger(TestSolrConfigHandlerCloud.class);
private List<RestTestHarness> restTestHarnesses = new ArrayList<>();
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java?rev=1670381&r1=1670380&r2=1670381&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java Tue Mar 31 16:18:18 2015
@@ -21,6 +21,8 @@ import org.noggit.JSONUtil;
import java.util.Map;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
public class Replica extends ZkNodeProps {
private final String name;
@@ -35,6 +37,9 @@ public class Replica extends ZkNodeProps
public String getName() {
return name;
}
+ public String getCoreUrl() {
+ return ZkCoreNodeProps.getCoreUrl(getStr(BASE_URL_PROP), getStr(CORE_NAME_PROP));
+ }
/** The name of the node this replica resides on */
public String getNodeName() {
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java?rev=1670381&r1=1670380&r2=1670381&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java Tue Mar 31 16:18:18 2015
@@ -224,5 +224,9 @@ public interface CommonParams {
* When querying a node, prefer local node's cores for distributed queries.
*/
public static final String PREFER_LOCAL_SHARDS = "preferLocalShards";
+
+ public static final String JAVABIN = "javabin";
+
+ public static final String JSON = "json";
}