You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cm...@apache.org on 2013/08/11 14:19:39 UTC
svn commit: r1512909 [24/38] - in /lucene/dev/branches/lucene4956: ./
dev-tools/ dev-tools/eclipse/ dev-tools/idea/.idea/libraries/
dev-tools/idea/lucene/suggest/ dev-tools/idea/solr/contrib/dataimporthandler/
dev-tools/idea/solr/core/src/test/ dev-too...
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Sun Aug 11 12:19:13 2013
@@ -17,15 +17,13 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -34,15 +32,18 @@ import org.apache.solr.common.cloud.Clos
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.PlainIdRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
@@ -53,6 +54,21 @@ import org.apache.zookeeper.KeeperExcept
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.solr.cloud.Assign.Node;
+import static org.apache.solr.cloud.Assign.getNodesForNewShard;
+import static org.apache.solr.common.cloud.DocRouter.ROUTE_FIELD;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+
+
public class OverseerCollectionProcessor implements Runnable, ClosableThread {
public static final String NUM_SLICES = "numShards";
@@ -75,6 +91,24 @@ public class OverseerCollectionProcessor
public static final String SPLITSHARD = "splitshard";
+ public static final String DELETESHARD = "deleteshard";
+
+ public static final String ROUTER = "router";
+
+ public static final String SHARDS_PROP = "shards";
+
+ public static final String CREATESHARD = "createshard";
+
+ public static final String COLL_CONF = "collection.configName";
+
+
+ public static final Map<String,Object> COLL_PROPS = asMap(
+ ROUTER,DocRouter.DEFAULT_NAME,
+ REPLICATION_FACTOR, "1",
+ MAX_SHARDS_PER_NODE,"1",
+ ROUTE_FIELD,null);
+
+
// TODO: use from Overseer?
private static final String QUEUE_OPERATION = "operation";
@@ -158,37 +192,14 @@ public class OverseerCollectionProcessor
protected SolrResponse processMessage(ZkNodeProps message, String operation) {
-
+ log.warn("OverseerCollectionProcessor.processMessage : "+ operation + " , "+ message.toString());
+
NamedList results = new NamedList();
try {
if (CREATECOLLECTION.equals(operation)) {
createCollection(zkStateReader.getClusterState(), message, results);
} else if (DELETECOLLECTION.equals(operation)) {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
- params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
- params.set(CoreAdminParams.DELETE_DATA_DIR, true);
- collectionCmd(zkStateReader.getClusterState(), message, params, results, null);
-
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- Overseer.REMOVECOLLECTION, "name", message.getStr("name"));
- Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
-
- // wait for a while until we don't see the collection
- long now = System.currentTimeMillis();
- long timeout = now + 30000;
- boolean removed = false;
- while (System.currentTimeMillis() < timeout) {
- Thread.sleep(100);
- removed = !zkStateReader.getClusterState().getCollections().contains(message.getStr("name"));
- if (removed) {
- Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
- break;
- }
- }
- if (!removed) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + message.getStr("name"));
- }
+ deleteCollection(message, results);
} else if (RELOADCOLLECTION.equals(operation)) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
@@ -199,6 +210,10 @@ public class OverseerCollectionProcessor
deleteAlias(zkStateReader.getAliases(), message);
} else if (SPLITSHARD.equals(operation)) {
splitShard(zkStateReader.getClusterState(), message, results);
+ } else if (CREATESHARD.equals(operation)) {
+ createShard(zkStateReader.getClusterState(), message, results);
+ } else if (DELETESHARD.equals(operation)) {
+ deleteShard(zkStateReader.getClusterState(), message, results);
} else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
@@ -217,6 +232,34 @@ public class OverseerCollectionProcessor
return new OverseerSolrResponse(results);
}
+ private void deleteCollection(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+ params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
+ params.set(CoreAdminParams.DELETE_DATA_DIR, true);
+ collectionCmd(zkStateReader.getClusterState(), message, params, results, null);
+
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+ Overseer.REMOVECOLLECTION, "name", message.getStr("name"));
+ Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
+
+ // wait for a while until we don't see the collection
+ long now = System.currentTimeMillis();
+ long timeout = now + 30000;
+ boolean removed = false;
+ while (System.currentTimeMillis() < timeout) {
+ Thread.sleep(100);
+ removed = !zkStateReader.getClusterState().getCollections().contains(message.getStr("name"));
+ if (removed) {
+ Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
+ break;
+ }
+ }
+ if (!removed) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + message.getStr("name"));
+ }
+ }
+
private void createAlias(Aliases aliases, ZkNodeProps message) {
String aliasName = message.getStr("name");
String collections = message.getStr("collections");
@@ -317,7 +360,84 @@ public class OverseerCollectionProcessor
}
}
-
+
+ private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+ log.info("create shard invoked");
+ String collectionName = message.getStr(COLLECTION_PROP);
+ String shard = message.getStr(SHARD_ID_PROP);
+ if(collectionName == null || shard ==null)
+ throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters" );
+ int numSlices = 1;
+
+ DocCollection collection = clusterState.getCollection(collectionName);
+ int maxShardsPerNode = collection.getInt(MAX_SHARDS_PER_NODE, 1);
+ int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(MAX_SHARDS_PER_NODE, 1));
+// int minReplicas = message.getInt("minReplicas",repFactor);
+ String createNodeSetStr =message.getStr(CREATE_NODE_SET);
+
+ ArrayList<Node> sortedNodeList = getNodesForNewShard(clusterState, collectionName, numSlices, maxShardsPerNode, repFactor, createNodeSetStr);
+
+ Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
+ // wait for a while until we see the new shard in the cluster state
+ long waitUntil = System.currentTimeMillis() + 30000;
+ boolean created = false;
+ while (System.currentTimeMillis() < waitUntil) {
+ Thread.sleep(100);
+ created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(shard) !=null;
+ if (created) break;
+ }
+ if (!created)
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr("name"));
+
+
+ String configName = message.getStr(COLL_CONF);
+ String sliceName = shard;
+ for (int j = 1; j <= repFactor; j++) {
+ String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
+ String shardName = collectionName + "_" + sliceName + "_replica" + j;
+ log.info("Creating shard " + shardName + " as part of slice "
+ + sliceName + " of collection " + collectionName + " on "
+ + nodeName);
+
+ // Need to create new params for each request
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+
+ params.set(CoreAdminParams.NAME, shardName);
+ params.set(COLL_CONF, configName);
+ params.set(CoreAdminParams.COLLECTION, collectionName);
+ params.set(CoreAdminParams.SHARD, sliceName);
+ params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+
+ ShardRequest sreq = new ShardRequest();
+ params.set("qt", adminPath);
+ sreq.purpose = 1;
+ String replica = zkStateReader.getZkClient()
+ .getBaseUrlForNodeName(nodeName);
+ if (replica.startsWith("http://")) replica = replica.substring(7);
+ sreq.shards = new String[]{replica};
+ sreq.actualShards = sreq.shards;
+ sreq.params = params;
+
+ shardHandler.submit(sreq, replica, sreq.params);
+
+ }
+
+ ShardResponse srsp;
+ do {
+ srsp = shardHandler.takeCompletedOrError();
+ if (srsp != null) {
+ processResponse(results, srsp);
+ }
+ } while (srsp != null);
+
+ log.info("Finished create command on all shards for collection: "
+ + collectionName);
+
+ return true;
+ }
+
+
private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
log.info("Split shard invoked");
String collectionName = message.getStr("collection");
@@ -360,13 +480,17 @@ public class OverseerCollectionProcessor
throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
} else if (Slice.CONSTRUCTION.equals(oSlice.getState())) {
for (Replica replica : oSlice.getReplicas()) {
- String core = replica.getStr("core");
- log.info("Unloading core: " + core + " from node: " + replica.getNodeName());
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
- params.set(CoreAdminParams.CORE, core);
- params.set(CoreAdminParams.DELETE_INDEX, "true");
- sendShardRequest(replica.getNodeName(), params);
+ if (clusterState.liveNodesContain(replica.getNodeName())) {
+ String core = replica.getStr("core");
+ log.info("Unloading core: " + core + " from node: " + replica.getNodeName());
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+ params.set(CoreAdminParams.CORE, core);
+ params.set(CoreAdminParams.DELETE_INDEX, "true");
+ sendShardRequest(replica.getNodeName(), params);
+ } else {
+ log.warn("Replica {} exists in shard {} but is not live and cannot be unloaded", replica, oSlice);
+ }
}
}
}
@@ -397,13 +521,19 @@ public class OverseerCollectionProcessor
//params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); todo: is it necessary, we're not creating collections?
sendShardRequest(nodeName, params);
+ }
+ collectShardResponses(results, true,
+ "SPLTSHARD failed to create subshard leaders");
+
+ for (String subShardName : subShardNames) {
// wait for parent leader to acknowledge the sub-shard core
log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
+ String coreNodeName = waitForCoreNodeName(collection, zkStateReader.getZkClient().getBaseUrlForNodeName(nodeName), subShardName);
CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
cmd.setCoreName(subShardName);
cmd.setNodeName(nodeName);
- cmd.setCoreNodeName(nodeName + "_" + subShardName);
+ cmd.setCoreNodeName(coreNodeName);
cmd.setState(ZkStateReader.ACTIVE);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
@@ -411,7 +541,7 @@ public class OverseerCollectionProcessor
}
collectShardResponses(results, true,
- "SPLTSHARD failed to create subshard leaders or timed out waiting for them to come up");
+ "SPLTSHARD timed out waiting for subshard leaders to come up");
log.info("Successfully created all sub-shards for collection "
+ collectionName + " parent shard: " + slice + " on: " + parentShardLeader);
@@ -506,12 +636,13 @@ public class OverseerCollectionProcessor
sendShardRequest(subShardNodeName, params);
+ String coreNodeName = waitForCoreNodeName(collection, zkStateReader.getZkClient().getBaseUrlForNodeName(subShardNodeName), shardName);
// wait for the replicas to be seen as active on sub shard leader
log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
cmd.setCoreName(subShardNames.get(i-1));
cmd.setNodeName(subShardNodeName);
- cmd.setCoreNodeName(subShardNodeName + "_" + shardName);
+ cmd.setCoreNodeName(coreNodeName);
cmd.setState(ZkStateReader.ACTIVE);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
@@ -522,6 +653,18 @@ public class OverseerCollectionProcessor
collectShardResponses(results, true,
"SPLTSHARD failed to create subshard replicas or timed out waiting for them to come up");
+ String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
+ // HttpShardHandler is hard coded to send a QueryRequest hence we go direct
+ // and we force open a searcher so that we have documents to show upon switching states
+ UpdateResponse updateResponse = null;
+ try {
+ updateResponse = commit(coreUrl, true);
+ processResponse(results, null, coreUrl, updateResponse, slice);
+ } catch (Exception e) {
+ processResponse(results, e, coreUrl, updateResponse, slice);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib commit on: " + coreUrl, e);
+ }
+
log.info("Successfully created all replica shards for all sub-slices "
+ subSlices);
@@ -546,19 +689,140 @@ public class OverseerCollectionProcessor
}
}
+ static UpdateResponse commit(String url, boolean openSearcher) throws SolrServerException, IOException {
+ HttpSolrServer server = null;
+ try {
+ server = new HttpSolrServer(url);
+ server.setConnectionTimeout(30000);
+ server.setSoTimeout(60000);
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParams(new ModifiableSolrParams());
+ ureq.getParams().set(UpdateParams.OPEN_SEARCHER, openSearcher);
+ ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true);
+ return ureq.process(server);
+ } finally {
+ if (server != null) {
+ server.shutdown();
+ }
+ }
+ }
+
+ private String waitForCoreNodeName(DocCollection collection, String msgBaseUrl, String msgCore) {
+ int retryCount = 320;
+ while (retryCount-- > 0) {
+ Map<String,Slice> slicesMap = zkStateReader.getClusterState()
+ .getSlicesMap(collection.getName());
+ if (slicesMap != null) {
+
+ for (Slice slice : slicesMap.values()) {
+ for (Replica replica : slice.getReplicas()) {
+ // TODO: for really large clusters, we could 'index' on this
+
+ String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+ String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+
+ if (baseUrl.equals(msgBaseUrl) && core.equals(msgCore)) {
+ return replica.getName();
+ }
+ }
+ }
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
+ }
+
private void collectShardResponses(NamedList results, boolean abortOnError, String msgOnError) {
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
processResponse(results, srsp);
- if (abortOnError && srsp.getException() != null) {
- throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, srsp.getException());
+ Throwable exception = srsp.getException();
+ if (abortOnError && exception != null) {
+ // drain pending requests
+ while (srsp != null) {
+ srsp = shardHandler.takeCompletedOrError();
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
}
}
} while (srsp != null);
}
+
+ private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
+ log.info("Delete shard invoked");
+ String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+
+ String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
+ Slice slice = clusterState.getSlice(collection, sliceId);
+
+ if (slice == null) {
+ if(clusterState.getCollections().contains(collection)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "No shard with the specified name exists: " + slice);
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "No collection with the specified name exists: " + collection);
+ }
+ }
+ // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
+ // TODO: Add check for range gaps on Slice deletion
+ if (!(slice.getRange() == null || slice.getState().equals(Slice.INACTIVE))) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "The slice: " + slice.getName() + " is currently "
+ + slice.getState() + ". Only INACTIVE (or custom-hashed) slices can be deleted.");
+ }
+
+ try {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+ params.set(CoreAdminParams.DELETE_INDEX, "true");
+ sliceCmd(clusterState, params, null, slice);
+
+ ShardResponse srsp;
+ do {
+ srsp = shardHandler.takeCompletedOrError();
+ if (srsp != null) {
+ processResponse(results, srsp);
+ }
+ } while (srsp != null);
+
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+ Overseer.REMOVESHARD, ZkStateReader.COLLECTION_PROP, collection);
+ Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
+
+ // wait for a while until we don't see the shard
+ long now = System.currentTimeMillis();
+ long timeout = now + 30000;
+ boolean removed = false;
+ while (System.currentTimeMillis() < timeout) {
+ Thread.sleep(100);
+ removed = zkStateReader.getClusterState().getSlice(collection, message.getStr("name")) == null;
+ if (removed) {
+ Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
+ break;
+ }
+ }
+ if (!removed) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Could not fully remove collection: " + collection + " shard: " + message.getStr("name"));
+ }
+
+ log.info("Successfully deleted collection " + collection + ", shard: " + message.getStr("name"));
+
+ } catch (SolrException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error executing delete operation for collection: " + collection + " shard: " + message.getStr("name"), e);
+ }
+ }
+
private void sendShardRequest(String nodeName, ModifiableSolrParams params) {
ShardRequest sreq = new ShardRequest();
params.set("qt", adminPath);
@@ -571,8 +835,8 @@ public class OverseerCollectionProcessor
shardHandler.submit(sreq, replica, sreq.params);
}
-
- private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
+
+ private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
String collectionName = message.getStr("name");
if (clusterState.getCollections().contains(collectionName)) {
throw new SolrException(ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
@@ -582,14 +846,22 @@ public class OverseerCollectionProcessor
// look at the replication factor and see if it matches reality
// if it does not, find best nodes to create more cores
- int repFactor = msgStrToInt(message, REPLICATION_FACTOR, 1);
- Integer numSlices = msgStrToInt(message, NUM_SLICES, null);
-
- if (numSlices == null) {
+ int repFactor = message.getInt( REPLICATION_FACTOR, 1);
+ Integer numSlices = message.getInt(NUM_SLICES, null);
+ String router = message.getStr(ROUTER, DocRouter.DEFAULT_NAME);
+ List<String> shardNames = new ArrayList<>();
+ if(ImplicitDocRouter.NAME.equals(router)){
+ Overseer.getShardNames(shardNames, message.getStr("shards",null));
+ numSlices = shardNames.size();
+ } else {
+ Overseer.getShardNames(numSlices,shardNames);
+ }
+
+ if (numSlices == null ) {
throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param");
}
-
- int maxShardsPerNode = msgStrToInt(message, MAX_SHARDS_PER_NODE, 1);
+
+ int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
String createNodeSetStr;
List<String> createNodeList = ((createNodeSetStr = message.getStr(CREATE_NODE_SET)) == null)?null:StrUtils.splitSmart(createNodeSetStr, ",", true);
@@ -601,8 +873,6 @@ public class OverseerCollectionProcessor
throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
}
- String configName = message.getStr("collection.configName");
-
// we need to look at every node and see how many cores it serves
// add our new cores to existing nodes serving the least number of cores
// but (for now) require that each core goes on a distinct node.
@@ -645,26 +915,44 @@ public class OverseerCollectionProcessor
+ ". This requires " + requestedShardsToCreate
+ " shards to be created (higher than the allowed number)");
}
-
- for (int i = 1; i <= numSlices; i++) {
+
+// ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+// Overseer.CREATECOLLECTION, "name", message.getStr("name"));
+ Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
+
+ // wait for a while until we see the new collection in the cluster state
+ long waitUntil = System.currentTimeMillis() + 30000;
+ boolean created = false;
+ while (System.currentTimeMillis() < waitUntil) {
+ Thread.sleep(100);
+ created = zkStateReader.getClusterState().getCollections().contains(message.getStr("name"));
+ if(created) break;
+ }
+ if (!created)
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully createcollection: " + message.getStr("name"));
+
+
+ String configName = message.getStr(COLL_CONF);
+ log.info("going to create cores replicas shardNames {} , repFactor : {}", shardNames, repFactor);
+ for (int i = 1; i <= shardNames.size(); i++) {
+ String sliceName = shardNames.get(i-1);
for (int j = 1; j <= repFactor; j++) {
String nodeName = nodeList.get((repFactor * (i - 1) + (j - 1)) % nodeList.size());
- String sliceName = "shard" + i;
String shardName = collectionName + "_" + sliceName + "_replica" + j;
log.info("Creating shard " + shardName + " as part of slice "
+ sliceName + " of collection " + collectionName + " on "
+ nodeName);
-
+
// Need to create new params for each request
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
-
+
params.set(CoreAdminParams.NAME, shardName);
- params.set("collection.configName", configName);
+ params.set(COLL_CONF, configName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
-
+
ShardRequest sreq = new ShardRequest();
params.set("qt", adminPath);
sreq.purpose = 1;
@@ -674,12 +962,12 @@ public class OverseerCollectionProcessor
sreq.shards = new String[] {replica};
sreq.actualShards = sreq.shards;
sreq.params = params;
-
+
shardHandler.submit(sreq, replica, sreq.params);
-
+
}
}
-
+
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
@@ -697,7 +985,7 @@ public class OverseerCollectionProcessor
throw new SolrException(ErrorCode.SERVER_ERROR, null, ex);
}
}
-
+
private void collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params, NamedList results, String stateMatcher) {
log.info("Executing Collection Cmd : " + params);
String collectionName = message.getStr("name");
@@ -711,33 +999,7 @@ public class OverseerCollectionProcessor
for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
Slice slice = entry.getValue();
- Map<String,Replica> shards = slice.getReplicasMap();
- Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
- for (Map.Entry<String,Replica> shardEntry : shardEntries) {
- final ZkNodeProps node = shardEntry.getValue();
- if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP)) && (stateMatcher != null ? node.getStr(ZkStateReader.STATE_PROP).equals(stateMatcher) : true)) {
- // For thread safety, only simple clone the ModifiableSolrParams
- ModifiableSolrParams cloneParams = new ModifiableSolrParams();
- cloneParams.add(params);
- cloneParams.set(CoreAdminParams.CORE,
- node.getStr(ZkStateReader.CORE_NAME_PROP));
-
- String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
- ShardRequest sreq = new ShardRequest();
- sreq.nodeName = node.getStr(ZkStateReader.NODE_NAME_PROP);
- // yes, they must use same admin handler path everywhere...
- cloneParams.set("qt", adminPath);
- sreq.purpose = 1;
- // TODO: this sucks
- if (replica.startsWith("http://")) replica = replica.substring(7);
- sreq.shards = new String[] {replica};
- sreq.actualShards = sreq.shards;
- sreq.params = cloneParams;
- log.info("Collection Admin sending CoreAdmin cmd to " + replica
- + " params:" + sreq.params);
- shardHandler.submit(sreq, replica, sreq.params);
- }
- }
+ sliceCmd(clusterState, params, stateMatcher, slice);
}
ShardResponse srsp;
@@ -750,39 +1012,66 @@ public class OverseerCollectionProcessor
}
+ private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, String stateMatcher, Slice slice) {
+ Map<String,Replica> shards = slice.getReplicasMap();
+ Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
+ for (Map.Entry<String,Replica> shardEntry : shardEntries) {
+ final ZkNodeProps node = shardEntry.getValue();
+ if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP)) && (stateMatcher != null ? node.getStr(ZkStateReader.STATE_PROP).equals(stateMatcher) : true)) {
+ // For thread safety, only simple clone the ModifiableSolrParams
+ ModifiableSolrParams cloneParams = new ModifiableSolrParams();
+ cloneParams.add(params);
+ cloneParams.set(CoreAdminParams.CORE,
+ node.getStr(ZkStateReader.CORE_NAME_PROP));
+
+ String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
+ ShardRequest sreq = new ShardRequest();
+ sreq.nodeName = node.getStr(ZkStateReader.NODE_NAME_PROP);
+ // yes, they must use same admin handler path everywhere...
+ cloneParams.set("qt", adminPath);
+ sreq.purpose = 1;
+ // TODO: this sucks
+ if (replica.startsWith("http://")) replica = replica.substring(7);
+ sreq.shards = new String[] {replica};
+ sreq.actualShards = sreq.shards;
+ sreq.params = cloneParams;
+ log.info("Collection Admin sending CoreAdmin cmd to " + replica
+ + " params:" + sreq.params);
+ shardHandler.submit(sreq, replica, sreq.params);
+ }
+ }
+ }
+
private void processResponse(NamedList results, ShardResponse srsp) {
Throwable e = srsp.getException();
+ String nodeName = srsp.getNodeName();
+ SolrResponse solrResponse = srsp.getSolrResponse();
+ String shard = srsp.getShard();
+
+ processResponse(results, e, nodeName, solrResponse, shard);
+ }
+
+ private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard) {
if (e != null) {
- log.error("Error from shard: " + srsp.getShard(), e);
-
+ log.error("Error from shard: " + shard, e);
+
SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
if (failure == null) {
failure = new SimpleOrderedMap();
results.add("failure", failure);
}
- failure.add(srsp.getNodeName(), e.getClass().getName() + ":" + e.getMessage());
-
+ failure.add(nodeName, e.getClass().getName() + ":" + e.getMessage());
+
} else {
-
+
SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
if (success == null) {
success = new SimpleOrderedMap();
results.add("success", success);
}
-
- success.add(srsp.getNodeName(), srsp.getSolrResponse().getResponse());
- }
- }
-
- private Integer msgStrToInt(ZkNodeProps message, String key, Integer def)
- throws Exception {
- String str = message.getStr(key);
- try {
- return str == null ? def : Integer.parseInt(str);
- } catch (Exception ex) {
- SolrException.log(log, "Could not parse " + key, ex);
- throw ex;
+
+ success.add(nodeName, solrResponse.getResponse());
}
}
@@ -790,4 +1079,12 @@ public class OverseerCollectionProcessor
public boolean isClosed() {
return isClosed;
}
+
+ public static Map<String, Object> asMap(Object... vals) {
+ HashMap<String, Object> m = new HashMap<String, Object>();
+ for(int i=0; i<vals.length; i+=2) {
+ m.put(String.valueOf(vals[i]), vals[i+1]);
+ }
+ return m;
+ }
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Sun Aug 11 12:19:13 2013
@@ -91,7 +91,7 @@ public class RecoveryStrategy extends Th
zkController = cc.getZkController();
zkStateReader = zkController.getZkStateReader();
baseUrl = zkController.getBaseUrl();
- coreZkNodeName = zkController.getCoreNodeName(cd);
+ coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
}
public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
@@ -118,44 +118,40 @@ public class RecoveryStrategy extends Th
}
}
- private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops, String baseUrl)
+ private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
throws SolrServerException, IOException {
-
- String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
+
ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
String leaderUrl = leaderCNodeProps.getCoreUrl();
log.info("Attempting to replicate from " + leaderUrl + ". core=" + coreName);
- // if we are the leader, either we are trying to recover faster
- // then our ephemeral timed out or we are the only node
- if (!leaderBaseUrl.equals(baseUrl)) {
-
- // send commit
- commitOnLeader(leaderUrl);
-
- // use rep handler directly, so we can do this sync rather than async
- SolrRequestHandler handler = core.getRequestHandler(REPLICATION_HANDLER);
- if (handler instanceof LazyRequestHandlerWrapper) {
- handler = ((LazyRequestHandlerWrapper)handler).getWrappedHandler();
- }
- ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-
- if (replicationHandler == null) {
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
- "Skipping recovery, no " + REPLICATION_HANDLER + " handler found");
- }
-
- ModifiableSolrParams solrParams = new ModifiableSolrParams();
- solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
-
- if (isClosed()) retries = INTERRUPTED;
- boolean success = replicationHandler.doFetch(solrParams, false);
+ // send commit
+ commitOnLeader(leaderUrl);
+
+ // use rep handler directly, so we can do this sync rather than async
+ SolrRequestHandler handler = core.getRequestHandler(REPLICATION_HANDLER);
+ if (handler instanceof LazyRequestHandlerWrapper) {
+ handler = ((LazyRequestHandlerWrapper) handler).getWrappedHandler();
+ }
+ ReplicationHandler replicationHandler = (ReplicationHandler) handler;
+
+ if (replicationHandler == null) {
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+ "Skipping recovery, no " + REPLICATION_HANDLER + " handler found");
+ }
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
+
+ if (isClosed()) retries = INTERRUPTED;
+ boolean success = replicationHandler.doFetch(solrParams, false);
+
+ if (!success) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Replication for recovery failed.");
+ }
- if (!success) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
- }
-
// solrcloud_debug
// try {
// RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
@@ -169,7 +165,7 @@ public class RecoveryStrategy extends Th
// } catch (Exception e) {
//
// }
- }
+
}
private void commitOnLeader(String leaderUrl) throws SolrServerException, IOException {
@@ -329,10 +325,10 @@ public class RecoveryStrategy extends Th
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
boolean isLeader = leaderUrl.equals(ourUrl);
- if (isLeader && !cloudDesc.isLeader) {
+ if (isLeader && !cloudDesc.isLeader()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
}
- if (cloudDesc.isLeader) {
+ if (cloudDesc.isLeader()) {
// we are now the leader - no one else must have been suitable
log.warn("We have not yet recovered - but we are now the leader! core=" + coreName);
log.info("Finished recovery process. core=" + coreName);
@@ -406,8 +402,7 @@ public class RecoveryStrategy extends Th
try {
- replicate(zkController.getNodeName(), core,
- leaderprops, leaderUrl);
+ replicate(zkController.getNodeName(), core, leaderprops);
replay(ulog);
replayed = true;
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java Sun Aug 11 12:19:13 2013
@@ -179,7 +179,7 @@ class SolrZkServerProps extends QuorumPe
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
- cfg.load(in);
+ cfg.load(new InputStreamReader(in, IOUtils.CHARSET_UTF_8));
} finally {
in.close();
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Sun Aug 11 12:19:13 2013
@@ -62,12 +62,12 @@ public class SyncStrategy {
private volatile boolean isClosed;
- private final static HttpClient client;
- static {
+ private final HttpClient client;
+ {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
- params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
+ params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 15000);
params.set(HttpClientUtil.PROP_SO_TIMEOUT, 30000);
params.set(HttpClientUtil.PROP_USE_RETRY, false);
client = HttpClientUtil.createClient(params);
@@ -87,6 +87,10 @@ public class SyncStrategy {
if (SKIP_AUTO_RECOVERY) {
return true;
}
+ if (isClosed) {
+ log.warn("Closed, skipping sync up.");
+ return false;
+ }
log.info("Sync replicas to " + ZkCoreNodeProps.getCoreUrl(leaderProps));
// TODO: look at our state usage of sync
// zkController.publish(core, ZkStateReader.SYNC);
@@ -112,20 +116,6 @@ public class SyncStrategy {
log.info("We have been closed, won't sync with replicas");
return false;
}
- // if no one that is up is active, we are willing to wait...
- // we don't want a recovering node to become leader and then
- // a better candidate pops up a second later.
-// int tries = 20;
-// while (!areAnyReplicasActive(zkController, collection, shardId)) {
-// if (tries-- == 0) {
-// break;
-// }
-// try {
-// Thread.sleep(500);
-// } catch (InterruptedException e) {
-// Thread.currentThread().interrupt();
-// }
-// }
// first sync ourselves - we are the potential leader after all
try {
@@ -146,7 +136,7 @@ public class SyncStrategy {
syncToMe(zkController, collection, shardId, leaderProps, core.getCoreDescriptor());
} else {
- log.info("Leader's attempt to sync with shard failed, moving to the next canidate");
+ log.info("Leader's attempt to sync with shard failed, moving to the next candidate");
// lets see who seems ahead...
}
@@ -160,8 +150,7 @@ public class SyncStrategy {
private boolean syncWithReplicas(ZkController zkController, SolrCore core,
ZkNodeProps props, String collection, String shardId) {
List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
- .getReplicaProps(collection, shardId,
- zkController.getCoreNodeName(core.getCoreDescriptor()),
+ .getReplicaProps(collection, shardId,core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(),
props.getStr(ZkStateReader.CORE_NAME_PROP));
if (nodes == null) {
@@ -189,7 +178,7 @@ public class SyncStrategy {
List<ZkCoreNodeProps> nodes = zkController
.getZkStateReader()
.getReplicaProps(collection, shardId,
- zkController.getCoreNodeName(cd),
+ cd.getCloudDescriptor().getCoreNodeName(),
leaderProps.getStr(ZkStateReader.CORE_NAME_PROP));
if (nodes == null) {
log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + " has no replicas");
@@ -273,6 +262,11 @@ public class SyncStrategy {
public void close() {
this.isClosed = true;
try {
+ client.getConnectionManager().shutdown();
+ } catch (Throwable e) {
+ SolrException.log(log, e);
+ }
+ try {
ExecutorUtil.shutdownNowAndAwaitTermination(recoveryCmdExecutor);
} catch (Throwable e) {
SolrException.log(log, e);
@@ -280,7 +274,6 @@ public class SyncStrategy {
}
private void requestRecovery(final ZkNodeProps leaderProps, final String baseUrl, final String coreName) throws SolrServerException, IOException {
- // TODO: do this in background threads
Thread thread = new Thread() {
{
setDaemon(true);
@@ -291,9 +284,9 @@ public class SyncStrategy {
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(coreName);
- HttpSolrServer server = new HttpSolrServer(baseUrl);
- server.setConnectionTimeout(45000);
- server.setSoTimeout(45000);
+ HttpSolrServer server = new HttpSolrServer(baseUrl, client);
+ server.setConnectionTimeout(15000);
+ server.setSoTimeout(30000);
try {
server.request(recoverRequestCmd);
} catch (Throwable t) {
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkCLI.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkCLI.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkCLI.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkCLI.java Sun Aug 11 12:19:13 2013
@@ -1,14 +1,5 @@
package org.apache.solr.cloud;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-import javax.xml.parsers.ParserConfigurationException;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -17,18 +8,21 @@ import org.apache.commons.cli.OptionBuil
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
-import org.apache.commons.io.IOUtils;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.core.Config;
-import org.apache.solr.core.ConfigSolr;
-import org.apache.solr.core.ConfigSolrXml;
-import org.apache.solr.core.ConfigSolrXmlOld;
-import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.core.CoreContainer;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.xml.sax.InputSource;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
import org.xml.sax.SAXException;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -49,6 +43,7 @@ import org.xml.sax.SAXException;
public class ZkCLI {
private static final String MAKEPATH = "makepath";
+ private static final String PUT = "put";
private static final String DOWNCONFIG = "downconfig";
private static final String ZK_CLI_NAME = "ZkCLI";
private static final String HELP = "help";
@@ -92,7 +87,7 @@ public class ZkCLI {
.hasArg(true)
.withDescription(
"cmd to run: " + BOOTSTRAP + ", " + UPCONFIG + ", " + DOWNCONFIG
- + ", " + LINKCONFIG + ", " + MAKEPATH + ", "+ LIST + ", " +CLEAR).create(CMD));
+ + ", " + LINKCONFIG + ", " + MAKEPATH + ", "+ PUT + ", "+ LIST + ", " + CLEAR).create(CMD));
Option zkHostOption = new Option("z", ZKHOST, true,
"ZooKeeper host address");
@@ -135,6 +130,7 @@ public class ZkCLI {
System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + DOWNCONFIG + " -" + CONFDIR + " /opt/solr/collection1/conf" + " -" + CONFNAME + " myconf");
System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + LINKCONFIG + " -" + COLLECTION + " collection1" + " -" + CONFNAME + " myconf");
System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + MAKEPATH + " /apache/solr");
+ System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + PUT + " /solr.conf 'conf data'");
System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + CLEAR + " /solr");
System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + LIST);
return;
@@ -175,35 +171,18 @@ public class ZkCLI {
+ " is required for " + BOOTSTRAP);
System.exit(1);
}
- SolrResourceLoader loader = new SolrResourceLoader(solrHome);
- solrHome = loader.getInstanceDir();
-
- File configFile = new File(solrHome, SOLR_XML);
- InputStream is = new FileInputStream(configFile);
-
- ConfigSolr cfg;
-
- try {
- Config config = new Config(loader, null, new InputSource(is), null, false);
-
- boolean oldStyle = (config.getNode("solr/cores", false) != null);
-
- if (oldStyle) {
- cfg = new ConfigSolrXmlOld(config, null);
- } else {
- cfg = new ConfigSolrXml(config, null);
- }
- } finally {
- IOUtils.closeQuietly(is);
- }
+ CoreContainer cc = new CoreContainer(solrHome);
if(!ZkController.checkChrootPath(zkServerAddress, true)) {
System.out.println("A chroot was specified in zkHost but the znode doesn't exist. ");
System.exit(1);
}
- ZkController.bootstrapConf(zkClient, cfg, solrHome);
+ ZkController.bootstrapConf(zkClient, cc, solrHome);
+
+ // No need to shutdown the CoreContainer, as it wasn't started
+ // up in the first place...
} else if (line.getOptionValue(CMD).equals(UPCONFIG)) {
if (!line.hasOption(CONFDIR) || !line.hasOption(CONFNAME)) {
@@ -256,6 +235,15 @@ public class ZkCLI {
System.exit(1);
}
zkClient.makePath(arglist.get(0).toString(), true);
+ } else if (line.getOptionValue(CMD).equals(PUT)) {
+ List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+ List arglist = line.getArgList();
+ if (arglist.size() != 2) {
+ System.out.println("-" + PUT + " requires two args - the path to create and the data string");
+ System.exit(1);
+ }
+ zkClient.create(arglist.get(0).toString(), arglist.get(1).toString().getBytes("UTF-8"),
+ acl, CreateMode.PERSISTENT, true);
}
} finally {
if (solrPort != null) {
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Aug 11 12:19:13 2013
@@ -17,29 +17,8 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
-import java.io.File;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.URLEncoder;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.common.SolrException;
@@ -58,14 +37,12 @@ import org.apache.solr.common.cloud.ZkNo
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.core.ConfigSolr;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateShardHandler;
-import org.apache.solr.util.PropertiesUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -74,6 +51,28 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.URLEncoder;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
/**
* Handle ZooKeeper interactions.
*
@@ -131,7 +130,9 @@ public final class ZkController {
protected volatile Overseer overseer;
- private String leaderVoteWait;
+ private int leaderVoteWait;
+
+ private boolean genericCoreNodeNames;
private int clientTimeout;
@@ -140,11 +141,11 @@ public final class ZkController {
private UpdateShardHandler updateShardHandler;
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
- String localHostContext, String leaderVoteWait, int distribUpdateConnTimeout, int distribUpdateSoTimeout, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
+ String localHostContext, int leaderVoteWait, boolean genericCoreNodeNames, int distribUpdateConnTimeout, int distribUpdateSoTimeout, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
TimeoutException, IOException {
if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
this.cc = cc;
-
+ this.genericCoreNodeNames = genericCoreNodeNames;
// be forgiving and strip this off leading/trailing slashes
// this allows us to support users specifying hostContext="/" in
// solr.xml to indicate the root context, instead of hostContext=""
@@ -241,7 +242,7 @@ public final class ZkController {
init(registerOnReconnect);
}
- public String getLeaderVoteWait() {
+ public int getLeaderVoteWait() {
return leaderVoteWait;
}
@@ -254,9 +255,9 @@ public final class ZkController {
// before registering as live, make sure everyone is in a
// down state
for (CoreDescriptor descriptor : descriptors) {
- final String coreZkNodeName = getCoreNodeName(descriptor);
+ final String coreZkNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
try {
- descriptor.getCloudDescriptor().isLeader = false;
+ descriptor.getCloudDescriptor().setLeader(false);
publish(descriptor, ZkStateReader.DOWN, updateLastPublished);
} catch (Exception e) {
if (isClosed) {
@@ -323,7 +324,7 @@ public final class ZkController {
.getCurrentDescriptors();
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
- descriptor.getCloudDescriptor().isLeader = false;
+ descriptor.getCloudDescriptor().setLeader(false);
}
}
}
@@ -544,7 +545,6 @@ public final class ZkController {
if (replica.getNodeName().equals(getNodeName())
&& !(replica.getStr(ZkStateReader.STATE_PROP)
.equals(ZkStateReader.DOWN))) {
- assert replica.getStr(ZkStateReader.SHARD_ID_PROP) != null;
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
ZkStateReader.BASE_URL_PROP, getBaseUrl(),
@@ -555,8 +555,7 @@ public final class ZkController {
ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.SHARD_ID_PROP,
replica.getStr(ZkStateReader.SHARD_ID_PROP),
- ZkStateReader.COLLECTION_PROP,
- replica.getStr(ZkStateReader.COLLECTION_PROP),
+ ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.CORE_NODE_NAME_PROP, replica.getName());
updatedNodes.add(replica.getStr(ZkStateReader.CORE_NAME_PROP));
overseerJobQueue.offer(ZkStateReader.toJSON(m));
@@ -735,7 +734,8 @@ public final class ZkController {
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
final String collection = cloudDesc.getCollectionName();
- final String coreZkNodeName = getCoreNodeName(desc);
+ final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
+ assert coreZkNodeName != null : "we should have a coreNodeName by now";
String shardId = cloudDesc.getShardId();
@@ -769,7 +769,7 @@ public final class ZkController {
// in this case, we want to wait for the leader as long as the leader might
// wait for a vote, at least - but also long enough that a large cluster has
// time to get its act together
- String leaderUrl = getLeader(cloudDesc, Integer.parseInt(leaderVoteWait) + 600000);
+ String leaderUrl = getLeader(cloudDesc, leaderVoteWait + 600000);
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
log.info("We are " + ourUrl + " and leader is " + leaderUrl);
@@ -794,9 +794,7 @@ public final class ZkController {
if (!core.isReloaded() && ulog != null) {
// disable recovery in case shard is in construction state (for shard splits)
Slice slice = getClusterState().getSlice(collection, shardId);
- if (Slice.CONSTRUCTION.equals(slice.getState())) {
- publish(desc, ZkStateReader.ACTIVE);
- } else {
+ if (!Slice.CONSTRUCTION.equals(slice.getState()) || !isLeader) {
Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
.getUpdateLog().recoverFromLog();
if (recoveryFuture != null) {
@@ -807,11 +805,11 @@ public final class ZkController {
} else {
log.info("No LogReplay needed for core=" + core.getName() + " baseURL=" + baseUrl);
}
- boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
- collection, coreZkNodeName, shardId, leaderProps, core, cc);
- if (!didRecovery) {
- publish(desc, ZkStateReader.ACTIVE);
- }
+ }
+ boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
+ collection, coreZkNodeName, shardId, leaderProps, core, cc);
+ if (!didRecovery) {
+ publish(desc, ZkStateReader.ACTIVE);
}
}
} finally {
@@ -923,16 +921,16 @@ public final class ZkController {
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
- final String coreZkNodeName = getCoreNodeName(cd);
+ final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
ZkNodeProps ourProps = new ZkNodeProps(props);
String collection = cd.getCloudDescriptor()
.getCollectionName();
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
- collection, coreZkNodeName, ourProps, this, cc);
+ collection, coreNodeName, ourProps, this, cc);
leaderElector.setup(context);
- electionContexts.put(coreZkNodeName, context);
+ electionContexts.put(coreNodeName, context);
leaderElector.joinElection(context, false);
}
@@ -989,6 +987,9 @@ public final class ZkController {
numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
}
+ assert cd.getCloudDescriptor().getCollectionName() != null && cd.getCloudDescriptor()
+ .getCollectionName().length() > 0;
+
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
//assert cd.getCloudDescriptor().getShardId() != null;
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
@@ -1017,7 +1018,7 @@ public final class ZkController {
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
- final String shardId = state.getShardId(coreNodeName);
+ final String shardId = state.getShardId(getBaseUrl(), desc.getName());
if (shardId != null) {
cloudDesc.setShardId(shardId);
@@ -1028,16 +1029,21 @@ public final class ZkController {
public void unregister(String coreName, CoreDescriptor cd)
throws InterruptedException, KeeperException {
- final String zkNodeName = getCoreNodeName(cd);
- ElectionContext context = electionContexts.remove(zkNodeName);
+ final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+ ElectionContext context = electionContexts.remove(coreNodeName);
+
+ assert context != null : coreNodeName;
+
if (context != null) {
context.cancelElection();
}
+ CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- "deletecore", ZkStateReader.CORE_NAME_PROP, coreName,
+ Overseer.DELETECORE, ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, getNodeName(),
- ZkStateReader.COLLECTION_PROP, cd.getCloudDescriptor().getCollectionName());
+ ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(),
+ ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
overseerJobQueue.offer(ZkStateReader.toJSON(m));
}
@@ -1206,13 +1212,60 @@ public final class ZkController {
return zkStateReader;
}
- private String doGetShardIdProcess(String coreName, CoreDescriptor descriptor) {
- final String coreNodeName = getCoreNodeName(descriptor);
+ private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) {
+ final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+
+ if (coreNodeName != null) {
+ waitForShardId(cd);
+ } else {
+ // if no explicit coreNodeName, we want to match by base url and core name
+ waitForCoreNodeName(cd);
+ waitForShardId(cd);
+ }
+ }
+
+ private void waitForCoreNodeName(CoreDescriptor descriptor) {
int retryCount = 320;
+ log.info("look for our core node name");
while (retryCount-- > 0) {
- final String shardId = zkStateReader.getClusterState().getShardId(coreNodeName);
+ Map<String,Slice> slicesMap = zkStateReader.getClusterState()
+ .getSlicesMap(descriptor.getCloudDescriptor().getCollectionName());
+ if (slicesMap != null) {
+
+ for (Slice slice : slicesMap.values()) {
+ for (Replica replica : slice.getReplicas()) {
+ // TODO: for really large clusters, we could 'index' on this
+
+ String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+ String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+
+ String msgBaseUrl = getBaseUrl();
+ String msgCore = descriptor.getName();
+
+ if (baseUrl.equals(msgBaseUrl) && core.equals(msgCore)) {
+ descriptor.getCloudDescriptor()
+ .setCoreNodeName(replica.getName());
+ return;
+ }
+ }
+ }
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private void waitForShardId(CoreDescriptor cd) {
+ log.info("waiting to find shard id in clusterstate for " + cd.getName());
+ int retryCount = 320;
+ while (retryCount-- > 0) {
+ final String shardId = zkStateReader.getClusterState().getShardId(getBaseUrl(), cd.getName());
if (shardId != null) {
- return shardId;
+ cd.getCloudDescriptor().setShardId(shardId);
+ return;
}
try {
Thread.sleep(1000);
@@ -1222,7 +1275,7 @@ public final class ZkController {
}
throw new SolrException(ErrorCode.SERVER_ERROR,
- "Could not get shard_id for core: " + coreName + " coreNodeName:" + coreNodeName);
+ "Could not get shard id for core: " + cd.getName());
}
public static void uploadToZK(SolrZkClient zkClient, File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
@@ -1261,7 +1314,7 @@ public final class ZkController {
public String getCoreNodeName(CoreDescriptor descriptor){
String coreNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
- if (coreNodeName == null) {
+ if (coreNodeName == null && !genericCoreNodeNames) {
// it's the default
return getNodeName() + "_" + descriptor.getName();
}
@@ -1277,34 +1330,33 @@ public final class ZkController {
downloadFromZK(zkClient, ZkController.CONFIGS_ZKNODE + "/" + configName, dir);
}
- public void preRegister(SolrCore core) throws KeeperException, InterruptedException {
- CoreDescriptor cd = core.getCoreDescriptor();
- if (Slice.CONSTRUCTION.equals(cd.getCloudDescriptor().getShardState())) {
- // set update log to buffer before publishing the core
- core.getUpdateHandler().getUpdateLog().bufferUpdates();
- }
- // before becoming available, make sure we are not live and active
- // this also gets us our assigned shard id if it was not specified
- publish(cd, ZkStateReader.DOWN, false);
- // shardState and shardRange are for one-time use only, thereafter the actual values in the Slice should be used
- if (Slice.CONSTRUCTION.equals(cd.getCloudDescriptor().getShardState())) {
- cd.getCloudDescriptor().setShardState(null);
- cd.getCloudDescriptor().setShardRange(null);
- }
- String coreNodeName = getCoreNodeName(cd);
+ public void preRegister(CoreDescriptor cd ) {
+ String coreNodeName = getCoreNodeName(cd);
+
// make sure the node name is set on the descriptor
if (cd.getCloudDescriptor().getCoreNodeName() == null) {
cd.getCloudDescriptor().setCoreNodeName(coreNodeName);
}
+
+ // before becoming available, make sure we are not live and active
+ // this also gets us our assigned shard id if it was not specified
+ try {
+ publish(cd, ZkStateReader.DOWN, false);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
if (cd.getCloudDescriptor().getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getClusterState(), coreNodeName)) {
- String shardId;
- shardId = doGetShardIdProcess(cd.getName(), cd);
- cd.getCloudDescriptor().setShardId(shardId);
+ doGetShardIdAndNodeNameProcess(cd);
} else {
// still wait till we see us in local state
- doGetShardIdProcess(cd.getName(), cd);
+ doGetShardIdAndNodeNameProcess(cd);
}
}
@@ -1339,7 +1391,7 @@ public final class ZkController {
}
}
}
-
+
String leaderBaseUrl = leaderProps.getBaseUrl();
String leaderCoreName = leaderProps.getCoreName();
@@ -1438,27 +1490,22 @@ public final class ZkController {
/**
* If in SolrCloud mode, upload config sets for each SolrCore in solr.xml.
*/
- public static void bootstrapConf(SolrZkClient zkClient, ConfigSolr cfg, String solrHome) throws IOException,
+ public static void bootstrapConf(SolrZkClient zkClient, CoreContainer cc, String solrHome) throws IOException,
KeeperException, InterruptedException {
- List<String> allCoreNames = cfg.getAllCoreNames();
+ //List<String> allCoreNames = cfg.getAllCoreNames();
+ List<CoreDescriptor> cds = cc.getCoresLocator().discover(cc);
- log.info("bootstraping config for " + allCoreNames.size() + " cores into ZooKeeper using solr.xml from " + solrHome);
+ log.info("bootstrapping config for " + cds.size() + " cores into ZooKeeper using solr.xml from " + solrHome);
- for (String coreName : allCoreNames) {
- String rawName = PropertiesUtil.substituteProperty(cfg.getProperty(coreName, "name", null), new Properties());
- String instanceDir = cfg.getProperty(coreName, "instanceDir", null);
- File idir = new File(instanceDir);
- System.out.println("idir:" + idir);
- if (!idir.isAbsolute()) {
- idir = new File(solrHome, instanceDir);
- }
- String confName = PropertiesUtil.substituteProperty(cfg.getProperty(coreName, "collection", null), new Properties());
- if (confName == null) {
- confName = rawName;
- }
- File udir = new File(idir, "conf");
- log.info("Uploading directory " + udir + " with name " + confName + " for SolrCore " + rawName);
+ for (CoreDescriptor cd : cds) {
+ String coreName = cd.getName();
+ String confName = cd.getCollectionName();
+ if (StringUtils.isEmpty(confName))
+ confName = coreName;
+ String instanceDir = cd.getInstanceDir();
+ File udir = new File(instanceDir, "conf");
+ log.info("Uploading directory " + udir + " with name " + confName + " for SolrCore " + coreName);
ZkController.uploadConfigDir(zkClient, udir, confName);
}
}
@@ -1501,11 +1548,11 @@ public final class ZkController {
}
/**
- * utilitiy method fro trimming and leading and/or trailing slashes from
+ * Utility method for trimming leading and/or trailing slashes from
* its input. May return the empty string. May return null if and only
* if the input is null.
*/
- private static String trimLeadingAndTrailingSlashes(final String in) {
+ public static String trimLeadingAndTrailingSlashes(final String in) {
if (null == in) return in;
String out = in;
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java Sun Aug 11 12:19:13 2013
@@ -32,6 +32,7 @@ import java.util.Set;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext.Context;
+import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.RateLimitedDirectoryWrapper;
@@ -40,6 +41,9 @@ import org.apache.lucene.store.SingleIns
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.store.blockcache.BlockDirectory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+import org.apache.solr.store.hdfs.HdfsLockFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -498,6 +502,24 @@ public abstract class CachingDirectoryFa
} else if ("single".equals(lockType)) {
if (!(dir.getLockFactory() instanceof SingleInstanceLockFactory)) dir
.setLockFactory(new SingleInstanceLockFactory());
+ } else if ("hdfs".equals(lockType)) {
+ Directory del = dir;
+
+ if (dir instanceof NRTCachingDirectory) {
+ del = ((NRTCachingDirectory) del).getDelegate();
+ }
+
+ if (del instanceof BlockDirectory) {
+ del = ((BlockDirectory) del).getDirectory();
+ }
+
+ if (!(del instanceof HdfsDirectory)) {
+ throw new SolrException(ErrorCode.FORBIDDEN, "Directory: "
+ + del.getClass().getName()
+ + ", but hdfs lock factory can only be used with HdfsDirectory");
+ }
+
+ dir.setLockFactory(new HdfsLockFactory(((HdfsDirectory)del).getHdfsDirPath(), ((HdfsDirectory)del).getConfiguration()));
} else if ("none".equals(lockType)) {
// Recipe for disaster
log.error("CONFIGURATION WARNING: locks are disabled on " + dir);
@@ -519,7 +541,7 @@ public abstract class CachingDirectoryFa
return path;
}
- private String stripTrailingSlash(String path) {
+ protected String stripTrailingSlash(String path) {
if (path.endsWith("/")) {
path = path.substring(0, path.length() - 1);
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/Config.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/Config.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/Config.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/Config.java Sun Aug 11 12:19:13 2013
@@ -68,6 +68,7 @@ public class Config {
static final XPathFactory xpathFactory = XPathFactory.newInstance();
private final Document doc;
+ private final Document origDoc; // with unsubstituted properties
private final String prefix;
private final String name;
private final SolrResourceLoader loader;
@@ -131,6 +132,7 @@ public class Config {
db.setErrorHandler(xmllog);
try {
doc = db.parse(is);
+ origDoc = copyDoc(doc);
} finally {
// some XML parsers are broken and don't close the byte stream (but they should according to spec)
IOUtils.closeQuietly(is.getByteStream());
@@ -140,19 +142,24 @@ public class Config {
}
} catch (ParserConfigurationException e) {
SolrException.log(log, "Exception during parsing file: " + name, e);
- throw e;
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (SAXException e) {
SolrException.log(log, "Exception during parsing file: " + name, e);
- throw e;
- } catch( SolrException e ){
- SolrException.log(log,"Error in "+name,e);
- throw e;
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ } catch (TransformerException e) {
+ SolrException.log(log, "Exception during parsing file: " + name, e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
public Config(SolrResourceLoader loader, String name, Document doc) {
this.prefix = null;
this.doc = doc;
+ try {
+ this.origDoc = copyDoc(doc);
+ } catch (TransformerException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
this.name = name;
this.loader = loader;
}
@@ -217,15 +224,22 @@ public class Config {
}
}
- public Node getNode(String path, boolean errIfMissing) {
- XPath xpath = xpathFactory.newXPath();
- Node nd = null;
- String xstr = normalize(path);
+ public Node getNode(String path, boolean errifMissing) {
+ return getNode(path, doc, errifMissing);
+ }
- try {
- nd = (Node)xpath.evaluate(xstr, doc, XPathConstants.NODE);
+ public Node getUnsubstitutedNode(String path, boolean errIfMissing) {
+ return getNode(path, origDoc, errIfMissing);
+ }
- if (nd==null) {
+ public Node getNode(String path, Document doc, boolean errIfMissing) {
+ XPath xpath = xpathFactory.newXPath();
+ String xstr = normalize(path);
+
+ try {
+ NodeList nodes = (NodeList)xpath.evaluate(xstr, doc,
+ XPathConstants.NODESET);
+ if (nodes==null || 0 == nodes.getLength() ) {
if (errIfMissing) {
throw new RuntimeException(name + " missing "+path);
} else {
@@ -233,7 +247,11 @@ public class Config {
return null;
}
}
-
+ if ( 1 < nodes.getLength() ) {
+ throw new SolrException( SolrException.ErrorCode.SERVER_ERROR,
+ name + " contains more than one value for config path: " + path);
+ }
+ Node nd = nodes.item(0);
log.trace(name + ":" + path + "=" + nd);
return nd;
@@ -441,4 +459,9 @@ public class Config {
return version;
}
+
+ public Config getOriginalConfig() {
+ return new Config(loader, null, origDoc);
+ }
+
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/ConfigSolr.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/ConfigSolr.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/ConfigSolr.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/ConfigSolr.java Sun Aug 11 12:19:13 2013
@@ -17,31 +17,190 @@ package org.apache.solr.core;
* limitations under the License.
*/
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpressionException;
-
+import com.google.common.base.Charsets;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.io.IOUtils;
import org.apache.solr.common.SolrException;
+import org.apache.solr.logging.LogWatcherConfig;
import org.apache.solr.util.DOMUtil;
import org.apache.solr.util.PropertiesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
public abstract class ConfigSolr {
protected static Logger log = LoggerFactory.getLogger(ConfigSolr.class);
public final static String SOLR_XML_FILE = "solr.xml";
+
+ public static ConfigSolr fromFile(SolrResourceLoader loader, File configFile) {
+ log.info("Loading container configuration from {}", configFile.getAbsolutePath());
+
+ InputStream inputStream = null;
+
+ try {
+ if (!configFile.exists()) {
+ log.info("{} does not exist, using default configuration", configFile.getAbsolutePath());
+ inputStream = new ByteArrayInputStream(ConfigSolrXmlOld.DEF_SOLR_XML.getBytes(Charsets.UTF_8));
+ }
+ else {
+ inputStream = new FileInputStream(configFile);
+ }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ByteStreams.copy(inputStream, baos);
+ String originalXml = IOUtils.toString(new ByteArrayInputStream(baos.toByteArray()), "UTF-8");
+ return fromInputStream(loader, new ByteArrayInputStream(baos.toByteArray()), configFile, originalXml);
+ }
+ catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Could not load SOLR configuration", e);
+ }
+ finally {
+ IOUtils.closeQuietly(inputStream);
+ }
+ }
+
+ public static ConfigSolr fromString(String xml) {
+ return fromInputStream(null, new ByteArrayInputStream(xml.getBytes(Charsets.UTF_8)), null, xml);
+ }
+
+ public static ConfigSolr fromInputStream(SolrResourceLoader loader, InputStream is, File file, String originalXml) {
+ try {
+ Config config = new Config(loader, null, new InputSource(is), null, false);
+ return fromConfig(config, file, originalXml);
+ }
+ catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ public static ConfigSolr fromSolrHome(SolrResourceLoader loader, String solrHome) {
+ return fromFile(loader, new File(solrHome, SOLR_XML_FILE));
+ }
+
+ public static ConfigSolr fromConfig(Config config, File file, String originalXml) {
+ boolean oldStyle = (config.getNode("solr/cores", false) != null);
+ return oldStyle ? new ConfigSolrXmlOld(config, file, originalXml)
+ : new ConfigSolrXml(config);
+ }
+ public abstract CoresLocator getCoresLocator();
+
+ public PluginInfo getShardHandlerFactoryPluginInfo() {
+ Node node = config.getNode(getShardHandlerFactoryConfigPath(), false);
+ return (node == null) ? null : new PluginInfo(node, "shardHandlerFactory", false, true);
+ }
+
+ protected abstract String getShardHandlerFactoryConfigPath();
+
+ public String getZkHost() {
+ String sysZkHost = System.getProperty("zkHost");
+ if (sysZkHost != null)
+ return sysZkHost;
+ return get(CfgProp.SOLR_ZKHOST, null);
+ }
+
+ public int getZkClientTimeout() {
+ String sysProp = System.getProperty("zkClientTimeout");
+ if (sysProp != null)
+ return Integer.parseInt(sysProp);
+ return getInt(CfgProp.SOLR_ZKCLIENTTIMEOUT, DEFAULT_ZK_CLIENT_TIMEOUT);
+ }
+
+ private static final int DEFAULT_ZK_CLIENT_TIMEOUT = 15000;
+ private static final int DEFAULT_LEADER_VOTE_WAIT = 180000; // 3 minutes
+ private static final int DEFAULT_CORE_LOAD_THREADS = 3;
+
+ protected static final String DEFAULT_CORE_ADMIN_PATH = "/admin/cores";
+
+ public String getZkHostPort() {
+ return get(CfgProp.SOLR_HOSTPORT, null);
+ }
+
+ public String getZkHostContext() {
+ return get(CfgProp.SOLR_HOSTCONTEXT, null);
+ }
+
+ public String getHost() {
+ return get(CfgProp.SOLR_HOST, null);
+ }
+
+ public int getLeaderVoteWait() {
+ return getInt(CfgProp.SOLR_LEADERVOTEWAIT, DEFAULT_LEADER_VOTE_WAIT);
+ }
+
+ public boolean getGenericCoreNodeNames() {
+ return getBool(CfgProp.SOLR_GENERICCORENODENAMES, false);
+ }
+
+ public int getDistributedConnectionTimeout() {
+ return getInt(CfgProp.SOLR_DISTRIBUPDATECONNTIMEOUT, 0);
+ }
+
+ public int getDistributedSocketTimeout() {
+ return getInt(CfgProp.SOLR_DISTRIBUPDATESOTIMEOUT, 0);
+ }
+
+ public int getCoreLoadThreadCount() {
+ return getInt(ConfigSolr.CfgProp.SOLR_CORELOADTHREADS, DEFAULT_CORE_LOAD_THREADS);
+ }
+
+ public String getSharedLibDirectory() {
+ return get(ConfigSolr.CfgProp.SOLR_SHAREDLIB , null);
+ }
+
+ public String getDefaultCoreName() {
+ return get(CfgProp.SOLR_CORES_DEFAULT_CORE_NAME, null);
+ }
+
+ public abstract boolean isPersistent();
+
+ public String getAdminPath() {
+ return get(CfgProp.SOLR_ADMINPATH, DEFAULT_CORE_ADMIN_PATH);
+ }
+
+ public String getCoreAdminHandlerClass() {
+ return get(CfgProp.SOLR_ADMINHANDLER, "org.apache.solr.handler.admin.CoreAdminHandler");
+ }
+
+ public boolean hasSchemaCache() {
+ return getBool(ConfigSolr.CfgProp.SOLR_SHARESCHEMA, false);
+ }
+
+ public String getManagementPath() {
+ return get(CfgProp.SOLR_MANAGEMENTPATH, null);
+ }
+
+ public LogWatcherConfig getLogWatcherConfig() {
+ return new LogWatcherConfig(
+ getBool(CfgProp.SOLR_LOGGING_ENABLED, false),
+ get(CfgProp.SOLR_LOGGING_CLASS, null),
+ get(CfgProp.SOLR_LOGGING_WATCHER_THRESHOLD, null),
+ getInt(CfgProp.SOLR_LOGGING_WATCHER_SIZE, 50)
+ );
+ }
+
+ public int getTransientCacheSize() {
+ return getInt(CfgProp.SOLR_TRANSIENTCACHESIZE, Integer.MAX_VALUE);
+ }
+
// Ugly for now, but we'll at least be able to centralize all of the differences between 4x and 5x.
- public static enum CfgProp {
+ protected static enum CfgProp {
SOLR_ADMINHANDLER,
SOLR_CORELOADTHREADS,
SOLR_COREROOTDIRECTORY,
@@ -57,12 +216,9 @@ public abstract class ConfigSolr {
SOLR_LOGGING_WATCHER_THRESHOLD,
SOLR_MANAGEMENTPATH,
SOLR_SHAREDLIB,
- SOLR_SHARDHANDLERFACTORY_CLASS,
- SOLR_SHARDHANDLERFACTORY_CONNTIMEOUT,
- SOLR_SHARDHANDLERFACTORY_NAME,
- SOLR_SHARDHANDLERFACTORY_SOCKETTIMEOUT,
SOLR_SHARESCHEMA,
SOLR_TRANSIENTCACHESIZE,
+ SOLR_GENERICCORENODENAMES,
SOLR_ZKCLIENTTIMEOUT,
SOLR_ZKHOST,
@@ -77,6 +233,12 @@ public abstract class ConfigSolr {
public ConfigSolr(Config config) {
this.config = config;
+
+ }
+
+ // for extension & testing.
+ protected ConfigSolr() {
+
}
public Config getConfig() {
@@ -101,12 +263,6 @@ public abstract class ConfigSolr {
return (val == null) ? def : val;
}
- // For saving the original property, ${} syntax and all.
- public String getOrigProp(CfgProp prop, String def) {
- String val = propMap.get(prop);
- return (val == null) ? def : val;
- }
-
public Properties getSolrProperties(String path) {
try {
return readProperties(((NodeList) config.evaluate(
@@ -124,20 +280,11 @@ public abstract class ConfigSolr {
Properties properties = new Properties();
for (int i = 0; i < props.getLength(); i++) {
Node prop = props.item(i);
- properties.setProperty(DOMUtil.getAttr(prop, "name"), DOMUtil.getAttr(prop, "value"));
+ properties.setProperty(DOMUtil.getAttr(prop, "name"),
+ PropertiesUtil.substituteProperty(DOMUtil.getAttr(prop, "value"), null));
}
return properties;
}
- public abstract void substituteProperties();
-
- public abstract List<String> getAllCoreNames();
-
- public abstract String getProperty(String coreName, String property, String defaultVal);
-
- public abstract Properties readCoreProperties(String coreName);
-
- public abstract Map<String, String> readCoreAttributes(String coreName);
-
}