You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2014/07/03 13:13:36 UTC
svn commit: r1607587 [1/2] - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/ core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/handler/admin/
core/src/java/org/apache/solr/servlet/ core/src/java/org/apache/solr/util/
core/...
Author: noble
Date: Thu Jul 3 11:13:35 2014
New Revision: 1607587
URL: http://svn.apache.org/r1607587
Log:
Reverting SOLR-5473, SOLR-5474
Removed:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java
lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Jul 3 11:13:35 2014
@@ -70,10 +70,6 @@ New Features
* SOLR-6103: Added DateRangeField for indexing date ranges, especially
multi-valued ones. Based on LUCENE-5648. (David Smiley)
-* SOLR-5473: Make one state.json per collection (noble, shalin, Timothy Potter ,Jessica Cheng, Anshum Gupta, Mark Miller)
-
-* SOLR-5474: Add stateFormat=2 support to CloudSolrServer (Timothy Potter , noble , Jessica Cheng)
-
Other Changes
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java Thu Jul 3 11:13:35 2014
@@ -263,7 +263,7 @@ sb.append("(group_name=").append(tg.getN
private Map<String,Object> getReplicaProps(ZkController zkController, SolrCore core) {
final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- Replica replica = zkController.getClusterState().getReplica(collection, core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(),true);
+ Replica replica = zkController.getClusterState().getReplica(collection, core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
if(replica!=null) {
return replica.getProperties();
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Thu Jul 3 11:13:35 2014
@@ -33,7 +33,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -105,8 +104,6 @@ public class Overseer {
private Map clusterProps;
private boolean isClosed = false;
- private final Map<String, Object> updateNodes = new ConcurrentHashMap<>();
- private boolean isClusterStateModified = false;
public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
@@ -120,7 +117,6 @@ public class Overseer {
this.myId = myId;
this.reader = reader;
clusterProps = reader.getClusterProps();
- reader.setEphemeralCollectionData(Collections.unmodifiableMap(updateNodes));
}
public Stats getStateUpdateQueueStats() {
@@ -261,7 +257,6 @@ public class Overseer {
stateUpdateQueue.poll();
if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break;
- if(!updateNodes.isEmpty()) break;
// if an event comes in the next 100ms batch it together
head = stateUpdateQueue.peek(100);
}
@@ -301,28 +296,8 @@ public class Overseer {
TimerContext timerContext = stats.time("update_state");
boolean success = false;
try {
- if(!updateNodes.isEmpty()) {
- for (Entry<String, Object> e : updateNodes.entrySet()) {
- if (e.getValue() == null) {
- if (zkClient.exists(e.getKey(), true)) zkClient.delete(e.getKey(), 0, true);
- } else {
- if (zkClient.exists(e.getKey(), true)) {
- log.info("going to update_collection {}", e.getKey());
- zkClient.setData(e.getKey(), ZkStateReader.toJSON(e.getValue()), true);
- } else {
- log.info("going to create_collection {}", e.getValue());
- zkClient.create(e.getKey(), ZkStateReader.toJSON(e.getValue()), CreateMode.PERSISTENT, true);
- }
- }
- }
- updateNodes.clear();
- }
-
- if(isClusterStateModified) {
- lastUpdatedTime = System.nanoTime();
- zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true);
- isClusterStateModified = false;
- }
+ zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true);
+ lastUpdatedTime = System.nanoTime();
success = true;
} finally {
timerContext.stop();
@@ -724,7 +699,7 @@ public class Overseer {
}
Slice slice = clusterState.getSlice(collection, sliceName);
-
+
Map<String,Object> replicaProps = new LinkedHashMap<>();
replicaProps.putAll(message.getProperties());
@@ -742,7 +717,7 @@ public class Overseer {
replicaProps.remove(ZkStateReader.SHARD_ID_PROP);
replicaProps.remove(ZkStateReader.COLLECTION_PROP);
replicaProps.remove(QUEUE_OPERATION);
-
+
// remove any props with null values
Set<Entry<String,Object>> entrySet = replicaProps.entrySet();
List<String> removeKeys = new ArrayList<>();
@@ -890,18 +865,10 @@ public class Overseer {
}
collectionProps.put(DocCollection.DOC_ROUTER, routerSpec);
- if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true");
- String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null : ZkStateReader.getCollectionPath(collectionName);
- DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router , -1,znode);
- isClusterStateModified = true;
- log.info("state version {} {}", collectionName, newCollection.getStateFormat());
- if (newCollection.getStateFormat() > 1) {
- updateNodes.put(ZkStateReader.getCollectionPath(collectionName),
- new ClusterState(-1, Collections.<String>emptySet(), singletonMap(newCollection.getName(), newCollection), state.getStateReader()));
- return state;
+ if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true");
+ DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
+ return newState(state, singletonMap(newCollection.getName(), newCollection));
}
- return newState(state, singletonMap(newCollection.getName(), newCollection));
- }
/*
* Return an already assigned id or null if not assigned
@@ -938,27 +905,30 @@ public class Overseer {
}
return null;
}
-
+
private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
// System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
// System.out.println("Updating slice:" + slice);
- DocCollection newCollection = null;
+
DocCollection coll = state.getCollectionOrNull(collectionName) ;
Map<String,Slice> slices;
+ Map<String,Object> props;
+ DocRouter router;
+
if (coll == null) {
// when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
// without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router.
- slices = new LinkedHashMap<>(1);
- slices.put(slice.getName(), slice);
- Map<String,Object> props = new HashMap<>(1);
+ slices = new HashMap<>(1);
+ props = new HashMap<>(1);
props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME));
- newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
+ router = new ImplicitDocRouter();
} else {
+ props = coll.getProperties();
+ router = coll.getRouter();
slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy
- slices.put(slice.getName(), slice);
- newCollection = coll.copyWith(slices);
}
-
+ slices.put(slice.getName(), slice);
+ DocCollection newCollection = new DocCollection(collectionName, slices, props, router);
// System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));
@@ -1017,54 +987,27 @@ public class Overseer {
}
- DocCollection newCollection = coll.copyWith(slices);
+ DocCollection newCollection = new DocCollection(coll.getName(), slices, coll.getProperties(), coll.getRouter());
return newState(state, singletonMap(collectionName, newCollection));
}
- private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
- for (Entry<String, DocCollection> e : colls.entrySet()) {
- DocCollection c = e.getValue();
- if (c == null) {
- isClusterStateModified = true;
- state = state.copyWith(singletonMap(e.getKey(), (DocCollection) null));
- continue;
- }
-
- if (c.getStateFormat() >1) {
- state.getStateReader().updateWatchedCollection(c);
- updateNodes.put(ZkStateReader.getCollectionPath(c.getName()), new ClusterState(-1, Collections.<String>emptySet(), singletonMap(c.getName(), c), state.getStateReader()));
- } else {
- isClusterStateModified = true;
- state = state.copyWith(singletonMap(e.getKey(), c));
- }
+ private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
+ return state.copyWith(colls);
}
- return state;
- }
-
- /*
- * Remove collection from cloudstate
- */
- private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
- final String collection = message.getStr("name");
- if (!checkKeyExistence(message, "name")) return clusterState;
- DocCollection coll = clusterState.getCollectionOrNull(collection);
- if(coll !=null) {
- isClusterStateModified = true;
- if(coll.getStateFormat()>1){
- try {
- log.info("Deleting state for collection : {}", collection);
- zkClient.delete(ZkStateReader.getCollectionPath(collection),-1,true);
- } catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"Unable to remove collection state :"+collection);
- }
- return clusterState;
- } else{
+ /*
+ * Remove collection from cloudstate
+ */
+ private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
+ final String collection = message.getStr("name");
+ if (!checkKeyExistence(message, "name")) return clusterState;
+ DocCollection coll = clusterState.getCollectionOrNull(collection);
+ if(coll !=null) {
return clusterState.copyWith(singletonMap(collection,(DocCollection)null));
}
+ return clusterState;
}
- return clusterState;
- }
+
/*
* Remove collection slice from cloudstate
*/
@@ -1080,7 +1023,7 @@ public class Overseer {
Map<String, Slice> newSlices = new LinkedHashMap<>(coll.getSlicesMap());
newSlices.remove(sliceId);
- DocCollection newCollection = coll.copyWith(newSlices);
+ DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
return newState(clusterState, singletonMap(collection,newCollection));
}
@@ -1092,6 +1035,8 @@ public class Overseer {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return clusterState;
+// final Map<String, DocCollection> newCollections = new LinkedHashMap<>(clusterState.getCollectionStates()); // shallow copy
+// DocCollection coll = newCollections.get(collection);
DocCollection coll = clusterState.getCollectionOrNull(collection) ;
if (coll == null) {
// TODO: log/error that we didn't find it?
@@ -1129,7 +1074,7 @@ public class Overseer {
newSlices.put(slice.getName(), slice);
}
}
-
+
if (lastSlice) {
// remove all empty pre allocated slices
for (Slice slice : coll.getSlices()) {
@@ -1146,7 +1091,7 @@ public class Overseer {
// but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
// ZkController out of the Overseer.
try {
- zkClient.delete("/collections/" + collection,-1,true);
+ zkClient.clean("/collections/" + collection);
} catch (InterruptedException e) {
SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
Thread.currentThread().interrupt();
@@ -1156,8 +1101,8 @@ public class Overseer {
return newState(clusterState,singletonMap(collection, (DocCollection) null));
} else {
- DocCollection newCollection = coll.copyWith(newSlices);
- return newState(clusterState,singletonMap(collection,newCollection));
+ DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
+ return newState(clusterState,singletonMap(collection,newCollection));
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Thu Jul 3 11:13:35 2014
@@ -707,13 +707,7 @@ public class OverseerCollectionProcessor
Set<String> collections = clusterState.getCollections();
for (String name : collections) {
Map<String, Object> collectionStatus = null;
- if (clusterState.getCollection(name).getStateFormat()>1) {
- bytes = ZkStateReader.toJSON(clusterState.getCollection(name));
- Map<String, Object> docCollection = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
- collectionStatus = getCollectionStatus(docCollection, name, shard);
- } else {
- collectionStatus = getCollectionStatus((Map<String, Object>) stateMap.get(name), name, shard);
- }
+ collectionStatus = getCollectionStatus((Map<String, Object>) stateMap.get(name), name, shard);
if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
collectionStatus.put("aliases", collectionVsAliases.get(name));
}
@@ -722,12 +716,8 @@ public class OverseerCollectionProcessor
} else {
String routeKey = message.getStr(ShardParams._ROUTE_);
Map<String, Object> docCollection = null;
- if (clusterState.getCollection(collection).getStateFormat()>1 ) {
- bytes = ZkStateReader.toJSON(clusterState.getCollection(collection));
- docCollection = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
- } else {
- docCollection = (Map<String, Object>) stateMap.get(collection);
- }
+
+ docCollection = (Map<String, Object>) stateMap.get(collection);
if (routeKey == null) {
Map<String, Object> collectionStatus = getCollectionStatus(docCollection, collection, shard);
if (collectionVsAliases.containsKey(collection) && !collectionVsAliases.get(collection).isEmpty()) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Thu Jul 3 11:13:35 2014
@@ -1109,16 +1109,6 @@ public final class ZkController {
}
CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
- boolean removeWatch = true;
- for (SolrCore solrCore : cc.getCores()) {//if there is no SolrCoe which is a member of this collection, remove the watch
- CloudDescriptor cloudDesc = solrCore.getCoreDescriptor().getCloudDescriptor();
- if (cloudDesc != null && cloudDescriptor.getCollectionName().equals(cloudDesc.getCollectionName())) {
- //means
- removeWatch = false;
- break;
- }
- }
- if(removeWatch) zkStateReader.removeZKWatch(collection);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
Overseer.DELETECORE, ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, getNodeName(),
@@ -1421,10 +1411,6 @@ public final class ZkController {
publish(cd, ZkStateReader.DOWN, false, true);
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
- if(collection !=null && collection.getStateFormat() >1 ){
- log.info("Registering watch for collection {}",cd.getCloudDescriptor().getCollectionName());
- zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName());
- }
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Thu Jul 3 11:13:35 2014
@@ -143,7 +143,7 @@ public class CollectionsHandler extends
if (action == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: " + a);
}
-
+
switch (action) {
case CREATE: {
this.handleCreateAction(req, rsp);
@@ -320,36 +320,36 @@ public class CollectionsHandler extends
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
}
-
+
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
long time = System.nanoTime();
if(m.containsKey(ASYNC) && m.get(ASYNC) != null) {
-
+
String asyncId = m.getStr(ASYNC);
-
+
if(asyncId.equals("-1")) {
throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
}
-
+
NamedList<String> r = new NamedList<>();
-
+
if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
overseerCollectionQueueContains(asyncId)) {
r.add("error", "Task with the same requestid already exists.");
-
+
} else {
coreContainer.getZkController().getOverseerCollectionQueue()
.offer(ZkStateReader.toJSON(m));
}
r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
SolrResponse response = new OverseerSolrResponse(r);
-
+
rsp.getValues().addAll(response.getResponse());
-
+
return;
}
@@ -380,27 +380,27 @@ public class CollectionsHandler extends
}
}
}
-
+
private void handleReloadAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
log.info("Reloading Collection : " + req.getParamString());
String name = req.getParams().required().get("name");
-
+
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.RELOADCOLLECTION, "name", name);
handleResponse(OverseerCollectionProcessor.RELOADCOLLECTION, m, rsp);
}
-
+
private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException {
log.info("Syncing shard : " + req.getParamString());
String collection = req.getParams().required().get("collection");
String shard = req.getParams().required().get("shard");
-
+
ClusterState clusterState = coreContainer.getZkController().getClusterState();
-
+
ZkNodeProps leaderProps = clusterState.getLeader(collection, shard);
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
-
+
HttpSolrServer server = new HttpSolrServer(nodeProps.getBaseUrl());
try {
server.setConnectionTimeout(15000);
@@ -414,36 +414,36 @@ public class CollectionsHandler extends
server.shutdown();
}
}
-
+
private void handleCreateAliasAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws Exception {
log.info("Create alias action : " + req.getParamString());
String name = req.getParams().required().get("name");
String collections = req.getParams().required().get("collections");
-
+
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.CREATEALIAS, "name", name, "collections",
collections);
-
+
handleResponse(OverseerCollectionProcessor.CREATEALIAS, m, rsp);
}
-
+
private void handleDeleteAliasAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws Exception {
log.info("Delete alias action : " + req.getParamString());
String name = req.getParams().required().get("name");
-
+
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.DELETEALIAS, "name", name);
-
+
handleResponse(OverseerCollectionProcessor.DELETEALIAS, m, rsp);
}
private void handleDeleteAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
log.info("Deleting Collection : " + req.getParamString());
-
+
String name = req.getParams().required().get("name");
-
+
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.DELETECOLLECTION, "name", name);
@@ -464,7 +464,7 @@ public class CollectionsHandler extends
throw new SolrException(ErrorCode.BAD_REQUEST,
"Collection name is required to create a new collection");
}
-
+
Map<String,Object> props = ZkNodeProps.makeMap(
Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.CREATECOLLECTION,
@@ -477,7 +477,6 @@ public class CollectionsHandler extends
MAX_SHARDS_PER_NODE,
CREATE_NODE_SET ,
SHARDS_PROP,
- DocCollection.STATE_FORMAT,
ASYNC,
"router.");
@@ -554,7 +553,7 @@ public class CollectionsHandler extends
log.info("Deleting Shard : " + req.getParamString());
String name = req.getParams().required().get(ZkStateReader.COLLECTION_PROP);
String shard = req.getParams().required().get(ZkStateReader.SHARD_ID_PROP);
-
+
Map<String,Object> props = new HashMap<>();
props.put(ZkStateReader.COLLECTION_PROP, name);
props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.DELETESHARD);
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Thu Jul 3 11:13:35 2014
@@ -324,7 +324,6 @@ public class SolrDispatchFilter extends
String coreUrl = getRemotCoreUrl(cores, corename, origCorename);
// don't proxy for internal update requests
SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());
- checkStateIsValid(cores, queryParams.get(CloudSolrServer.STATE_VERSION));
if (coreUrl != null
&& queryParams
.get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) {
@@ -380,7 +379,6 @@ public class SolrDispatchFilter extends
if( "/select".equals( path ) || "/select/".equals( path ) ) {
solrReq = parser.parse( core, path, req );
- checkStateIsValid(cores,solrReq.getParams().get(CloudSolrServer.STATE_VERSION));
String qt = solrReq.getParams().get( CommonParams.QT );
handler = core.getRequestHandler( qt );
if( handler == null ) {
@@ -470,22 +468,6 @@ public class SolrDispatchFilter extends
chain.doFilter(request, response);
}
- private void checkStateIsValid(CoreContainer cores, String stateVer) {
- if(stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware() ){
- // many have multiple collections separated by |
- String[] pairs = StringUtils.split(stateVer, '|');
- for (String pair : pairs) {
- String[] pcs = StringUtils.split(pair, ':');
- if(pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()){
- Boolean status = cores.getZkController().getZkStateReader().checkValid(pcs[0],Integer.parseInt(pcs[1]));
-
- if(Boolean.TRUE != status){
- throw new SolrException(ErrorCode.INVALID_STATE, "STATE STALE: " + pair+ "valid : "+status);
- }
- }
- }
- }
- }
private void processAliases(SolrQueryRequest solrReq, Aliases aliases,
List<String> collectionsList) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java Thu Jul 3 11:13:35 2014
@@ -17,7 +17,27 @@
package org.apache.solr.servlet;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
import org.apache.lucene.util.BytesRef;
+import org.noggit.CharArr;
+import org.noggit.JSONWriter;
+import org.noggit.ObjectBuilder;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -26,29 +46,13 @@ import org.apache.solr.common.params.Sol
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.FastWriter;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
-import org.noggit.ObjectBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.Date;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
/**
* Zookeeper Info
@@ -86,6 +90,7 @@ public final class ZookeeperInfoServlet
String path = params.get("path");
String addr = params.get("addr");
+ boolean all = "true".equals(params.get("all"));
if (addr != null && addr.length() == 0) {
addr = null;
@@ -105,7 +110,6 @@ public final class ZookeeperInfoServlet
ZKPrinter printer = new ZKPrinter(response, out, cores.getZkController(), addr);
printer.detail = detail;
printer.dump = dump;
- printer.isTreeView = (params.get("wt") == null); // this is hacky but tree view requests don't come in with the wt set
try {
printer.print(path);
@@ -136,8 +140,6 @@ public final class ZookeeperInfoServlet
boolean detail = false;
boolean dump = false;
- boolean isTreeView = false;
-
String addr; // the address passed to us
String keeperAddr; // the address we're connected to
@@ -383,47 +385,6 @@ public final class ZookeeperInfoServlet
dataStrErr = "data is not parsable as a utf8 String: " + e.toString();
}
}
- // pull in external collections too
- if (ZkStateReader.CLUSTER_STATE.equals(path) && !isTreeView) {
- SortedMap<String,Object> collectionStates = null;
- List<String> children = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, null, true);
- java.util.Collections.sort(children);
- for (String collection : children) {
- String collStatePath = ZkStateReader.getCollectionPath(collection);
- String childDataStr = null;
- try {
- byte[] childData = zkClient.getData(collStatePath, null, null, true);
- if (childData != null) {
- childDataStr = (new BytesRef(childData)).utf8ToString();
- }
- } catch (KeeperException.NoNodeException nne) {
- // safe to ignore
- } catch (Exception childErr) {
- log.error("Failed to get "+collStatePath+" due to: "+childErr);
- }
-
- if (childDataStr != null) {
- if (collectionStates == null) {
- // initialize lazily as there may not be any external collections
- collectionStates = new TreeMap<>();
-
- // add the internal collections
- if (dataStr != null)
- collectionStates.putAll((Map<String,Object>)ObjectBuilder.fromJSON(dataStr));
- }
-
- // now add in the external collections
- Map<String,Object> extColl = (Map<String,Object>)ObjectBuilder.fromJSON(childDataStr);
- collectionStates.put(collection, extColl.get(collection));
- }
- }
-
- if (collectionStates != null) {
- CharArr out = new CharArr();
- new JSONWriter(out, 2).write(collectionStates);
- dataStr = out.toString();
- }
- }
json.writeString("znode");
json.writeNameSeparator();
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java Thu Jul 3 11:13:35 2014
@@ -234,7 +234,7 @@ public class SolrLogLayout extends Layou
private Map<String,Object> getReplicaProps(ZkController zkController, SolrCore core) {
final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- Replica replica = zkController.getZkStateReader().getClusterState(). getReplica(collection, zkController.getCoreNodeName(core.getCoreDescriptor()), true);
+ Replica replica = zkController.getClusterState().getReplica(collection, zkController.getCoreNodeName(core.getCoreDescriptor()));
if(replica!=null) {
return replica.getProperties();
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AssignTest.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AssignTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AssignTest.java Thu Jul 3 11:13:35 2014
@@ -86,7 +86,7 @@ public class AssignTest extends SolrTest
collectionStates.put(cname, docCollection);
Set<String> liveNodes = new HashSet<>();
- ClusterState state = new ClusterState(-1,liveNodes, collectionStates, ClusterStateTest.getMockZkStateReader(collectionStates.keySet()));
+ ClusterState state = new ClusterState(-1,liveNodes, collectionStates);
String nodeName = Assign.assignNode("collection1", state);
assertEquals("core_node2", nodeName);
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java Thu Jul 3 11:13:35 2014
@@ -62,10 +62,10 @@ public class ClusterStateTest extends So
collectionStates.put("collection2", new DocCollection("collection2", slices, null, DocRouter.DEFAULT));
ZkStateReader zkStateReaderMock = getMockZkStateReader(collectionStates.keySet());
- ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates,zkStateReaderMock);
+ ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
byte[] bytes = ZkStateReader.toJSON(clusterState);
// System.out.println("#################### " + new String(bytes));
- ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes,zkStateReaderMock,null);
+ ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());
@@ -73,13 +73,13 @@ public class ClusterStateTest extends So
assertEquals("Poperties not copied properly", replica.getStr("prop1"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop1"));
assertEquals("Poperties not copied properly", replica.getStr("prop2"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop2"));
- loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes, getMockZkStateReader(Collections.<String>emptySet()),null );
+ loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());
assertEquals("Should not have collections", 0, loadedClusterState.getCollections().size());
- loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes,getMockZkStateReader(Collections.<String>emptySet()),null);
+ loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());
@@ -89,13 +89,6 @@ public class ClusterStateTest extends So
public static ZkStateReader getMockZkStateReader(final Set<String> collections) {
ZkStateReader mock = createMock(ZkStateReader.class);
EasyMock.reset(mock);
- mock.getAllCollections();
- EasyMock.expectLastCall().andAnswer(new IAnswer<Set<String>>() {
- @Override
- public Set<String> answer() throws Throwable {
- return collections;
- }
- }).anyTimes();
EasyMock.replay(mock);
return mock;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java Thu Jul 3 11:13:35 2014
@@ -50,9 +50,9 @@ public class SliceStateTest extends Solr
collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT));
ZkStateReader mockZkStateReader = ClusterStateTest.getMockZkStateReader(collectionStates.keySet());
- ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates, mockZkStateReader);
+ ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
byte[] bytes = ZkStateReader.toJSON(clusterState);
- ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes, mockZkStateReader,null);
+ ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
assertEquals("Default state not set to active", "active", loadedClusterState.getSlice("collection1", "shard1").getState());
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Thu Jul 3 11:13:35 2014
@@ -18,9 +18,7 @@ package org.apache.solr.client.solrj.imp
*/
import java.io.IOException;
-import java.net.ConnectException;
import java.net.MalformedURLException;
-import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -32,16 +30,13 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
-import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
-import org.apache.http.conn.ConnectTimeoutException;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
@@ -72,8 +67,6 @@ import org.apache.solr.common.util.Named
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* SolrJ client class to communicate with SolrCloud.
@@ -86,8 +79,6 @@ import org.slf4j.LoggerFactory;
* with {@link #setIdField(String)}.
*/
public class CloudSolrServer extends SolrServer {
- private static final Logger log = LoggerFactory.getLogger(CloudSolrServer.class);
-
private volatile ZkStateReader zkStateReader;
private String zkHost; // the zk server address
private int zkConnectTimeout = 10000;
@@ -96,8 +87,6 @@ public class CloudSolrServer extends Sol
private final LBHttpSolrServer lbServer;
private final boolean shutdownLBHttpSolrServer;
private HttpClient myClient;
- //no of times collection state to be reloaded if stale state error is received
- private static final int MAX_STALE_RETRIES = 5;
Random rand = new Random();
private final boolean updatesToLeaders;
@@ -106,7 +95,6 @@ public class CloudSolrServer extends Sol
.newCachedThreadPool(new SolrjNamedThreadFactory(
"CloudSolrServer ThreadPool"));
private String idField = "id";
- public static final String STATE_VERSION = "_stateVer_";
private final Set<String> NON_ROUTABLE_PARAMS;
{
NON_ROUTABLE_PARAMS = new HashSet<>();
@@ -124,36 +112,8 @@ public class CloudSolrServer extends Sol
// NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
}
- private volatile long timeToLive = 60* 1000L;
-
-
- protected Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){
- @Override
- public ExpiringCachedDocCollection get(Object key) {
- ExpiringCachedDocCollection val = super.get(key);
- if(val == null) return null;
- if(val.isExpired(timeToLive)) {
- super.remove(key);
- return null;
- }
- return val;
- }
-
- };
- class ExpiringCachedDocCollection {
- DocCollection cached;
- long cachedAt;
- ExpiringCachedDocCollection(DocCollection cached) {
- this.cached = cached;
- this.cachedAt = System.currentTimeMillis();
- }
-
- boolean isExpired(long timeToLive) {
- return (System.currentTimeMillis() - cachedAt) > timeToLive;
- }
- }
/**
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
@@ -167,8 +127,6 @@ public class CloudSolrServer extends Sol
this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = true;
shutdownLBHttpSolrServer = true;
- setupStateVerParamOnQueryString(lbServer);
-
}
public CloudSolrServer(String zkHost, boolean updatesToLeaders)
@@ -180,15 +138,6 @@ public class CloudSolrServer extends Sol
this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = updatesToLeaders;
shutdownLBHttpSolrServer = true;
- setupStateVerParamOnQueryString(lbServer);
- }
-
- /**Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json
- * @param seconds ttl value in seconds
- */
- public void setCollectionCacheTTl(int seconds){
- assert seconds > 0;
- timeToLive = seconds*1000L;
}
/**
@@ -201,7 +150,6 @@ public class CloudSolrServer extends Sol
this.lbServer = lbServer;
this.updatesToLeaders = true;
shutdownLBHttpSolrServer = false;
- setupStateVerParamOnQueryString(lbServer);
}
/**
@@ -215,24 +163,8 @@ public class CloudSolrServer extends Sol
this.lbServer = lbServer;
this.updatesToLeaders = updatesToLeaders;
shutdownLBHttpSolrServer = false;
- setupStateVerParamOnQueryString(lbServer);
-
}
- /**
- * Used internally to setup the _stateVer_ param to be sent in the query string of requests
- * coming from this instance.
- */
- protected void setupStateVerParamOnQueryString(LBHttpSolrServer lbServer) {
- // setup the stateVer param to be passed in the query string of every request
- Set<String> queryStringParams = lbServer.getQueryParams();
- if (queryStringParams == null) {
- queryStringParams = new HashSet<String>(2);
- lbServer.setQueryParams(queryStringParams);
- }
- queryStringParams.add(STATE_VERSION);
- }
-
public ResponseParser getParser() {
return lbServer.getParser();
}
@@ -306,7 +238,8 @@ public class CloudSolrServer extends Sol
if (zkStateReader == null) {
ZkStateReader zk = null;
try {
- zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
+ zk = new ZkStateReader(zkHost, zkClientTimeout,
+ zkConnectTimeout);
zk.createClusterStateWatchersAndUpdate();
zkStateReader = zk;
} catch (InterruptedException e) {
@@ -369,7 +302,7 @@ public class CloudSolrServer extends Sol
}
}
- DocCollection col = getDocCollection(clusterState, collection);
+ DocCollection col = clusterState.getCollection(collection);
DocRouter router = col.getRouter();
@@ -586,146 +519,7 @@ public class CloudSolrServer extends Sol
}
@Override
- public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
- SolrParams reqParams = request.getParams();
- String collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection();
- return requestWithRetryOnStaleState(request, 0, collection);
- }
-
- /**
- * As this class doesn't watch external collections on the client side,
- * there's a chance that the request will fail due to cached stale state,
- * which means the state must be refreshed from ZK and retried.
- */
- protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection)
- throws SolrServerException, IOException {
-
- connect(); // important to call this before you start working with the ZkStateReader
-
- // build up a _stateVer_ param to pass to the server containing all of the
- // external collection state versions involved in this request, which allows
- // the server to notify us that our cached state for one or more of the external
- // collections is stale and needs to be refreshed ... this code has no impact on internal collections
- String stateVerParam = null;
- List<DocCollection> requestedCollections = null;
- if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests
- Set<String> requestedCollectionNames = getCollectionList(getZkStateReader().getClusterState(), collection);
-
- StringBuilder stateVerParamBuilder = null;
- for (String requestedCollection : requestedCollectionNames) {
- // track the version of state we're using on the client side using the _stateVer_ param
- DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection);
- int collVer = coll.getZNodeVersion();
- if (coll.getStateFormat()>1) {
- if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
- requestedCollections.add(coll);
-
- if (stateVerParamBuilder == null) {
- stateVerParamBuilder = new StringBuilder();
- } else {
- stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
- }
-
- stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
- }
- }
-
- if (stateVerParamBuilder != null) {
- stateVerParam = stateVerParamBuilder.toString();
- }
- }
-
- if (request.getParams() instanceof ModifiableSolrParams) {
- ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
- if (stateVerParam != null) {
- params.set(STATE_VERSION, stateVerParam);
- } else {
- params.remove(STATE_VERSION);
- }
- } // else: ??? how to set this ???
-
- NamedList<Object> resp = null;
- try {
- resp = sendRequest(request);
- } catch (Exception exc) {
-
- Throwable rootCause = SolrException.getRootCause(exc);
- // don't do retry support for admin requests or if the request doesn't have a collection specified
- if (collection == null || request.getPath().startsWith("/admin")) {
- if (exc instanceof SolrServerException) {
- throw (SolrServerException)exc;
- } else if (exc instanceof IOException) {
- throw (IOException)exc;
- }else if (exc instanceof RuntimeException) {
- throw (RuntimeException) exc;
- }
- else {
- throw new SolrServerException(rootCause);
- }
- }
-
- int errorCode = (rootCause instanceof SolrException) ?
- ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
-
- log.error("Request to collection {} failed due to ("+errorCode+
- ") {}, retry? "+retryCount, collection, rootCause.toString());
-
- boolean wasCommError =
- (rootCause instanceof ConnectException ||
- rootCause instanceof ConnectTimeoutException ||
- rootCause instanceof NoHttpResponseException ||
- rootCause instanceof SocketException);
-
- boolean stateWasStale = false;
- if (retryCount < MAX_STALE_RETRIES &&
- !requestedCollections.isEmpty() &&
- SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
- {
- // cached state for one or more external collections was stale
- // re-issue request using updated state
- stateWasStale = true;
-
- // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
- for (DocCollection ext : requestedCollections) {
- collectionStateCache.remove(ext.getName());
- }
- }
-
- // if we experienced a communication error, it's worth checking the state
- // with ZK just to make sure the node we're trying to hit is still part of the collection
- if (retryCount < MAX_STALE_RETRIES && !stateWasStale && !requestedCollections.isEmpty() && wasCommError) {
- for (DocCollection ext : requestedCollections) {
- DocCollection latestStateFromZk = getZkStateReader().getCollection(ext.getName());
- if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
- // looks like we couldn't reach the server because the state was stale == retry
- stateWasStale = true;
- // we just pulled state from ZK, so update the cache so that the retry uses it
- collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
- }
- }
- }
-
- requestedCollections.clear(); // done with this
-
- // if the state was stale, then we retry the request once with new state pulled from Zk
- if (stateWasStale) {
- log.warn("Re-trying request to collection(s) "+collection+" after stale state error from server.");
- resp = requestWithRetryOnStaleState(request, retryCount+1, collection);
- } else {
- if (exc instanceof SolrServerException) {
- throw (SolrServerException)exc;
- } else if (exc instanceof IOException) {
- throw (IOException)exc;
- } else {
- throw new SolrServerException(rootCause);
- }
- }
- }
-
- return resp;
- }
-
- protected NamedList<Object> sendRequest(SolrRequest request)
+ public NamedList<Object> request(SolrRequest request)
throws SolrServerException, IOException {
connect();
@@ -785,7 +579,7 @@ public class CloudSolrServer extends Sol
// add it to the Map of slices.
Map<String,Slice> slices = new HashMap<>();
for (String collectionName : collectionsList) {
- DocCollection col = getDocCollection(clusterState, collectionName);
+ DocCollection col = clusterState.getCollection(collectionName);
Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
ClientUtils.addSlices(slices, collectionName, routeSlices, true);
}
@@ -877,17 +671,14 @@ public class CloudSolrServer extends Sol
Aliases aliases = zkStateReader.getAliases();
String alias = aliases.getCollectionAlias(collectionName);
if (alias != null) {
- List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
+ List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
collectionsList.addAll(aliasList);
continue;
}
-
- DocCollection docCollection = getDocCollection(clusterState, collection);
- if (docCollection == null) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
- }
+
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
}
-
+
collectionsList.add(collectionName);
}
return collectionsList;
@@ -924,28 +715,6 @@ public class CloudSolrServer extends Sol
return updatesToLeaders;
}
- protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException {
- ExpiringCachedDocCollection cachedState = collectionStateCache != null ? collectionStateCache.get(collection) : null;
- if (cachedState != null && cachedState.cached != null) {
- return cachedState.cached;
- }
-
- DocCollection col = clusterState.getCollectionOrNull(collection);
- if(col == null ) return null;
- collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
- return col;
- }
-
- /**
- * Extension point to allow sub-classes to override the ZkStateReader this class uses internally.
- */
- protected ZkStateReader createZkStateReader(String zkHost, int zkClientTimeout, int zkConnectTimeout)
- throws InterruptedException, TimeoutException, IOException, KeeperException {
- ZkStateReader zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
- zk.createClusterStateWatchersAndUpdate();
- return zk;
- }
-
/**
* Useful for determining the minimum achieved replication factor across
* all shards involved in processing an update request, typically useful
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Thu Jul 3 11:13:35 2014
@@ -46,25 +46,36 @@ public class ClusterState implements JSO
private final Map<String, DocCollection> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
private Set<String> liveNodes;
- private final ZkStateReader stateReader;
+
+ /**
+ * Use this constr when ClusterState is meant for publication.
+ *
+ * hashCode and equals will only depend on liveNodes and not clusterStateVersion.
+ */
+ @Deprecated
+ public ClusterState(Set<String> liveNodes,
+ Map<String, DocCollection> collectionStates) {
+ this(null, liveNodes, collectionStates);
+ }
+
+
+
/**
* Use this constr when ClusterState is meant for consumption.
*/
public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes,
- Map<String, DocCollection> collectionStates, ZkStateReader stateReader) {
- assert stateReader != null;
+ Map<String, DocCollection> collectionStates) {
this.zkClusterStateVersion = zkClusterStateVersion;
this.liveNodes = new HashSet<>(liveNodes.size());
this.liveNodes.addAll(liveNodes);
this.collectionStates = new LinkedHashMap<>(collectionStates.size());
this.collectionStates.putAll(collectionStates);
- this.stateReader = stateReader;
}
public ClusterState copyWith(Map<String,DocCollection> modified){
- ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates,stateReader);
+ ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates);
for (Entry<String, DocCollection> e : modified.entrySet()) {
DocCollection c = e.getValue();
if(c == null) {
@@ -87,10 +98,17 @@ public class ClusterState implements JSO
if (slice == null) return null;
return slice.getLeader();
}
+ private Replica getReplica(DocCollection coll, String replicaName) {
+ if (coll == null) return null;
+ for (Slice slice : coll.getSlices()) {
+ Replica replica = slice.getReplica(replicaName);
+ if (replica != null) return replica;
+ }
+ return null;
+ }
public boolean hasCollection(String coll) {
- if (collectionStates.containsKey(coll)) return true;
- return stateReader.getAllCollections().contains(coll);
+ return collectionStates.containsKey(coll) ;
}
/**
@@ -98,9 +116,8 @@ public class ClusterState implements JSO
* If the slice is known, do not use this method.
* coreNodeName is the same as replicaName
*/
- public Replica getReplica(final String collection, final String coreNodeName, boolean cachedOnly) {
- DocCollection coll = stateReader.getCollection(collection,cachedOnly);
- return coll == null? null: coll.getReplica(coreNodeName);
+ public Replica getReplica(final String collection, final String coreNodeName) {
+ return getReplica(collectionStates.get(collection), coreNodeName);
}
/**
@@ -136,35 +153,6 @@ public class ClusterState implements JSO
return coll.getActiveSlices();
}
- /**
- * Get the {@code DocCollection} object if available. This method will
- * never hit ZooKeeper and attempt to fetch collection from locally available
- * state only.
- *
- * @param collection the name of the collection
- * @return the {@link org.apache.solr.common.cloud.DocCollection} or null if not found
- */
- private DocCollection getCachedCollection(String collection) {
- DocCollection c = collectionStates.get(collection);
- if (c != null) return c;
- if (!stateReader.getAllCollections().contains(collection)) return null;
- return stateReader.getCollection(collection, true); // return from cache
- }
-
- /** expert internal use only
- * Gets the replica from caches by the core name (assuming the slice is unknown) or null if replica is not found.
- * If the slice is known, do not use this method.
- * coreNodeName is the same as replicaName
- */
- public Replica getCachedReplica(String collectionName, String coreNodeName) {
- DocCollection c = getCachedCollection(collectionName);
- if (c == null) return null;
- for (Slice slice : c.getSlices()) {
- Replica replica = slice.getReplica(coreNodeName);
- if (replica != null) return replica;
- }
- return null;
- }
/**
* Get the named DocCollection object, or throw an exception if it doesn't exist.
@@ -175,26 +163,23 @@ public class ClusterState implements JSO
return coll;
}
+
public DocCollection getCollectionOrNull(String coll) {
- DocCollection c = collectionStates.get(coll);
- if (c != null) return c;
- if (!stateReader.getAllCollections().contains(coll)) return null;
- return stateReader.getCollection(coll);
+ return collectionStates.get(coll);
}
/**
* Get collection names.
*/
public Set<String> getCollections() {
- return stateReader.getAllCollections();
+ return collectionStates.keySet();
}
/**
- * For internal use only
* @return Map<collectionName, Map<sliceName,Slice>>
*/
- Map<String, DocCollection> getCollectionStates() {
- return collectionStates;
+ public Map<String, DocCollection> getCollectionStates() {
+ return Collections.unmodifiableMap(collectionStates);
}
/**
@@ -253,7 +238,7 @@ public class ClusterState implements JSO
Stat stat = new Stat();
byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE,
null, stat, true);
- return load(stat.getVersion(), state, liveNodes, stateReader, ZkStateReader.CLUSTER_STATE);
+ return load(stat.getVersion(), state, liveNodes);
}
@@ -265,34 +250,25 @@ public class ClusterState implements JSO
* @param version zk version of the clusterstate.json file (bytes)
* @param bytes clusterstate.json as a byte array
* @param liveNodes list of live nodes
- * @param stateReader The ZkStateReader for this clusterstate
- * @param znode the znode from which this data is read from
* @return the ClusterState
*/
- public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes, ZkStateReader stateReader, String znode) {
+ public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
// System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes)));
if (bytes == null || bytes.length == 0) {
- return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap(),stateReader);
+ return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap());
}
Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
Map<String,DocCollection> collections = new LinkedHashMap<>(stateMap.size());
for (Entry<String, Object> entry : stateMap.entrySet()) {
String collectionName = entry.getKey();
- DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version, znode);
+ DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version);
collections.put(collectionName, coll);
}
// System.out.println("######## ClusterState.load result:" + collections);
- return new ClusterState( version, liveNodes, collections,stateReader);
+ return new ClusterState( version, liveNodes, collections);
}
- /**
- * @deprecated use {@link #load(Integer, byte[], Set, ZkStateReader, String)}
- */
- @Deprecated
- public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes){
- return load(version == null ? -1: version, bytes, liveNodes,null,null);
- }
public static Aliases load(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
@@ -303,7 +279,7 @@ public class ClusterState implements JSO
return new Aliases(aliasMap);
}
- private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version, String znode) {
+ private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version) {
Map<String,Object> props;
Map<String,Slice> slices;
@@ -330,7 +306,7 @@ public class ClusterState implements JSO
router = DocRouter.getDocRouter((String) routerProps.get("name"));
}
- return new DocCollection(name, slices, props, router, version,znode);
+ return new DocCollection(name, slices, props, router, version);
}
private static Map<String,Slice> makeSlices(Map<String,Object> genericSlices) {
@@ -358,7 +334,7 @@ public class ClusterState implements JSO
*
* @return null if ClusterState was created for publication, not consumption
*/
- public Integer getZnodeVersion() {
+ public Integer getZkClusterStateVersion() {
return zkClusterStateVersion;
}
@@ -388,9 +364,6 @@ public class ClusterState implements JSO
}
- public ZkStateReader getStateReader(){
- return stateReader;
- }
/**
* Internal API used only by ZkStateReader
@@ -399,5 +372,9 @@ public class ClusterState implements JSO
this.liveNodes = liveNodes;
}
+ public DocCollection getCommonCollection(String name){
+ return collectionStates.get(name);
+
+ }
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java Thu Jul 3 11:13:35 2014
@@ -21,6 +21,7 @@ import org.noggit.JSONUtil;
import org.noggit.JSONWriter;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -32,17 +33,15 @@ import java.util.Map;
public class DocCollection extends ZkNodeProps {
public static final String DOC_ROUTER = "router";
public static final String SHARDS = "shards";
- public static final String STATE_FORMAT = "stateFormat";
- private int znodeVersion;
+ private int version;
private final String name;
private final Map<String, Slice> slices;
private final Map<String, Slice> activeSlices;
private final DocRouter router;
- private final String znode;
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
- this(name, slices, props, router, -1, ZkStateReader.CLUSTER_STATE);
+ this(name, slices, props, router, -1);
}
/**
@@ -50,9 +49,9 @@ public class DocCollection extends ZkNod
* @param slices The logical shards of the collection. This is used directly and a copy is not made.
* @param props The properties of the slice. This is used directly and a copy is not made.
*/
- public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, String znode) {
+ public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion) {
super( props==null ? props = new HashMap<String,Object>() : props);
- this.znodeVersion = zkVersion;
+ this.version = zkVersion;
this.name = name;
this.slices = slices;
@@ -66,12 +65,8 @@ public class DocCollection extends ZkNod
this.activeSlices.put(slice.getKey(), slice.getValue());
}
this.router = router;
- this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
- assert name != null && slices != null;
- }
- public DocCollection copyWith(Map<String, Slice> slices){
- return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode);
+ assert name != null && slices != null;
}
@@ -115,16 +110,9 @@ public class DocCollection extends ZkNod
return activeSlices;
}
- public int getZNodeVersion(){
- return znodeVersion;
- }
+ public int getVersion(){
+ return version;
- public int getStateFormat(){
- return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1:2;
- }
-
- public String getZNode(){
- return znode;
}
@@ -144,12 +132,4 @@ public class DocCollection extends ZkNod
all.put(SHARDS, slices);
jsonWriter.write(all);
}
-
- public Replica getReplica(String coreNodeName) {
- for (Slice slice : slices.values()) {
- Replica replica = slice.getReplica(coreNodeName);
- if (replica != null) return replica;
- }
- return null;
- }
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Thu Jul 3 11:13:35 2014
@@ -564,7 +564,6 @@ public class SolrZkClient {
}
public void close() {
-// log.warn("closed inst :"+inst, new Exception("leakdebug"));
if (isClosed) return; // it's okay if we over close - same as solrcore
isClosed = true;
try {
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Thu Jul 3 11:13:35 2014
@@ -98,16 +98,10 @@ public class ZkStateReader {
public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
public static final String SHARD_LEADERS_ZKNODE = "leaders";
- private final Set<String> watchedCollections = new HashSet<String>();
- /**These are collections which are actively watched by this instance .
- *
- */
- private Map<String , DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
- private Set<String> allCollections = Collections.emptySet();
-
+
//
// convenience methods... should these go somewhere else?
//
@@ -168,8 +162,7 @@ public class ZkStateReader {
log.info("path={} {}={} specified config exists in ZooKeeper",
new Object[] {path, CONFIGNAME_PROP, configName});
}
- } else {
- throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path);
+
}
}
catch (KeeperException e) {
@@ -258,21 +251,22 @@ public class ZkStateReader {
return aliases;
}
- public Boolean checkValid(String coll, int version){
+ /*public Boolean checkValid(String coll, int version){
DocCollection collection = clusterState.getCollectionOrNull(coll);
if(collection ==null) return null;
- if(collection.getZNodeVersion() < version){
- log.info("server older than client {}<{}",collection.getZNodeVersion(),version);
- DocCollection nu = getCollectionLive(this, coll);
- if(nu.getZNodeVersion()> collection.getZNodeVersion()){
- updateWatchedCollection(nu);
+ if(collection.getVersion() < version){
+ log.info("server older than client {}<{}",collection.getVersion(),version);
+ DocCollection nu = getExternCollectionFresh(this, coll);
+ if(nu.getVersion()> collection.getVersion()){
+ updateExternCollection(nu);
collection = nu;
}
}
- if(collection.getZNodeVersion() == version) return Boolean.TRUE;
- log.debug("wrong version from client {}!={} ",version, collection.getZNodeVersion());
+ if(collection.getVersion() == version) return Boolean.TRUE;
+ log.info("wrong version from client {}!={} ",version, collection.getVersion());
return Boolean.FALSE;
- }
+
+ }*/
public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
InterruptedException {
@@ -305,11 +299,10 @@ public class ZkStateReader {
byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat ,
true);
Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
- ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln,ZkStateReader.this, null);
+ ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln);
// update volatile
ZkStateReader.this.clusterState = clusterState;
- updateCollectionNames();
// HashSet<String> all = new HashSet<>(colls);;
// all.addAll(clusterState.getAllInternalCollections());
// all.remove(null);
@@ -384,7 +377,6 @@ public class ZkStateReader {
liveNodeSet.addAll(liveNodes);
ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this);
this.clusterState = clusterState;
- updateCollectionNames();
zkClient.exists(ALIASES,
new Watcher() {
@@ -430,40 +422,6 @@ public class ZkStateReader {
}, true);
}
updateAliases();
- //on reconnect of SolrZkClient re-add watchers for the watched external collections
- synchronized (this) {
- for (String watchedCollection : watchedCollections) {
- addZkWatch(watchedCollection);
- }
- }
- }
-
- public void updateCollectionNames() throws KeeperException, InterruptedException {
- Set<String> colls = getExternColls();
- colls.addAll(clusterState.getCollectionStates().keySet());
- allCollections = Collections.unmodifiableSet(colls);
- }
-
- private Set<String> getExternColls() throws KeeperException, InterruptedException {
- List<String> children = null;
- try {
- children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
- } catch (KeeperException.NoNodeException e) {
- log.warn("Error fetching collection names");
-
- return new HashSet<>();
- }
- if (children == null || children.isEmpty()) return new HashSet<>();
- HashSet<String> result = new HashSet<>(children.size());
-
- for (String c : children) {
- try {
- if (zkClient.exists(getCollectionPath(c), true)) result.add(c);
- } catch (Exception e) {
- log.warn("Error reading collections nodes", e);
- }
- }
- return result;
}
@@ -482,7 +440,7 @@ public class ZkStateReader {
liveNodesSet.addAll(liveNodes);
if (!onlyLiveNodes) {
- log.debug("Updating cloud state from ZooKeeper... ");
+ log.info("Updating cloud state from ZooKeeper... ");
clusterState = ClusterState.load(zkClient, liveNodesSet,this);
} else {
@@ -491,7 +449,6 @@ public class ZkStateReader {
clusterState.setLiveNodes(liveNodesSet);
}
this.clusterState = clusterState;
- updateCollectionNames();
}
} else {
@@ -550,13 +507,9 @@ public class ZkStateReader {
}
}, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
}
- synchronized (this) {
- for (String watchedCollection : watchedCollections) {
- watchedCollectionStates.put(watchedCollection, getCollectionLive(this, watchedCollection));
- }
- }
- }
+ }
+
/**
* @return information about the cluster from ZooKeeper
*/
@@ -679,9 +632,6 @@ public class ZkStateReader {
public SolrZkClient getZkClient() {
return zkClient;
}
- public Set<String> getAllCollections(){
- return allCollections;
- }
public void updateAliases() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData(ALIASES, null, null, true);
@@ -728,167 +678,4 @@ public class ZkStateReader {
}
}
- public void updateWatchedCollection(DocCollection c) {
- if(watchedCollections.contains(c.getName())){
- watchedCollectionStates.put(c.getName(), c);
- log.info("Updated DocCollection "+c.getName()+" to: ");
- }
- }
-
- /**
- * <b>Advance usage</b>
- * This method can be used to fetch a collection object and control whether it hits
- * the cache only or if information can be looked up from ZooKeeper.
- *
- * @param coll the collection name
- * @param cachedCopyOnly whether to fetch data from cache only or if hitting Zookeeper is acceptable
- * @return the {@link org.apache.solr.common.cloud.DocCollection}
- */
- public DocCollection getCollection(String coll, boolean cachedCopyOnly) {
- if(clusterState.getCollectionStates().get(coll) != null) {
- //this collection resides in clusterstate.json. So it's always up-to-date
- return clusterState.getCollectionStates().get(coll);
- }
- if (watchedCollections.contains(coll) || cachedCopyOnly) {
- DocCollection c = watchedCollectionStates.get(coll);
- if (c != null || cachedCopyOnly) return c;
- }
- return getCollectionLive(this, coll);
- }
-
- private Map ephemeralCollectionData;
-
- /**
- * this is only set by Overseer not to be set by others and only set inside the Overseer node. If Overseer has
- unfinished external collections which are yet to be persisted to ZK
- this map is populated and this class can use that information
- @param map The map reference
- */
- public void setEphemeralCollectionData(Map map){
- ephemeralCollectionData = map;
- }
-
- public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
- String collectionPath = getCollectionPath(coll);
- if(zkStateReader.ephemeralCollectionData !=null ){
- ClusterState cs = (ClusterState) zkStateReader.ephemeralCollectionData.get(collectionPath);
- if(cs !=null) {
- return cs.getCollectionStates().get(coll);
- }
- }
- try {
- if (!zkStateReader.getZkClient().exists(collectionPath, true)) return null;
- Stat stat = new Stat();
- byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true);
- ClusterState state = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(), zkStateReader, collectionPath);
- return state.getCollectionStates().get(coll);
- } catch (KeeperException.NoNodeException e) {
- log.warn("No node available : " + collectionPath, e);
- return null;
- } catch (KeeperException e) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
- }
- }
-
- public DocCollection getCollection(String coll) {
- return getCollection(coll, false);
- }
-
- public static String getCollectionPath(String coll) {
- return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
- }
-
- public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
- synchronized (this){
- if(watchedCollections.contains(coll)) return;
- else {
- watchedCollections.add(coll);
- }
- addZkWatch(coll);
- }
-
- }
-
- private void addZkWatch(final String coll) throws KeeperException, InterruptedException {
- log.info("addZkWatch {}", coll);
- final String fullpath = getCollectionPath(coll);
- synchronized (getUpdateLock()){
-
- cmdExecutor.ensureExists(fullpath, zkClient);
- log.info("Updating collection state at {} from ZooKeeper... ",fullpath);
-
- Watcher watcher = new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
- // session events are not change events,
- // and do not remove the watcher
- if (EventType.None.equals(event.getType())) {
- return;
- }
- log.info("A cluster state change: {}, has occurred - updating... ", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
- try {
-
- // delayed approach
- // ZkStateReader.this.updateClusterState(false, false);
- synchronized (ZkStateReader.this.getUpdateLock()) {
- if(!watchedCollections.contains(coll)) {
- log.info("Unwatched collection {}",coll);
- return;
- }
- // remake watch
- final Watcher thisWatch = this;
- Stat stat = new Stat();
- byte[] data = zkClient.getData(fullpath, thisWatch, stat, true);
-
- if(data == null || data.length ==0){
- log.warn("No value set for collection state : {}", coll);
- return;
-
- }
- ClusterState clusterState = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(),ZkStateReader.this,fullpath);
- // update volatile
-
- DocCollection newState = clusterState.getCollectionStates().get(coll);
- watchedCollectionStates.put(coll, newState);
- log.info("Updating data for {} to ver {} ", coll , newState.getZNodeVersion());
-
- }
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.error("Unwatched collection :"+coll , e);
- throw new ZooKeeperException(ErrorCode.SERVER_ERROR,
- "", e);
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("Unwatched collection :"+coll , e);
- return;
- }
- }
-
- };
- zkClient.exists(fullpath, watcher, true);
- }
-
- watchedCollectionStates.put(coll, getCollectionLive(this, coll));
- }
-
- /**This is not a public API. Only used by ZkController */
- public void removeZKWatch(final String coll){
- synchronized (this){
- watchedCollections.remove(coll);
- }
- }
-
-
-
-
}
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java?rev=1607587&r1=1607586&r2=1607587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java Thu Jul 3 11:13:35 2014
@@ -20,6 +20,7 @@ package org.apache.solr.client.solrj.imp
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -41,6 +42,7 @@ import org.apache.solr.client.solrj.requ
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
+import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@@ -49,6 +51,7 @@ import org.apache.solr.common.cloud.DocC
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -59,6 +62,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,7 +125,6 @@ public class CloudSolrServerTest extends
@Override
public void doTest() throws Exception {
allTests();
- stateVersionParamTest();
}
private void allTests() throws Exception {
@@ -343,77 +346,7 @@ public class CloudSolrServerTest extends
SolrInputDocument doc = getDoc(fields);
indexDoc(doc);
}
-
- private void stateVersionParamTest() throws Exception {
- CloudSolrServer client = createCloudClient(null);
- try {
- String collectionName = "checkStateVerCol";
- createCollection(collectionName, client, 2, 2);
- waitForRecoveriesToFinish(collectionName, false);
- DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName);
- Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
-
- HttpSolrServer httpSolrServer = new HttpSolrServer(r.getStr(ZkStateReader.BASE_URL_PROP) + "/"+collectionName);
-
-
- SolrQuery q = new SolrQuery().setQuery("*:*");
-
- log.info("should work query, result {}", httpSolrServer.query(q));
- //no problem
- q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
- log.info("2nd query , result {}", httpSolrServer.query(q));
- //no error yet good
-
- q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+ (coll.getZNodeVersion() -1)); //an older version expect error
-
- HttpSolrServer.RemoteSolrException sse = null;
- try {
- httpSolrServer.query(q);
- log.info("expected query error");
- } catch (HttpSolrServer.RemoteSolrException e) {
- sse = e;
- }
- httpSolrServer.shutdown();
- assertNotNull(sse);
- assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
-
- //now send the request to another node that does n ot serve the collection
-
- Set<String> allNodesOfColl = new HashSet<>();
- for (Slice slice : coll.getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
- }
- }
- String theNode = null;
- for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) {
- String n = client.getZkStateReader().getBaseUrlForNodeName(s);
- if(!allNodesOfColl.contains(s)){
- theNode = n;
- break;
- }
- }
- log.info("thenode which does not serve this collection{} ",theNode);
- assertNotNull(theNode);
- httpSolrServer = new HttpSolrServer(theNode + "/"+collectionName);
-
- q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
-
- try {
- httpSolrServer.query(q);
- log.info("error was expected");
- } catch (HttpSolrServer.RemoteSolrException e) {
- sse = e;
- }
- httpSolrServer.shutdown();
- assertNotNull(sse);
- assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
- } finally {
- client.shutdown();
- }
-
- }
-
+
public void testShutdown() throws MalformedURLException {
CloudSolrServer server = new CloudSolrServer("[ff01::114]:33332");
try {