You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by rm...@apache.org on 2012/12/10 17:37:54 UTC
svn commit: r1419570 [10/14] - in /lucene/dev/branches/lucene4547: ./
dev-tools/ dev-tools/eclipse/ dev-tools/idea/.idea/libraries/
dev-tools/idea/lucene/analysis/icu/
dev-tools/idea/solr/contrib/dataimporthandler/ dev-tools/maven/
dev-tools/maven/luce...
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/SolrLogFormatter.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/SolrLogFormatter.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/SolrLogFormatter.java Mon Dec 10 16:36:47 2012
@@ -263,7 +263,7 @@ sb.append("(group_name=").append(tg.getN
private Map<String,Object> getCoreProps(ZkController zkController, SolrCore core) {
final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- Replica props = zkController.getClusterState().getShardProps(collection, ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
+ Replica props = zkController.getClusterState().getReplica(collection, ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
if(props!=null) {
return props.getProperties();
}
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/AssignShard.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/AssignShard.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/AssignShard.java Mon Dec 10 16:36:47 2012
@@ -39,7 +39,10 @@ public class AssignShard {
numShards = 1;
}
String returnShardId = null;
- Map<String, Slice> sliceMap = state.getSlices(collection);
+ Map<String, Slice> sliceMap = state.getSlicesMap(collection);
+
+
+ // TODO: now that we create shards ahead of time, is this code still needed? Especially since hash ranges aren't assigned when creating shards via this method.
if (sliceMap == null) {
return "shard1";
@@ -51,6 +54,8 @@ public class AssignShard {
return "shard" + (shardIdNames.size() + 1);
}
+ // TODO: don't need to sort to find shard with fewest replicas!
+
// else figure out which shard needs more replicas
final Map<String, Integer> map = new HashMap<String, Integer>();
for (String shardId : shardIdNames) {
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Mon Dec 10 16:36:47 2012
@@ -21,7 +21,6 @@ package org.apache.solr.cloud;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeMap;
-import java.util.concurrent.CountDownLatch;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.zookeeper.CreateMode;
@@ -40,6 +39,8 @@ public class DistributedQueue {
private static final Logger LOG = LoggerFactory
.getLogger(DistributedQueue.class);
+ private static long DEFAULT_TIMEOUT = 5*60*1000;
+
private final String dir;
private SolrZkClient zookeeper;
@@ -163,20 +164,22 @@ public class DistributedQueue {
private class LatchChildWatcher implements Watcher {
- CountDownLatch latch;
+ Object lock = new Object();
- public LatchChildWatcher() {
- latch = new CountDownLatch(1);
- }
+ public LatchChildWatcher() {}
public void process(WatchedEvent event) {
- LOG.debug("Watcher fired on path: " + event.getPath() + " state: "
+ LOG.info("Watcher fired on path: " + event.getPath() + " state: "
+ event.getState() + " type " + event.getType());
- latch.countDown();
+ synchronized (lock) {
+ lock.notifyAll();
+ }
}
- public void await() throws InterruptedException {
- latch.await();
+ public void await(long timeout) throws InterruptedException {
+ synchronized (lock) {
+ lock.wait(timeout);
+ }
}
}
@@ -197,7 +200,7 @@ public class DistributedQueue {
continue;
}
if (orderedChildren.size() == 0) {
- childWatcher.await();
+ childWatcher.await(DEFAULT_TIMEOUT);
continue;
}
@@ -274,7 +277,7 @@ public class DistributedQueue {
continue;
}
if (orderedChildren.size() == 0) {
- childWatcher.await();
+ childWatcher.await(DEFAULT_TIMEOUT);
continue;
}
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Mon Dec 10 16:36:47 2012
@@ -163,7 +163,7 @@ final class ShardLeaderElectionContext e
}
// should I be leader?
- if (weAreReplacement && !shouldIBeLeader(leaderProps, core)) {
+ if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
rejoinLeaderElection(leaderSeqPath, core);
return;
}
@@ -249,7 +249,7 @@ final class ShardLeaderElectionContext e
core.getCoreDescriptor().getCloudDescriptor().isLeader = false;
// we could not publish ourselves as leader - rejoin election
- rejoinLeaderElection(coreName, core);
+ rejoinLeaderElection(leaderSeqPath, core);
} finally {
if (core != null) {
core.close();
@@ -263,7 +263,7 @@ final class ShardLeaderElectionContext e
ZkNodeProps leaderProps, String collection, String shardId) {
ClusterState clusterState = zkController.getZkStateReader()
.getClusterState();
- Map<String,Slice> slices = clusterState.getSlices(collection);
+ Map<String,Slice> slices = clusterState.getSlicesMap(collection);
Slice slice = slices.get(shardId);
Map<String,Replica> replicasMap = slice.getReplicasMap();
for (Map.Entry<String,Replica> shard : replicasMap.entrySet()) {
@@ -293,7 +293,7 @@ final class ShardLeaderElectionContext e
final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
Slice slices = zkController.getClusterState().getSlice(collection, shardId);
-
+ int cnt = 0;
while (true && !isClosed) {
// wait for everyone to be up
if (slices != null) {
@@ -310,9 +310,11 @@ final class ShardLeaderElectionContext e
log.info("Enough replicas found to continue.");
return;
} else {
- log.info("Waiting until we see more replicas up: total="
+ if (cnt % 40 == 0) {
+ log.info("Waiting until we see more replicas up: total="
+ slices.getReplicasMap().size() + " found=" + found
+ " timeoutin=" + (timeoutAt - System.currentTimeMillis()));
+ }
}
if (System.currentTimeMillis() > timeoutAt) {
@@ -323,6 +325,8 @@ final class ShardLeaderElectionContext e
Thread.sleep(500);
slices = zkController.getClusterState().getSlice(collection, shardId);
+ // System.out.println("###### waitForReplicasToComeUp : slices=" + slices + " all=" + zkController.getClusterState().getCollectionStates() );
+ cnt++;
}
}
@@ -347,7 +351,7 @@ final class ShardLeaderElectionContext e
leaderElector.joinElection(this, true);
}
- private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core) {
+ private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
log.info("Checking if I should try and be the leader.");
if (isClosed) {
@@ -355,6 +359,12 @@ final class ShardLeaderElectionContext e
return false;
}
+ if (!weAreReplacement) {
+ // we are the first node starting in the shard - there is a configurable wait
+ // to make sure others participate in sync and leader election, so we can be leader
+ return true;
+ }
+
if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished()
.equals(ZkStateReader.ACTIVE)) {
log.info("My last published State was Active, it's okay to be the leader.");
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Dec 10 16:36:47 2012
@@ -17,6 +17,7 @@ package org.apache.solr.cloud;
* the License.
*/
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -25,7 +26,10 @@ import java.util.Map;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ClosableThread;
-import org.apache.solr.common.cloud.HashPartitioner;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -221,14 +225,19 @@ public class Overseer {
String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
//get shardId from ClusterState
sliceName = getAssignedId(state, nodeName, message);
+ if (sliceName != null) {
+ log.info("shard=" + sliceName + " is already registered");
+ }
}
if(sliceName == null) {
//request new shardId
if (collectionExists) {
// use existing numShards
- numShards = state.getCollectionStates().get(collection).size();
+ numShards = state.getCollectionStates().get(collection).getSlices().size();
+ log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards);
}
sliceName = AssignShard.assignShard(collection, state, numShards);
+ log.info("Assigning new node to shard shard=" + sliceName);
}
Slice slice = state.getSlice(collection, sliceName);
@@ -269,16 +278,23 @@ public class Overseer {
return newClusterState;
}
+ private Map<String,Object> defaultCollectionProps() {
+ HashMap<String,Object> props = new HashMap<String, Object>(2);
+ props.put(DocCollection.DOC_ROUTER, DocRouter.DEFAULT_NAME);
+ return props;
+ }
+
private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {
log.info("Create collection {} with numShards {}", collectionName, numShards);
-
- HashPartitioner hp = new HashPartitioner();
- List<HashPartitioner.Range> ranges = hp.partitionRange(numShards, hp.fullRange());
+
+ DocRouter router = DocRouter.DEFAULT;
+ List<DocRouter.Range> ranges = router.partitionRange(numShards, router.fullRange());
+
+ Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>();
- Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String, Slice>>();
Map<String, Slice> newSlices = new LinkedHashMap<String,Slice>();
- newStates.putAll(state.getCollectionStates());
+ newCollections.putAll(state.getCollectionStates());
for (int i = 0; i < numShards; i++) {
final String sliceName = "shard" + (i+1);
@@ -287,20 +303,26 @@ public class Overseer {
newSlices.put(sliceName, new Slice(sliceName, null, sliceProps));
}
- newStates.put(collectionName, newSlices);
- ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newStates);
+
+ // TODO: fill in with collection properties read from the /collections/<collectionName> node
+ Map<String,Object> collectionProps = defaultCollectionProps();
+
+ DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
+
+ newCollections.put(collectionName, newCollection);
+ ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newCollections);
return newClusterState;
}
-
+
/*
* Return an already assigned id or null if not assigned
*/
private String getAssignedId(final ClusterState state, final String nodeName,
final ZkNodeProps coreState) {
final String key = coreState.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + coreState.getStr(ZkStateReader.CORE_NAME_PROP);
- Map<String, Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
+ Collection<Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
if (slices != null) {
- for (Slice slice : slices.values()) {
+ for (Slice slice : slices) {
if (slice.getReplicasMap().get(key) != null) {
return slice.getName();
}
@@ -309,40 +331,49 @@ public class Overseer {
return null;
}
- private ClusterState updateSlice(ClusterState state, String collection, Slice slice) {
+ private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
// System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
// System.out.println("Updating slice:" + slice);
- Map<String, Map<String, Slice>> newCollections = new LinkedHashMap<String,Map<String,Slice>>(state.getCollectionStates()); // make a shallow copy
- Map<String, Slice> slices = newCollections.get(collection);
- if (slices == null) {
+ Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(state.getCollectionStates()); // make a shallow copy
+ DocCollection coll = newCollections.get(collectionName);
+ Map<String,Slice> slices;
+ Map<String,Object> props;
+ DocRouter router;
+
+ if (coll == null) {
+ // when updateSlice is called on a collection that doesn't exist, it is currently because a core is publishing itself
+ // without explicitly creating a collection. In that case, we assume custom sharding with an "implicit" router.
slices = new HashMap<String, Slice>(1);
+ props = new HashMap<String,Object>(1);
+ props.put(DocCollection.DOC_ROUTER, ImplicitDocRouter.NAME);
+ router = new ImplicitDocRouter();
} else {
- slices = new LinkedHashMap<String, Slice>(slices); // make a shallow copy
- }
- slices.put(slice.getName(), slice);
- newCollections.put(collection, slices);
+ props = coll.getProperties();
+ router = coll.getRouter();
+ slices = new LinkedHashMap<String, Slice>(coll.getSlicesMap()); // make a shallow copy
+ }
+ slices.put(slice.getName(), slice);
+ DocCollection newCollection = new DocCollection(collectionName, slices, props, router);
+ newCollections.put(collectionName, newCollection);
// System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));
return new ClusterState(state.getLiveNodes(), newCollections);
}
- private ClusterState setShardLeader(ClusterState state, String collection, String sliceName, String leaderUrl) {
-
- final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>(state.getCollectionStates());
+ private ClusterState setShardLeader(ClusterState state, String collectionName, String sliceName, String leaderUrl) {
- Map<String, Slice> slices = newStates.get(collection);
-
- if(slices==null) {
- log.error("Could not mark shard leader for non existing collection:" + collection);
+ final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(state.getCollectionStates());
+ DocCollection coll = newCollections.get(collectionName);
+ if(coll == null) {
+ log.error("Could not mark shard leader for non existing collection:" + collectionName);
return state;
}
+ Map<String, Slice> slices = coll.getSlicesMap();
// make a shallow copy and add it to the new collection
slices = new LinkedHashMap<String,Slice>(slices);
- newStates.put(collection, slices);
-
Slice slice = slices.get(sliceName);
if (slice == null) {
@@ -378,7 +409,11 @@ public class Overseer {
Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties());
slices.put(newSlice.getName(), newSlice);
}
- return new ClusterState(state.getLiveNodes(), newStates);
+
+
+ DocCollection newCollection = new DocCollection(coll.getName(), slices, coll.getProperties(), coll.getRouter());
+ newCollections.put(collectionName, newCollection);
+ return new ClusterState(state.getLiveNodes(), newCollections);
}
/*
@@ -389,48 +424,57 @@ public class Overseer {
final String coreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- final LinkedHashMap<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
- for(String collectionName: clusterState.getCollections()) {
- if(collection.equals(collectionName)) {
- Map<String, Slice> slices = clusterState.getSlices(collection);
- LinkedHashMap<String, Slice> newSlices = new LinkedHashMap<String, Slice>();
- for(Slice slice: slices.values()) {
- if(slice.getReplicasMap().containsKey(coreNodeName)) {
- Map<String, Replica> newReplicas = slice.getReplicasCopy();
- newReplicas.remove(coreNodeName);
- Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties());
- newSlices.put(slice.getName(), newSlice);
- } else {
- newSlices.put(slice.getName(), slice);
- }
- }
- int cnt = 0;
- for (Slice slice : newSlices.values()) {
- cnt+=slice.getReplicasMap().size();
- }
- // TODO: if no nodes are left after this unload
- // remove from zk - do we have a race where Overseer
- // see's registered nodes and publishes though?
- if (cnt > 0) {
- newStates.put(collectionName, newSlices);
+ final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
+ DocCollection coll = newCollections.get(collection);
+ if (coll == null) {
+ // TODO: log/error that we didn't find it?
+ return clusterState;
+ }
+
+ Map<String, Slice> newSlices = new LinkedHashMap<String, Slice>();
+ for (Slice slice : coll.getSlices()) {
+ Replica replica = slice.getReplica(coreNodeName);
+ if (replica != null) {
+ Map<String, Replica> newReplicas = slice.getReplicasCopy();
+ newReplicas.remove(coreNodeName);
+ // TODO TODO TODO!!! if there are no replicas left for the slice, and the slice has no hash range, remove it
+ // if (newReplicas.size() == 0 && slice.getRange() == null) {
+ // if there are no replicas left for the slice remove it
+ if (newReplicas.size() == 0) {
+ slice = null;
} else {
- // TODO: it might be better logically to have this in ZkController
- // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
- // ZkController out of the Overseer.
- try {
- zkClient.clean("/collections/" + collectionName);
- } catch (InterruptedException e) {
- SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collectionName, e);
- Thread.currentThread().interrupt();
- } catch (KeeperException e) {
- SolrException.log(log, "Problem cleaning up collection in zk:" + collectionName, e);
- }
+ slice = new Slice(slice.getName(), newReplicas, slice.getProperties());
}
- } else {
- newStates.put(collectionName, clusterState.getSlices(collectionName));
}
+
+ if (slice != null) {
+ newSlices.put(slice.getName(), slice);
+ }
+ }
+
+ // if there are no slices left in the collection, remove it?
+ if (newSlices.size() == 0) {
+ newCollections.remove(coll.getName());
+
+ // TODO: it might be better logically to have this in ZkController
+ // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
+ // ZkController out of the Overseer.
+ try {
+ zkClient.clean("/collections/" + collection);
+ } catch (InterruptedException e) {
+ SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
+ Thread.currentThread().interrupt();
+ } catch (KeeperException e) {
+ SolrException.log(log, "Problem cleaning up collection in zk:" + collection, e);
+ }
+
+
+ } else {
+ DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
+ newCollections.put(newCollection.getName(), newCollection);
}
- ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newStates);
+
+ ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
return newState;
}
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon Dec 10 16:36:47 2012
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -42,8 +43,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OverseerCollectionProcessor implements Runnable {
+
+ public static final String NUM_SLICES = "numShards";
+
public static final String REPLICATION_FACTOR = "replicationFactor";
-
+
+ public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
+
public static final String DELETECOLLECTION = "deletecollection";
public static final String CREATECOLLECTION = "createcollection";
@@ -158,124 +164,178 @@ public class OverseerCollectionProcessor
}
private boolean createCollection(ClusterState clusterState, ZkNodeProps message) {
-
- // look at the replication factor and see if it matches reality
- // if it does not, find best nodes to create more cores
-
- String numReplicasString = message.getStr(REPLICATION_FACTOR);
- int numReplicas;
- try {
- numReplicas = numReplicasString == null ? 0 : Integer.parseInt(numReplicasString);
- } catch (Exception ex) {
- SolrException.log(log, "Could not parse " + REPLICATION_FACTOR, ex);
- return false;
- }
- String numShardsString = message.getStr("numShards");
- int numShards;
- try {
- numShards = numShardsString == null ? 0 : Integer.parseInt(numShardsString);
- } catch (Exception ex) {
- SolrException.log(log, "Could not parse numShards", ex);
+ String collectionName = message.getStr("name");
+ if (clusterState.getCollections().contains(collectionName)) {
+ SolrException.log(log, "collection already exists: " + collectionName);
return false;
}
- String name = message.getStr("name");
- String configName = message.getStr("collection.configName");
-
- // we need to look at every node and see how many cores it serves
- // add our new cores to existing nodes serving the least number of cores
- // but (for now) require that each core goes on a distinct node.
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
-
-
- // TODO: add smarter options that look at the current number of cores per node?
- // for now we just go random
- Set<String> nodes = clusterState.getLiveNodes();
- List<String> nodeList = new ArrayList<String>(nodes.size());
- nodeList.addAll(nodes);
- Collections.shuffle(nodeList);
-
- int numNodes = numShards * (numReplicas + 1);
- List<String> createOnNodes = nodeList.subList(0, Math.min(nodeList.size(), numNodes));
-
- log.info("Create collection " + name + " on " + createOnNodes);
-
- for (String replica : createOnNodes) {
- // TODO: this does not work if original url had _ in it
- // We should have a master list
- replica = replica.replaceAll("_", "/");
- params.set(CoreAdminParams.NAME, name);
- params.set("collection.configName", configName);
- params.set("numShards", numShards);
- ShardRequest sreq = new ShardRequest();
- params.set("qt", adminPath);
- sreq.purpose = 1;
- // TODO: this sucks
- if (replica.startsWith("http://")) replica = replica.substring(7);
- sreq.shards = new String[] {replica};
- sreq.actualShards = sreq.shards;
- sreq.params = params;
+ try {
+ // look at the replication factor and see if it matches reality
+ // if it does not, find best nodes to create more cores
- shardHandler.submit(sreq, replica, sreq.params);
- }
-
- int failed = 0;
- ShardResponse srsp;
- do {
- srsp = shardHandler.takeCompletedOrError();
- if (srsp != null) {
- Throwable e = srsp.getException();
- if (e != null) {
- // should we retry?
- // TODO: we should return errors to the client
- // TODO: what if one fails and others succeed?
- failed++;
- log.error("Error talking to shard: " + srsp.getShard(), e);
+ int numReplica = msgStrToInt(message, REPLICATION_FACTOR, 0);
+ int numSlices = msgStrToInt(message, NUM_SLICES, 0);
+ int maxShardsPerNode = msgStrToInt(message, MAX_SHARDS_PER_NODE, 1);
+
+ if (numReplica < 0) {
+ SolrException.log(log, REPLICATION_FACTOR + " must be > 0");
+ return false;
+ }
+
+ if (numSlices < 0) {
+ SolrException.log(log, NUM_SLICES + " must be > 0");
+ return false;
+ }
+
+ String configName = message.getStr("collection.configName");
+
+ // we need to look at every node and see how many cores it serves
+ // add our new cores to existing nodes serving the least number of cores
+ // but (for now) require that each core goes on a distinct node.
+
+ // TODO: add smarter options that look at the current number of cores per
+ // node?
+ // for now we just go random
+ Set<String> nodes = clusterState.getLiveNodes();
+ List<String> nodeList = new ArrayList<String>(nodes.size());
+ nodeList.addAll(nodes);
+ Collections.shuffle(nodeList);
+
+ if (nodeList.size() <= 0) {
+ log.error("Cannot create collection " + collectionName
+ + ". No live Solr-instaces");
+ return false;
+ }
+
+ int numShardsPerSlice = numReplica + 1;
+ if (numShardsPerSlice > nodeList.size()) {
+ log.warn("Specified "
+ + REPLICATION_FACTOR
+ + " of "
+ + numReplica
+ + " on collection "
+ + collectionName
+ + " is higher than or equal to the number of Solr instances currently live ("
+ + nodeList.size()
+ + "). Its unusual to run two replica of the same slice on the same Solr-instance.");
+ }
+
+ int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
+ int requestedShardsToCreate = numSlices * numShardsPerSlice;
+ if (maxShardsAllowedToCreate < requestedShardsToCreate) {
+ log.error("Cannot create collection " + collectionName + ". Value of "
+ + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+ + ", and the number of live nodes is " + nodeList.size()
+ + ". This allows a maximum of " + maxShardsAllowedToCreate
+ + " to be created. Value of " + NUM_SLICES + " is " + numSlices
+ + " and value of " + REPLICATION_FACTOR + " is " + numReplica
+ + ". This requires " + requestedShardsToCreate
+ + " shards to be created (higher than the allowed number)");
+ return false;
+ }
+
+ for (int i = 1; i <= numSlices; i++) {
+ for (int j = 1; j <= numShardsPerSlice; j++) {
+ String nodeName = nodeList.get(((i - 1) + (j - 1)) % nodeList.size());
+ String sliceName = "shard" + i;
+ String shardName = collectionName + "_" + sliceName + "_replica" + j;
+ log.info("Creating shard " + shardName + " as part of slice "
+ + sliceName + " of collection " + collectionName + " on "
+ + nodeName);
+
+ // Need to create new params for each request
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+
+ params.set(CoreAdminParams.NAME, shardName);
+ params.set("collection.configName", configName);
+ params.set(CoreAdminParams.COLLECTION, collectionName);
+ params.set(CoreAdminParams.SHARD, sliceName);
+ params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+
+ ShardRequest sreq = new ShardRequest();
+ params.set("qt", adminPath);
+ sreq.purpose = 1;
+ // TODO: this does not work if original url had _ in it
+ // We should have a master list
+ String replica = nodeName.replaceAll("_", "/");
+ if (replica.startsWith("http://")) replica = replica.substring(7);
+ sreq.shards = new String[] {replica};
+ sreq.actualShards = sreq.shards;
+ sreq.params = params;
+
+ shardHandler.submit(sreq, replica, sreq.params);
+
}
}
- } while (srsp != null);
-
-
- // if all calls succeeded, return true
- if (failed > 0) {
+
+ int failed = 0;
+ ShardResponse srsp;
+ do {
+ srsp = shardHandler.takeCompletedOrError();
+ if (srsp != null) {
+ Throwable e = srsp.getException();
+ if (e != null) {
+ // should we retry?
+ // TODO: we should return errors to the client
+ // TODO: what if one fails and others succeed?
+ failed++;
+ log.error("Error talking to shard: " + srsp.getShard(), e);
+ }
+ }
+ } while (srsp != null);
+
+ // if all calls succeeded, return true
+ if (failed > 0) {
+ return false;
+ }
+ log.info("Successfully created all shards for collection "
+ + collectionName);
+ return true;
+ } catch (Exception ex) {
+ // Expecting that the necessary logging has already been performed
return false;
}
- return true;
}
private boolean collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params) {
log.info("Executing Collection Cmd : " + params);
- String name = message.getStr("name");
+ String collectionName = message.getStr("name");
- Map<String,Slice> slices = clusterState.getCollectionStates().get(name);
+ DocCollection coll = clusterState.getCollection(collectionName);
- if (slices == null) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection:" + name);
+ if (coll == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Could not find collection:" + collectionName);
}
- for (Map.Entry<String,Slice> entry : slices.entrySet()) {
+ for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
Slice slice = entry.getValue();
Map<String,Replica> shards = slice.getReplicasMap();
Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
for (Map.Entry<String,Replica> shardEntry : shardEntries) {
final ZkNodeProps node = shardEntry.getValue();
if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP))) {
- params.set(CoreAdminParams.CORE, node.getStr(ZkStateReader.CORE_NAME_PROP));
-
+ // For thread safety, send each request its own simple clone of the ModifiableSolrParams
+ ModifiableSolrParams cloneParams = new ModifiableSolrParams();
+ cloneParams.add(params);
+ cloneParams.set(CoreAdminParams.CORE,
+ node.getStr(ZkStateReader.CORE_NAME_PROP));
+
String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
ShardRequest sreq = new ShardRequest();
+
// yes, they must use same admin handler path everywhere...
- params.set("qt", adminPath);
-
+ cloneParams.set("qt", adminPath);
sreq.purpose = 1;
// TODO: this sucks
if (replica.startsWith("http://")) replica = replica.substring(7);
sreq.shards = new String[] {replica};
sreq.actualShards = sreq.shards;
- sreq.params = params;
- log.info("Collection Admin sending CoreAdmin cmd to " + replica);
+ sreq.params = cloneParams;
+ log.info("Collection Admin sending CoreAdmin cmd to " + replica
+ + " params:" + sreq.params);
shardHandler.submit(sreq, replica, sreq.params);
}
}
@@ -304,4 +364,15 @@ public class OverseerCollectionProcessor
}
return true;
}
+
+ /**
+ * Looks up {@code key} in {@code message} and converts its string value to an int.
+ *
+ * @param message properties the value is read from
+ * @param key name of the property to look up
+ * @param def value returned (unboxed) when {@code getStr(key)} yields {@code null}
+ * @return the parsed value, or {@code def} when the property has no value
+ * @throws Exception any failure raised during the conversion, rethrown after logging
+ */
+ private int msgStrToInt(ZkNodeProps message, String key, Integer def)
+ throws Exception {
+ final String raw = message.getStr(key);
+ try {
+ if (raw == null) {
+ // unboxing happens inside the try, exactly as in the conditional form
+ return def;
+ }
+ return Integer.parseInt(raw);
+ } catch (Exception failure) {
+ SolrException.log(log, "Could not parse " + key, failure);
+ throw failure;
+ }
+ }
}
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Dec 10 16:36:47 2012
@@ -20,7 +20,9 @@ package org.apache.solr.cloud;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.NetworkInterface;
import java.util.Collections;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -42,6 +44,7 @@ import org.apache.solr.client.solrj.requ
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -137,14 +140,6 @@ public final class ZkController {
private int clientTimeout;
-
- public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
- String localHostContext, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
- TimeoutException, IOException {
- this(cc, zkServerAddress, zkClientTimeout, zkClientConnectTimeout, localHost, locaHostPort, localHostContext, null, registerOnReconnect);
- }
-
-
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
String localHostContext, String leaderVoteWait, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
TimeoutException, IOException {
@@ -358,7 +353,29 @@ public final class ZkController {
private String getHostAddress(String host) throws IOException {
if (host == null) {
- host = "http://" + InetAddress.getLocalHost().getHostName();
+ String hostaddress = InetAddress.getLocalHost().getHostAddress();
+ // If that resolved to the loopback address "127.0.0.1", look for a better
+ // address; in any other case we trust that the hosts file is right.
+ if ("127.0.0.1".equals(hostaddress)) {
+ Enumeration<NetworkInterface> netInterfaces = null;
+ try {
+ netInterfaces = NetworkInterface.getNetworkInterfaces();
+ while (netInterfaces.hasMoreElements()) {
+ NetworkInterface ni = netInterfaces.nextElement();
+ Enumeration<InetAddress> ips = ni.getInetAddresses();
+ while (ips.hasMoreElements()) {
+ InetAddress ip = ips.nextElement();
+ if (ip.isSiteLocalAddress()) {
+ hostaddress = ip.getHostAddress();
+ }
+ }
+ }
+ } catch (Throwable e) {
+ SolrException.log(log,
+ "Error while looking for a better host name than 127.0.0.1", e);
+ }
+ }
+ host = "http://" + hostaddress;
} else {
Matcher m = URL_PREFIX.matcher(host);
if (m.matches()) {
@@ -576,7 +593,10 @@ public final class ZkController {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
- String leaderUrl = getLeader(cloudDesc);
+
+ // in this case, we want to wait for the leader as long as the leader might
+ // wait for a vote, at least
+ String leaderUrl = getLeader(cloudDesc, Integer.parseInt(leaderVoteWait) + 1000);
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
log.info("We are " + ourUrl + " and leader is " + leaderUrl);
@@ -628,7 +648,9 @@ public final class ZkController {
return shardId;
}
- private String getLeader(final CloudDescriptor cloudDesc) {
+ // timeoutms is the timeout for the first call to get the leader - there is then
+ // a longer wait to make sure that leader matches our local state
+ private String getLeader(final CloudDescriptor cloudDesc, int timeoutms) {
String collection = cloudDesc.getCollectionName();
String shardId = cloudDesc.getShardId();
@@ -637,7 +659,7 @@ public final class ZkController {
// cluster state node that won't be updated for a moment
String leaderUrl;
try {
- leaderUrl = getLeaderProps(collection, cloudDesc.getShardId())
+ leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms)
.getCoreUrl();
// now wait until our currently cloud state contains the latest leader
@@ -655,7 +677,7 @@ public final class ZkController {
tries++;
clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId,
30000);
- leaderUrl = getLeaderProps(collection, cloudDesc.getShardId())
+ leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms)
.getCoreUrl();
}
@@ -671,8 +693,8 @@ public final class ZkController {
* Get leader props directly from zk nodes.
*/
public ZkCoreNodeProps getLeaderProps(final String collection,
- final String slice) throws InterruptedException {
- return getLeaderProps(collection, slice, false);
+ final String slice, int timeoutms) throws InterruptedException {
+ return getLeaderProps(collection, slice, timeoutms, false);
}
/**
@@ -681,8 +703,8 @@ public final class ZkController {
* @return leader props
*/
public ZkCoreNodeProps getLeaderProps(final String collection,
- final String slice, boolean failImmediatelyOnExpiration) throws InterruptedException {
- int iterCount = 60;
+ final String slice, int timeoutms, boolean failImmediatelyOnExpiration) throws InterruptedException {
+ int iterCount = timeoutms / 1000;
Exception exp = null;
while (iterCount-- > 0) {
try {
@@ -699,10 +721,10 @@ public final class ZkController {
throw new RuntimeException("Session has expired - could not get leader props", exp);
}
exp = e;
- Thread.sleep(500);
+ Thread.sleep(1000);
} catch (Exception e) {
exp = e;
- Thread.sleep(500);
+ Thread.sleep(1000);
}
if (cc.isShutDown()) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is shutdown");
@@ -780,6 +802,7 @@ public final class ZkController {
* Publish core state to overseer.
*/
public void publish(final CoreDescriptor cd, final String state, boolean updateLastState) throws KeeperException, InterruptedException {
+ log.info("publishing core={} state={}", cd.getName(), state);
//System.out.println(Thread.currentThread().getStackTrace()[3]);
Integer numShards = cd.getCloudDescriptor().getNumShards();
if (numShards == null) { //XXX sys prop hack
@@ -865,6 +888,10 @@ public final class ZkController {
try {
Map<String,Object> collectionProps = new HashMap<String,Object>();
+
+ // set defaults
+ collectionProps.put(DocCollection.DOC_ROUTER, "compositeId");
+
// TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX+CONFIGNAME_PROP, collection);
@@ -879,8 +906,10 @@ public final class ZkController {
}
// if the config name wasn't passed in, use the default
- if (!collectionProps.containsKey(CONFIGNAME_PROP))
+ if (!collectionProps.containsKey(CONFIGNAME_PROP)) {
+ // TODO: getting the configName from the collectionPath should fail since we already know it doesn't exist?
getConfName(collection, collectionPath, collectionProps);
+ }
} else if(System.getProperty("bootstrap_confdir") != null) {
// if we are bootstrapping a collection, default the config for
@@ -904,7 +933,6 @@ public final class ZkController {
} else {
getConfName(collection, collectionPath, collectionProps);
}
-
ZkNodeProps zkProps = new ZkNodeProps(collectionProps);
zkClient.makePath(collectionPath, ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT, null, true);
@@ -996,7 +1024,7 @@ public final class ZkController {
}
throw new SolrException(ErrorCode.SERVER_ERROR,
- "Could not get shard_id for core: " + coreName);
+ "Could not get shard_id for core: " + coreName + " coreNodeName:" + shardZkNodeName);
}
public static void uploadToZK(SolrZkClient zkClient, File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
@@ -1070,7 +1098,7 @@ public final class ZkController {
for (int i = 0; i < retries; i++) {
try {
// go straight to zk, not the cloud state - we must have current info
- leaderProps = getLeaderProps(collection, shard);
+ leaderProps = getLeaderProps(collection, shard, 30000);
break;
} catch (Exception e) {
SolrException.log(log, "There was a problem finding the leader in zk", e);
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java Mon Dec 10 16:36:47 2012
@@ -116,7 +116,7 @@ public abstract class CachingDirectoryFa
while(val.refCnt != 0) {
wait(100);
- if (cnt++ >= 300) {
+ if (cnt++ >= 12000) {
log.error("Timeout waiting for all directory ref counts to be released");
break;
}
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CoreContainer.java Mon Dec 10 16:36:47 2012
@@ -28,12 +28,23 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.xml.transform.Transformer;
@@ -55,6 +66,7 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.core.SolrXMLSerializer.SolrCoreXMLDef;
import org.apache.solr.core.SolrXMLSerializer.SolrXMLDef;
import org.apache.solr.handler.admin.CollectionsHandler;
@@ -66,7 +78,9 @@ import org.apache.solr.logging.LogWatche
import org.apache.solr.logging.jul.JulWatcher;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.update.SolrCoreState;
+import org.apache.solr.util.AdjustableSemaphore;
import org.apache.solr.util.DOMUtil;
+import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.SystemIdResolver;
import org.apache.zookeeper.KeeperException;
@@ -86,6 +100,7 @@ import org.xml.sax.InputSource;
public class CoreContainer
{
private static final String LEADER_VOTE_WAIT = "180000"; // 3 minutes
+ private static final int CORE_LOAD_THREADS = 3;
private static final String DEFAULT_HOST_CONTEXT = "solr";
private static final String DEFAULT_HOST_PORT = "8983";
private static final int DEFAULT_ZK_CLIENT_TIMEOUT = 15000;
@@ -143,8 +158,9 @@ public class CoreContainer
protected LogWatcher logging = null;
private String zkHost;
private Map<SolrCore,String> coreToOrigName = new ConcurrentHashMap<SolrCore,String>();
- private String leaderVoteWait;
+ private String leaderVoteWait = LEADER_VOTE_WAIT;
protected int swappableCacheSize = Integer.MAX_VALUE; // Use as a flag too, if swappableCacheSize set in solr.xml this will be changed
+ private int coreLoadThreads;
{
log.info("New CoreContainer " + System.identityHashCode(this));
@@ -382,13 +398,13 @@ public class CoreContainer
* @param cfgis the configuration file InputStream
*/
public void load(String dir, InputSource cfgis) {
-
+ ThreadPoolExecutor coreLoadExecutor = null;
if (null == dir) {
// don't rely on SolrResourceLoader(), determine explicitly first
dir = SolrResourceLoader.locateSolrHome();
}
log.info("Loading CoreContainer using Solr Home: '{}'", dir);
-
+
this.loader = new SolrResourceLoader(dir);
solrHome = loader.getInstanceDir();
@@ -401,89 +417,97 @@ public class CoreContainer
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "", e);
}
- // Since the cores var is now initialized to null, let's set it up right now.
+ // Since the cores var is now initialized to null, let's set it up right
+ // now.
cfg.substituteProperties();
-
+
allocateLazyCores(cfg);
-
+
// Initialize Logging
- if(cfg.getBool("solr/logging/@enabled",true)) {
+ if (cfg.getBool("solr/logging/@enabled", true)) {
String slf4jImpl = null;
String fname = cfg.get("solr/logging/watcher/@class", null);
try {
- slf4jImpl = StaticLoggerBinder.getSingleton().getLoggerFactoryClassStr();
- if(fname==null) {
- if( slf4jImpl.indexOf("Log4j") > 0) {
- log.warn("Log watching is not yet implemented for log4j" );
- }
- else if( slf4jImpl.indexOf("JDK") > 0) {
+ slf4jImpl = StaticLoggerBinder.getSingleton()
+ .getLoggerFactoryClassStr();
+ if (fname == null) {
+ if (slf4jImpl.indexOf("Log4j") > 0) {
+ log.warn("Log watching is not yet implemented for log4j");
+ } else if (slf4jImpl.indexOf("JDK") > 0) {
fname = "JUL";
}
}
- }
- catch(Throwable ex) {
- log.warn("Unable to read SLF4J version. LogWatcher will be disabled: "+ex);
+ } catch (Throwable ex) {
+ log.warn("Unable to read SLF4J version. LogWatcher will be disabled: "
+ + ex);
}
// Now load the framework
- if(fname!=null) {
- if("JUL".equalsIgnoreCase(fname)) {
+ if (fname != null) {
+ if ("JUL".equalsIgnoreCase(fname)) {
logging = new JulWatcher(slf4jImpl);
- }
- else {
+ } else {
try {
logging = loader.newInstance(fname, LogWatcher.class);
- }
- catch (Throwable e) {
+ } catch (Throwable e) {
log.warn("Unable to load LogWatcher", e);
}
}
- if( logging != null ) {
+ if (logging != null) {
ListenerConfig v = new ListenerConfig();
- v.size = cfg.getInt("solr/logging/watcher/@size",50);
- v.threshold = cfg.get("solr/logging/watcher/@threshold",null);
- if(v.size>0) {
+ v.size = cfg.getInt("solr/logging/watcher/@size", 50);
+ v.threshold = cfg.get("solr/logging/watcher/@threshold", null);
+ if (v.size > 0) {
log.info("Registering Log Listener");
logging.registerListener(v, this);
}
}
}
}
-
+
String dcoreName = cfg.get("solr/cores/@defaultCoreName", null);
- if(dcoreName != null && !dcoreName.isEmpty()) {
+ if (dcoreName != null && !dcoreName.isEmpty()) {
defaultCoreName = dcoreName;
}
persistent = cfg.getBool("solr/@persistent", false);
libDir = cfg.get("solr/@sharedLib", null);
- zkHost = cfg.get("solr/@zkHost" , null);
+ zkHost = cfg.get("solr/@zkHost", null);
+ coreLoadThreads = cfg.getInt("solr/@coreLoadThreads", CORE_LOAD_THREADS);
+
adminPath = cfg.get("solr/cores/@adminPath", null);
shareSchema = cfg.getBool("solr/cores/@shareSchema", DEFAULT_SHARE_SCHEMA);
- zkClientTimeout = cfg.getInt("solr/cores/@zkClientTimeout", DEFAULT_ZK_CLIENT_TIMEOUT);
-
+ zkClientTimeout = cfg.getInt("solr/cores/@zkClientTimeout",
+ DEFAULT_ZK_CLIENT_TIMEOUT);
+
hostPort = cfg.get("solr/cores/@hostPort", DEFAULT_HOST_PORT);
-
+
hostContext = cfg.get("solr/cores/@hostContext", DEFAULT_HOST_CONTEXT);
host = cfg.get("solr/cores/@host", null);
leaderVoteWait = cfg.get("solr/cores/@leaderVoteWait", LEADER_VOTE_WAIT);
-
- if(shareSchema){
- indexSchemaCache = new ConcurrentHashMap<String ,IndexSchema>();
+
+ if (shareSchema) {
+ indexSchemaCache = new ConcurrentHashMap<String,IndexSchema>();
}
- adminHandler = cfg.get("solr/cores/@adminHandler", null );
- managementPath = cfg.get("solr/cores/@managementPath", null );
+ adminHandler = cfg.get("solr/cores/@adminHandler", null);
+ managementPath = cfg.get("solr/cores/@managementPath", null);
- zkClientTimeout = Integer.parseInt(System.getProperty("zkClientTimeout", Integer.toString(zkClientTimeout)));
+ zkClientTimeout = Integer.parseInt(System.getProperty("zkClientTimeout",
+ Integer.toString(zkClientTimeout)));
initZooKeeper(zkHost, zkClientTimeout);
-
+
+ if (isZooKeeperAware() && coreLoadThreads <= 1) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "SolrCloud requires a value of at least 2 in solr.xml for coreLoadThreads");
+ }
+
if (libDir != null) {
File f = FileUtils.resolvePath(new File(dir), libDir);
- log.info( "loading shared library: "+f.getAbsolutePath() );
+ log.info("loading shared library: " + f.getAbsolutePath());
libLoader = SolrResourceLoader.createClassLoader(f, null);
}
-
+
if (adminPath != null) {
if (adminHandler == null) {
coreAdminHandler = new CoreAdminHandler(this);
@@ -491,91 +515,164 @@ public class CoreContainer
coreAdminHandler = this.createMultiCoreHandler(adminHandler);
}
}
-
+
collectionsHandler = new CollectionsHandler(this);
try {
- containerProperties = readProperties(cfg, ((NodeList) cfg.evaluate(DEFAULT_HOST_CONTEXT, XPathConstants.NODESET)).item(0));
+ containerProperties = readProperties(cfg, ((NodeList) cfg.evaluate(
+ DEFAULT_HOST_CONTEXT, XPathConstants.NODESET)).item(0));
} catch (Throwable e) {
- SolrException.log(log,null,e);
+ SolrException.log(log, null, e);
}
-
- NodeList nodes = (NodeList)cfg.evaluate("solr/cores/core", XPathConstants.NODESET);
-
- for (int i=0; i<nodes.getLength(); i++) {
- Node node = nodes.item(i);
- SolrCore core = null;
- try {
- String rawName = DOMUtil.getAttr(node, CORE_NAME, null);
- if (null == rawName) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Each core in solr.xml must have a 'name'");
- }
- String name = rawName;
- CoreDescriptor p = new CoreDescriptor(this, name, DOMUtil.getAttr(node, CORE_INSTDIR, null));
-
- // deal with optional settings
- String opt = DOMUtil.getAttr(node, CORE_CONFIG, null);
-
- if (opt != null) {
- p.setConfigName(opt);
- }
- opt = DOMUtil.getAttr(node, CORE_SCHEMA, null);
- if (opt != null) {
- p.setSchemaName(opt);
- }
-
- if (zkController != null) {
- opt = DOMUtil.getAttr(node, CORE_SHARD, null);
- if (opt != null && opt.length() > 0) {
- p.getCloudDescriptor().setShardId(opt);
+
+ NodeList nodes = (NodeList) cfg.evaluate("solr/cores/core",
+ XPathConstants.NODESET);
+
+ // setup executor to load cores in parallel
+ coreLoadExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new DefaultSolrThreadFactory("coreLoadExecutor"));
+ try {
+ // at most coreLoadThreads cores are loaded at the same time
+ final AdjustableSemaphore semaphore = new AdjustableSemaphore(
+ coreLoadThreads);
+
+ CompletionService<SolrCore> completionService = new ExecutorCompletionService<SolrCore>(
+ coreLoadExecutor);
+ Set<Future<SolrCore>> pending = new HashSet<Future<SolrCore>>();
+
+ for (int i = 0; i < nodes.getLength(); i++) {
+ Node node = nodes.item(i);
+ try {
+ String rawName = DOMUtil.getAttr(node, CORE_NAME, null);
+ if (null == rawName) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Each core in solr.xml must have a 'name'");
+ }
+ final String name = rawName;
+ final CoreDescriptor p = new CoreDescriptor(this, name,
+ DOMUtil.getAttr(node, CORE_INSTDIR, null));
+
+ // deal with optional settings
+ String opt = DOMUtil.getAttr(node, CORE_CONFIG, null);
+
+ if (opt != null) {
+ p.setConfigName(opt);
}
- opt = DOMUtil.getAttr(node, CORE_COLLECTION, null);
+ opt = DOMUtil.getAttr(node, CORE_SCHEMA, null);
if (opt != null) {
- p.getCloudDescriptor().setCollectionName(opt);
+ p.setSchemaName(opt);
}
- opt = DOMUtil.getAttr(node, CORE_ROLES, null);
- if(opt != null){
- p.getCloudDescriptor().setRoles(opt);
+
+ if (zkController != null) {
+ opt = DOMUtil.getAttr(node, CORE_SHARD, null);
+ if (opt != null && opt.length() > 0) {
+ p.getCloudDescriptor().setShardId(opt);
+ }
+ opt = DOMUtil.getAttr(node, CORE_COLLECTION, null);
+ if (opt != null) {
+ p.getCloudDescriptor().setCollectionName(opt);
+ }
+ opt = DOMUtil.getAttr(node, CORE_ROLES, null);
+ if (opt != null) {
+ p.getCloudDescriptor().setRoles(opt);
+ }
}
- }
- opt = DOMUtil.getAttr(node, CORE_PROPERTIES, null);
- if (opt != null) {
- p.setPropertiesName(opt);
- }
- opt = DOMUtil.getAttr(node, CORE_DATADIR, null);
- if (opt != null) {
- p.setDataDir(opt);
- }
-
- p.setCoreProperties(readProperties(cfg, node));
-
- opt = DOMUtil.getAttr(node, CORE_LOADONSTARTUP, null);
- if (opt != null) {
- p.setLoadOnStartup(("true".equalsIgnoreCase(opt) || "on".equalsIgnoreCase(opt)) ? true : false);
- }
-
- opt = DOMUtil.getAttr(node, CORE_SWAPPABLE, null);
- if (opt != null) {
- p.setSwappable(("true".equalsIgnoreCase(opt) || "on".equalsIgnoreCase(opt)) ? true : false);
- }
-
- if (! p.isSwappable() && p.isLoadOnStartup()) { // Just like current case.
- core = create(p);
- register(name, core, false);
- // track original names
- coreToOrigName.put(core, rawName);
- } else {
- // Store it away for later use. includes non-swappable but not loaded at startup cores.
- dynamicDescriptors.put(rawName, p);
+ opt = DOMUtil.getAttr(node, CORE_PROPERTIES, null);
+ if (opt != null) {
+ p.setPropertiesName(opt);
+ }
+ opt = DOMUtil.getAttr(node, CORE_DATADIR, null);
+ if (opt != null) {
+ p.setDataDir(opt);
+ }
+
+ p.setCoreProperties(readProperties(cfg, node));
+
+ opt = DOMUtil.getAttr(node, CORE_LOADONSTARTUP, null);
+ if (opt != null) {
+ p.setLoadOnStartup(("true".equalsIgnoreCase(opt) || "on"
+ .equalsIgnoreCase(opt)) ? true : false);
+ }
+
+ opt = DOMUtil.getAttr(node, CORE_SWAPPABLE, null);
+ if (opt != null) {
+ p.setSwappable(("true".equalsIgnoreCase(opt) || "on"
+ .equalsIgnoreCase(opt)) ? true : false);
+ }
+
+ if (!p.isSwappable() && p.isLoadOnStartup()) { // Just like current
+ // case.
+ Callable<SolrCore> task = new Callable<SolrCore>() {
+ public SolrCore call() {
+ SolrCore c = null;
+ try {
+ c = create(p);
+ register(name, c, false);
+ } catch (Throwable t) {
+ SolrException.log(log, null, t);
+ if (c != null) {
+ c.close();
+ }
+ }
+ semaphore.release();
+
+ return c;
+ }
+ };
+
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Interrupted while loading SolrCore(s)", e);
+ }
+
+ try {
+ pending.add(completionService.submit(task));
+ } catch (RejectedExecutionException e) {
+ semaphore.release();
+ throw e;
+ }
+
+ } else {
+ // Store it away for later use. includes non-swappable but not
+ // loaded at startup cores.
+ dynamicDescriptors.put(rawName, p);
+ }
+ } catch (Throwable ex) {
+ SolrException.log(log, null, ex);
}
}
- catch (Throwable ex) {
- SolrException.log(log,null,ex);
- if (core != null) {
- core.close();
+
+ while (pending != null && pending.size() > 0) {
+ try {
+ Future<SolrCore> future = completionService.take();
+ if (future == null) return;
+ pending.remove(future);
+
+ try {
+ SolrCore c = future.get();
+ // track original names
+ if (c != null) {
+ coreToOrigName.put(c, c.getName());
+ }
+ } catch (ExecutionException e) {
+ // shouldn't happen since we catch exceptions ourselves
+ SolrException.log(SolrCore.log,
+ "error sending update request to shard", e);
+ }
+
+ } catch (InterruptedException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ "interrupted waiting for shard update response", e);
}
}
+ } finally {
+ if (coreLoadExecutor != null) {
+ ExecutorUtil.shutdownNowAndAwaitTermination(coreLoadExecutor);
+ }
}
}
@@ -1276,7 +1373,7 @@ public class CoreContainer
Integer.toString(DEFAULT_ZK_CLIENT_TIMEOUT));
addCoresAttrib(coresAttribs, "hostContext", this.hostContext, DEFAULT_HOST_CONTEXT);
addCoresAttrib(coresAttribs, "leaderVoteWait", this.leaderVoteWait, LEADER_VOTE_WAIT);
-
+ addCoresAttrib(coresAttribs, "coreLoadThreads", Integer.toString(this.coreLoadThreads), Integer.toString(CORE_LOAD_THREADS));
List<SolrCoreXMLDef> solrCoreXMLDefs = new ArrayList<SolrCoreXMLDef>();
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java Mon Dec 10 16:36:47 2012
@@ -158,7 +158,7 @@ public abstract class DirectoryFactory i
public static long sizeOf(Directory directory, String file) throws IOException {
if (!directory.fileExists(file)) {
- throw new IllegalArgumentException(file + " does not exist");
+ return 0;
}
return directory.fileLength(file);
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/NRTCachingDirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/NRTCachingDirectoryFactory.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/NRTCachingDirectoryFactory.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/NRTCachingDirectoryFactory.java Mon Dec 10 16:36:47 2012
@@ -23,15 +23,32 @@ import java.io.IOException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.NRTCachingDirectory;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
/**
* Factory to instantiate {@link org.apache.lucene.store.NRTCachingDirectory}
*/
public class NRTCachingDirectoryFactory extends StandardDirectoryFactory {
+ // Defaults equal the values create() passed unconditionally before they became configurable.
+ private static final double DEFAULT_MAX_MERGE_SIZE_MB = 4;
+ private static final double DEFAULT_MAX_CACHED_MB = 48;
+
+ // Start from the defaults so create() keeps its previous behavior if init() was not called.
+ private double maxMergeSizeMB = DEFAULT_MAX_MERGE_SIZE_MB;
+ private double maxCachedMB = DEFAULT_MAX_CACHED_MB;
+
+ /**
+ * Reads the optional {@code maxMergeSizeMB} and {@code maxCachedMB} settings.
+ *
+ * @param args factory arguments; a setting that is absent keeps its default
+ * @throws IllegalArgumentException if either setting is not greater than 0
+ */
+ @Override
+ public void init(NamedList args) {
+ SolrParams params = SolrParams.toSolrParams(args);
+ double mergeSizeMB = params.getDouble("maxMergeSizeMB", DEFAULT_MAX_MERGE_SIZE_MB);
+ // !(x > 0) instead of x <= 0 so that NaN is rejected as well
+ if (!(mergeSizeMB > 0)) {
+ throw new IllegalArgumentException("maxMergeSizeMB must be greater than 0");
+ }
+ double cachedMB = params.getDouble("maxCachedMB", DEFAULT_MAX_CACHED_MB);
+ if (!(cachedMB > 0)) {
+ throw new IllegalArgumentException("maxCachedMB must be greater than 0");
+ }
+ // assign only after both values passed validation
+ maxMergeSizeMB = mergeSizeMB;
+ maxCachedMB = cachedMB;
+ }
@Override
protected Directory create(String path) throws IOException {
- return new NRTCachingDirectory(FSDirectory.open(new File(path)), 4, 48);
+ return new NRTCachingDirectory(FSDirectory.open(new File(path)), maxMergeSizeMB, maxCachedMB);
}
}
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/SolrCore.java Mon Dec 10 16:36:47 2012
@@ -17,6 +17,40 @@
package org.apache.solr.core;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Writer;
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.xml.parsers.ParserConfigurationException;
+
import org.apache.commons.io.IOUtils;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DirectoryReader;
@@ -67,16 +101,17 @@ import org.apache.solr.search.ValueSourc
import org.apache.solr.update.DefaultSolrCoreState;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.SolrCoreState;
+import org.apache.solr.update.SolrCoreState.IndexWriterCloser;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.VersionInfo;
-import org.apache.solr.update.SolrCoreState.IndexWriterCloser;
import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
import org.apache.solr.update.processor.LogUpdateProcessorFactory;
import org.apache.solr.update.processor.RunUpdateProcessorFactory;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.PropertiesInputStream;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
import org.apache.solr.util.plugin.PluginInfoInitialized;
@@ -85,41 +120,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
-import javax.xml.parsers.ParserConfigurationException;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Writer;
-import java.lang.reflect.Constructor;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-
/**
*
@@ -235,30 +235,11 @@ public final class SolrCore implements S
Properties p = new Properties();
Directory dir = null;
try {
- dir = getDirectoryFactory().get(getDataDir(), null);
+ dir = getDirectoryFactory().get(getDataDir(), getSolrConfig().indexConfig.lockType);
if (dir.fileExists("index.properties")){
final IndexInput input = dir.openInput("index.properties", IOContext.DEFAULT);
- final InputStream is = new InputStream() {
-
- @Override
- public int read() throws IOException {
- byte next;
- try {
- next = input.readByte();
- } catch (EOFException e) {
- return -1;
- }
- return next;
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- input.close();
- }
- };
-
+ final InputStream is = new PropertiesInputStream(input);
try {
p.load(is);
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/MoreLikeThisHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/MoreLikeThisHandler.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/MoreLikeThisHandler.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/MoreLikeThisHandler.java Mon Dec 10 16:36:47 2012
@@ -29,11 +29,9 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
-import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.StoredDocument;
import org.apache.lucene.index.Term;
-import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.search.*;
import org.apache.lucene.queries.mlt.MoreLikeThis;
import org.apache.solr.common.SolrException;
@@ -108,7 +106,7 @@ public class MoreLikeThisHandler extends
}
}
}
- } catch (ParseException e) {
+ } catch (SyntaxError e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
@@ -300,8 +298,10 @@ public class MoreLikeThisHandler extends
mlt.setAnalyzer( searcher.getSchema().getAnalyzer() );
// configurable params
+
mlt.setMinTermFreq( params.getInt(MoreLikeThisParams.MIN_TERM_FREQ, MoreLikeThis.DEFAULT_MIN_TERM_FREQ));
mlt.setMinDocFreq( params.getInt(MoreLikeThisParams.MIN_DOC_FREQ, MoreLikeThis.DEFAULT_MIN_DOC_FREQ));
+ mlt.setMaxDocFreq( params.getInt(MoreLikeThisParams.MAX_DOC_FREQ, MoreLikeThis.DEFAULT_MAX_DOC_FREQ));
mlt.setMinWordLen( params.getInt(MoreLikeThisParams.MIN_WORD_LEN, MoreLikeThis.DEFAULT_MIN_WORD_LENGTH));
mlt.setMaxWordLen( params.getInt(MoreLikeThisParams.MAX_WORD_LEN, MoreLikeThis.DEFAULT_MAX_WORD_LENGTH));
mlt.setMaxQueryTerms( params.getInt(MoreLikeThisParams.MAX_QUERY_TERMS, MoreLikeThis.DEFAULT_MAX_QUERY_TERMS));
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Mon Dec 10 16:36:47 2012
@@ -361,7 +361,7 @@ public class ReplicationHandler extends
// use a set to workaround possible Lucene bug which returns same file
// name multiple times
Collection<String> files = new HashSet<String>(commit.getFileNames());
- dir = core.getDirectoryFactory().get(core.getNewIndexDir(), null);
+ dir = core.getDirectoryFactory().get(core.getNewIndexDir(), core.getSolrConfig().indexConfig.lockType);
try {
for (String fileName : files) {
@@ -467,7 +467,7 @@ public class ReplicationHandler extends
Directory dir;
long size = 0;
try {
- dir = core.getDirectoryFactory().get(core.getIndexDir(), null);
+ dir = core.getDirectoryFactory().get(core.getNewIndexDir(), core.getSolrConfig().indexConfig.lockType);
try {
size = DirectoryFactory.sizeOfDirectory(dir);
} finally {
@@ -1062,7 +1062,7 @@ public class ReplicationHandler extends
while (true) {
offset = offset == -1 ? 0 : offset;
int read = (int) Math.min(buf.length, filelen - offset);
- in.readBytes(buf, offset == -1 ? 0 : (int) offset, read);
+ in.readBytes(buf, 0, read);
fos.writeInt((int) read);
if (useChecksum) {
@@ -1082,6 +1082,8 @@ public class ReplicationHandler extends
fos.close();
break;
}
+ offset += read;
+ in.seek(offset);
}
} catch (IOException e) {
LOG.warn("Exception while writing response for params: " + params, e);
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java Mon Dec 10 16:36:47 2012
@@ -22,7 +22,6 @@ import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;
import com.yammer.metrics.stats.Snapshot;
-import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
@@ -32,6 +31,7 @@ import org.apache.solr.core.SolrInfoMBea
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.search.SyntaxError;
import org.apache.solr.util.SolrPluginUtils;
import java.net.URL;
@@ -163,7 +163,7 @@ public abstract class RequestHandlerBase
}
} else {
SolrException.log(SolrCore.log,e);
- if (e instanceof ParseException) {
+ if (e instanceof SyntaxError) {
e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
}
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Mon Dec 10 16:36:47 2012
@@ -36,10 +36,7 @@ import static org.apache.solr.handler.Re
import static org.apache.solr.handler.ReplicationHandler.OFFSET;
import static org.apache.solr.handler.ReplicationHandler.SIZE;
-import java.io.EOFException;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -99,6 +96,8 @@ import org.apache.solr.search.SolrIndexS
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
+import org.apache.solr.util.PropertiesInputStream;
+import org.apache.solr.util.PropertiesOutputStream;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,6 +110,8 @@ import org.slf4j.LoggerFactory;
* @since solr 1.4
*/
public class SnapPuller {
+ private static final String INDEX_PEROPERTIES = "index.properties"; // file name handed to dir.sync() after the index properties are stored; must be spelled exactly like the "index.properties" file that is read and written
+
private static final Logger LOG = LoggerFactory.getLogger(SnapPuller.class.getName());
private final String masterUrl;
@@ -296,6 +297,7 @@ public class SnapPuller {
successfulInstall = false;
replicationStartTime = System.currentTimeMillis();
Directory tmpIndexDir = null;
+ String tmpIndex = null;
Directory indexDir = null;
boolean deleteTmpIdxDir = true;
try {
@@ -368,12 +370,12 @@ public class SnapPuller {
boolean isFullCopyNeeded = IndexDeletionPolicyWrapper.getCommitTimestamp(commit) >= latestVersion || forceReplication;
String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
- String tmpIndex = createTempindexDir(core, tmpIdxDirName);
+ tmpIndex = createTempindexDir(core, tmpIdxDirName);
- tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, null);
+ tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, core.getSolrConfig().indexConfig.lockType);
// make sure it's the newest known index dir...
- indexDir = core.getDirectoryFactory().get(core.getNewIndexDir(), null);
+ indexDir = core.getDirectoryFactory().get(core.getNewIndexDir(), core.getSolrConfig().indexConfig.lockType);
Directory oldDirectory = null;
try {
@@ -466,7 +468,9 @@ public class SnapPuller {
} finally {
if (deleteTmpIdxDir) {
LOG.info("removing temporary index download directory files " + tmpIndexDir);
- DirectoryFactory.empty(tmpIndexDir);
+ if (tmpIndex != null && core.getDirectoryFactory().exists(tmpIndex)) {
+ DirectoryFactory.empty(tmpIndexDir);
+ }
}
}
} finally {
@@ -519,9 +523,9 @@ public class SnapPuller {
/**
* Helper method to record the last replication's details so that we can show them on the statistics page across
* restarts.
+ * @throws IOException on IO error
*/
- private void logReplicationTimeAndConfFiles(Collection<Map<String, Object>> modifiedConfFiles, boolean successfulInstall) {
- FileOutputStream outFile = null;
+ private void logReplicationTimeAndConfFiles(Collection<Map<String, Object>> modifiedConfFiles, boolean successfulInstall) throws IOException {
List<String> confFiles = new ArrayList<String>();
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty())
for (Map<String, Object> map1 : modifiedConfFiles)
@@ -530,7 +534,10 @@ public class SnapPuller {
Properties props = replicationHandler.loadReplicationProperties();
long replicationTime = System.currentTimeMillis();
long replicationTimeTaken = (replicationTime - getReplicationStartTime()) / 1000;
+ Directory dir = null;
try {
+ dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), solrCore.getSolrConfig().indexConfig.lockType);
+
int indexCount = 1, confFilesCount = 1;
if (props.containsKey(TIMES_INDEX_REPLICATED)) {
indexCount = Integer.valueOf(props.getProperty(TIMES_INDEX_REPLICATED)) + 1;
@@ -560,15 +567,21 @@ public class SnapPuller {
sb = readToStringBuffer(replicationTime, props.getProperty(REPLICATION_FAILED_AT_LIST));
props.setProperty(REPLICATION_FAILED_AT_LIST, sb.toString());
}
- File f = new File(solrCore.getDataDir(), REPLICATION_PROPERTIES);
- outFile = new FileOutputStream(f);
- props.store(outFile, "Replication details");
- outFile.close();
+
+ final IndexOutput out = dir.createOutput(REPLICATION_PROPERTIES, IOContext.DEFAULT);
+ OutputStream outFile = new PropertiesOutputStream(out);
+ try {
+ props.store(outFile, "Replication details");
+ dir.sync(Collections.singleton(REPLICATION_PROPERTIES));
+ } finally {
+ IOUtils.closeQuietly(outFile);
+ }
} catch (Exception e) {
LOG.warn("Exception while updating statistics", e);
- }
- finally {
- IOUtils.closeQuietly(outFile);
+ } finally {
+ if (dir != null) {
+ solrCore.getDirectoryFactory().release(dir);
+ }
}
}
@@ -706,7 +719,7 @@ public class SnapPuller {
String indexDir = solrCore.getIndexDir();
// obtain the directory with the configured lock type (null is no longer passed for the lock factory)
- Directory dir = solrCore.getDirectoryFactory().get(indexDir, null);
+ Directory dir = solrCore.getDirectoryFactory().get(indexDir, solrCore.getSolrConfig().indexConfig.lockType);
try {
for (Map<String,Object> file : filesToDownload) {
if (!dir.fileExists((String) file.get(NAME)) || downloadCompleteIndex) {
@@ -829,30 +842,11 @@ public class SnapPuller {
Properties p = new Properties();
Directory dir = null;
try {
- dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), null);
+ dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), solrCore.getSolrConfig().indexConfig.lockType);
if (dir.fileExists("index.properties")){
final IndexInput input = dir.openInput("index.properties", IOContext.DEFAULT);
- final InputStream is = new InputStream() {
-
- @Override
- public int read() throws IOException {
- byte next;
- try {
- next = input.readByte();
- } catch (EOFException e) {
- return -1;
- }
- return next;
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- input.close();
- }
- };
-
+ final InputStream is = new PropertiesInputStream(input);
try {
p.load(is);
} catch (Exception e) {
@@ -870,27 +864,16 @@ public class SnapPuller {
p.put("index", tmpIdxDirName);
OutputStream os = null;
try {
- os = new OutputStream() {
-
- @Override
- public void write(int b) throws IOException {
- out.writeByte((byte) b);
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- out.close();
- }
- };
+ os = new PropertiesOutputStream(out);
p.store(os, "index properties");
+ dir.sync(Collections.singleton(INDEX_PEROPERTIES));
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to write index.properties", e);
} finally {
IOUtils.closeQuietly(os);
}
- return true;
+ return true;
} catch (IOException e1) {
throw new RuntimeException(e1);
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapShooter.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapShooter.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapShooter.java Mon Dec 10 16:36:47 2012
@@ -101,7 +101,7 @@ public class SnapShooter {
Collection<String> files = indexCommit.getFileNames();
FileCopier fileCopier = new FileCopier();
- Directory dir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), null);
+ Directory dir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), solrCore.getSolrConfig().indexConfig.lockType);
try {
fileCopier.copyFiles(dir, files, snapShotDir);
} finally {