You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by us...@apache.org on 2012/01/29 13:18:55 UTC
svn commit: r1237259 [3/4] - in /lucene/dev/branches/lucene2858: ./
dev-tools/idea/lucene/contrib/ dev-tools/maven/solr/solrj/ lucene/
lucene/contrib/
lucene/contrib/sandbox/src/test/org/apache/lucene/sandbox/queries/regex/
lucene/src/java/org/apache/l...
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Sun Jan 29 12:18:50 2012
@@ -118,7 +118,7 @@ final class ShardLeaderElectionContext e
}
// should I be leader?
if (weAreReplacement && !shouldIBeLeader(leaderProps)) {
- System.out.println("there is a better leader candidate it appears");
+ // System.out.println("there is a better leader candidate it appears");
rejoinLeaderElection(leaderSeqPath, core);
return;
}
@@ -209,14 +209,31 @@ final class OverseerElectionContext exte
private final ZkStateReader stateReader;
public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) {
- super(zkNodeName, "/overseer_elect", null, null);
+ super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null);
this.zkClient = zkClient;
this.stateReader = stateReader;
}
@Override
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement) throws KeeperException, InterruptedException {
- new Overseer(zkClient, stateReader);
+
+ final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
+ ZkNodeProps myProps = new ZkNodeProps("id", id);
+
+ try {
+ zkClient.makePath(leaderPath,
+ ZkStateReader.toJSON(myProps),
+ CreateMode.EPHEMERAL, true);
+ } catch (NodeExistsException e) {
+ // if a previous leader ephemeral still exists for some reason, try and
+ // remove it
+ zkClient.delete(leaderPath, -1, true);
+ zkClient.makePath(leaderPath,
+ ZkStateReader.toJSON(myProps),
+ CreateMode.EPHEMERAL, true);
+ }
+
+ new Overseer(zkClient, stateReader, id);
}
}
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Sun Jan 29 12:18:50 2012
@@ -199,7 +199,6 @@ public class LeaderElector {
* watch the next lowest numbered node.
*
* @param context
- * @param SolrCore - optional - sometimes null
* @return sequential node number
* @throws KeeperException
* @throws InterruptedException
@@ -256,8 +255,7 @@ public class LeaderElector {
/**
* Set up any ZooKeeper nodes needed for leader election.
*
- * @param shardId
- * @param collection
+ * @param context
* @throws InterruptedException
* @throws KeeperException
*/
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/Overseer.java Sun Jan 29 12:18:50 2012
@@ -22,9 +22,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.Set;
import org.apache.solr.cloud.NodeStateWatcher.NodeStateChangeListener;
@@ -51,13 +53,32 @@ import org.slf4j.LoggerFactory;
* Cluster leader. Responsible for node assignments, cluster state file?
*/
public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
+
+ private static final int STATE_UPDATE_DELAY = 500; // delay between cloud state updates
+
+ static enum Op {
+ LeaderChange, StateChange;
+ }
+
+ private final class CloudStateUpdateRequest {
+
+ final Op operation;
+ final Object[] args;
+
+ CloudStateUpdateRequest(final Op operation, final Object... args) {
+ this.operation = operation;
+ this.args = args;
+ }
+ }
public static final String ASSIGNMENTS_NODE = "/node_assignments";
public static final String STATES_NODE = "/node_states";
private static Logger log = LoggerFactory.getLogger(Overseer.class);
private final SolrZkClient zkClient;
- private final ZkStateReader reader;
+
+ // pooled updates
+ private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo = new LinkedBlockingQueue<CloudStateUpdateRequest>();
// node stateWatches
private HashMap<String,NodeStateWatcher> nodeStateWatches = new HashMap<String,NodeStateWatcher>();
@@ -66,12 +87,222 @@ public class Overseer implements NodeSta
private HashMap<String, HashMap<String,ShardLeaderWatcher>> shardLeaderWatches = new HashMap<String,HashMap<String,ShardLeaderWatcher>>();
private ZkCmdExecutor zkCmdExecutor;
- public Overseer(final SolrZkClient zkClient, final ZkStateReader reader) throws KeeperException, InterruptedException {
- log.info("Constructing new Overseer");
+ private static class CloudStateUpdater implements Runnable {
+
+ private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo;
+ private final ZkStateReader reader;
+ private final SolrZkClient zkClient;
+ private final String myId;
+
+ public CloudStateUpdater(final LinkedBlockingQueue<CloudStateUpdateRequest> fifo, final ZkStateReader reader, final SolrZkClient zkClient, final String myId) {
+ this.fifo = fifo;
+ this.myId = myId;
+ this.reader = reader;
+ this.zkClient = zkClient;
+ }
+ @Override
+ public void run() {
+ while (amILeader()) {
+
+
+ LinkedList<CloudStateUpdateRequest> requests = new LinkedList<Overseer.CloudStateUpdateRequest>();
+ while (!fifo.isEmpty()) {
+ // collect all queued requests
+ CloudStateUpdateRequest req;
+ req = fifo.poll();
+ if (req == null) {
+ break;
+ }
+ requests.add(req);
+ }
+
+ if (requests.size() > 0) {
+ // process updates
+ synchronized (reader.getUpdateLock()) {
+ try {
+ reader.updateCloudState(true);
+ CloudState cloudState = reader.getCloudState();
+ for (CloudStateUpdateRequest request : requests) {
+
+ switch (request.operation) {
+ case LeaderChange:
+ cloudState = setShardLeader(cloudState,
+ (String) request.args[0], (String) request.args[1],
+ (String) request.args[2]);
+
+ break;
+ case StateChange:
+ cloudState = updateState(cloudState,
+ (String) request.args[0], (CoreState) request.args[1]);
+ break;
+
+ }
+ }
+
+ log.info("Announcing new cluster state");
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(cloudState), true);
+
+ } catch (KeeperException e) {
+ // XXX stop processing, exit
+ return;
+ } catch (InterruptedException e) {
+ // XXX stop processing, exit
+ return;
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(STATE_UPDATE_DELAY);
+ } catch (InterruptedException e) {
+ //
+ }
+ }
+ }
+
+ private boolean amILeader() {
+ try {
+ ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, false));
+ if(myId.equals(props.get("id"))) {
+ return true;
+ }
+ } catch (KeeperException e) {
+ // assume we're dead
+ } catch (InterruptedException e) {
+ // assume we're dead
+ }
+ log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
+ return false;
+ }
+ /**
+ * Try to assign core to the cluster
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
+ String collection = coreState.getCollectionName();
+ String zkCoreNodeName = coreState.getCoreNodeName();
+
+ String shardId;
+ if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
+ shardId = AssignShard.assignShard(collection, state);
+ } else {
+ shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
+ }
+
+ Map<String,String> props = new HashMap<String,String>();
+ for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ ZkNodeProps zkProps = new ZkNodeProps(props);
+ Slice slice = state.getSlice(collection, shardId);
+ Map<String,ZkNodeProps> shardProps;
+ if (slice == null) {
+ shardProps = new HashMap<String,ZkNodeProps>();
+ } else {
+ shardProps = state.getSlice(collection, shardId).getShardsCopy();
+ }
+ shardProps.put(zkCoreNodeName, zkProps);
+
+ slice = new Slice(shardId, shardProps);
+ CloudState newCloudState = updateSlice(state, collection, slice);
+ return newCloudState;
+ }
+
+ private CloudState updateSlice(CloudState state, String collection, Slice slice) {
+
+ final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+ newStates.putAll(state.getCollectionStates());
+
+ if (!newStates.containsKey(collection)) {
+ newStates.put(collection, new LinkedHashMap<String,Slice>());
+ }
+
+ final Map<String, Slice> slices = newStates.get(collection);
+ if (!slices.containsKey(slice.getName())) {
+ slices.put(slice.getName(), slice);
+ } else {
+ final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
+ final Slice existingSlice = slices.get(slice.getName());
+ shards.putAll(existingSlice.getShards());
+ //XXX preserve existing leader
+ for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
+ if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
+ HashMap<String, String> newProps = new HashMap<String,String>();
+ newProps.putAll(edit.getValue().getProperties());
+ newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
+ shards.put(edit.getKey(), new ZkNodeProps(newProps));
+ } else {
+ shards.put(edit.getKey(), edit.getValue());
+ }
+ }
+ final Slice updatedSlice = new Slice(slice.getName(), shards);
+ slices.put(slice.getName(), updatedSlice);
+ }
+ return new CloudState(state.getLiveNodes(), newStates);
+ }
+
+ private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
+
+ boolean updated = false;
+ final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+ newStates.putAll(state.getCollectionStates());
+
+ final Map<String, Slice> slices = newStates.get(collection);
+
+ if(slices==null) {
+ log.error("Could not mark shard leader for non existing collection.");
+ return state;
+ }
+
+ if (!slices.containsKey(sliceName)) {
+ log.error("Could not mark leader for non existing slice.");
+ return state;
+ } else {
+ final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
+ for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
+ Map<String, String> newShardProps = new LinkedHashMap<String,String>();
+ newShardProps.putAll(shard.getValue().getProperties());
+
+ String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP); //clean any previously existed flag
+
+ ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
+ if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
+ newShardProps.put(ZkStateReader.LEADER_PROP,"true");
+ if (wasLeader == null) {
+ updated = true;
+ }
+ } else {
+ if (wasLeader != null) {
+ updated = true;
+ }
+ }
+ newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
+ }
+ Slice slice = new Slice(sliceName, newShards);
+ slices.put(sliceName, slice);
+ }
+ if (updated) {
+ return new CloudState(state.getLiveNodes(), newStates);
+ } else {
+ return state;
+ }
+ }
+
+ }
+
+ public Overseer(final SolrZkClient zkClient, final ZkStateReader reader, String id) throws KeeperException, InterruptedException {
+ log.info("Constructing new Overseer id=" + id);
this.zkClient = zkClient;
this.zkCmdExecutor = new ZkCmdExecutor();
- this.reader = reader;
createWatches();
+
+ //launch cluster state updater thread
+ ThreadGroup tg = new ThreadGroup("Overseer delayed state updater");
+ Thread updaterThread = new Thread(tg, new CloudStateUpdater(fifo, reader, zkClient, id));
+ updaterThread.setDaemon(true);
+ updaterThread.start();
}
public synchronized void createWatches()
@@ -267,41 +498,6 @@ public class Overseer implements NodeSta
}
}
- /**
- * Try to assign core to the cluster
- * @throws KeeperException
- * @throws InterruptedException
- */
- private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
- String collection = coreState.getCollectionName();
- String zkCoreNodeName = coreState.getCoreNodeName();
-
- String shardId;
- if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
- shardId = AssignShard.assignShard(collection, state);
- } else {
- shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
- }
-
- Map<String,String> props = new HashMap<String,String>();
- for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
- props.put(entry.getKey(), entry.getValue());
- }
- ZkNodeProps zkProps = new ZkNodeProps(props);
- Slice slice = state.getSlice(collection, shardId);
- Map<String,ZkNodeProps> shardProps;
- if (slice == null) {
- shardProps = new HashMap<String,ZkNodeProps>();
- } else {
- shardProps = state.getSlice(collection, shardId).getShardsCopy();
- }
- shardProps.put(zkCoreNodeName, zkProps);
-
- slice = new Slice(shardId, shardProps);
- CloudState newCloudState = updateSlice(state, collection, slice);
- return newCloudState;
- }
-
private Set<String> complement(Collection<String> next,
Collection<String> prev) {
Set<String> downCollections = new HashSet<String>();
@@ -311,23 +507,11 @@ public class Overseer implements NodeSta
}
@Override
- public void coreChanged(final String nodeName, final Set<CoreState> states) throws KeeperException, InterruptedException {
- log.debug("Cores changed: " + nodeName + " states:" + states);
- synchronized(reader.getUpdateLock()) {
- reader.updateCloudState(true);
- CloudState cloudState = reader.getCloudState();
- for (CoreState state : states) {
- cloudState = updateState(cloudState, nodeName, state);
- }
-
- try {
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(cloudState), true);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Interrupted while publishing new state", e);
- }
+ public void coreChanged(final String nodeName, final Set<CoreState> states)
+ throws KeeperException, InterruptedException {
+ log.info("Core change pooled: " + nodeName + " states:" + states);
+ for (CoreState state : states) {
+ fifo.add(new CloudStateUpdateRequest(Op.StateChange, nodeName, state));
}
}
@@ -340,111 +524,11 @@ public class Overseer implements NodeSta
ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
zkCmdExecutor.ensureExists(node, zkClient);
}
-
- private CloudState updateSlice(CloudState state, String collection, Slice slice) {
-
- final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
- newStates.putAll(state.getCollectionStates());
-
- if (!newStates.containsKey(collection)) {
- newStates.put(collection, new LinkedHashMap<String,Slice>());
- }
-
- final Map<String, Slice> slices = newStates.get(collection);
- if (!slices.containsKey(slice.getName())) {
- slices.put(slice.getName(), slice);
- } else {
- final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
- final Slice existingSlice = slices.get(slice.getName());
- shards.putAll(existingSlice.getShards());
- //XXX preserve existing leader
- for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
- if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
- HashMap<String, String> newProps = new HashMap<String,String>();
- newProps.putAll(edit.getValue().getProperties());
- newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
- shards.put(edit.getKey(), new ZkNodeProps(newProps));
- } else {
- shards.put(edit.getKey(), edit.getValue());
- }
- }
- final Slice updatedSlice = new Slice(slice.getName(), shards);
- slices.put(slice.getName(), updatedSlice);
- }
- return new CloudState(state.getLiveNodes(), newStates);
- }
-
- private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
-
- boolean updated = false;
- final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
- newStates.putAll(state.getCollectionStates());
-
- final Map<String, Slice> slices = newStates.get(collection);
-
- if(slices==null) {
- log.error("Could not mark shard leader for non existing collection.");
- return state;
- }
-
- if (!slices.containsKey(sliceName)) {
- log.error("Could not mark leader for non existing slice.");
- return state;
- } else {
- final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
- for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
- Map<String, String> newShardProps = new LinkedHashMap<String,String>();
- newShardProps.putAll(shard.getValue().getProperties());
-
- String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP); //clean any previously existed flag
-
- ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
- if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
- newShardProps.put(ZkStateReader.LEADER_PROP,"true");
- if (wasLeader == null) {
- updated = true;
- }
- } else {
- if (wasLeader != null) {
- updated = true;
- }
- }
- newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
- }
- Slice slice = new Slice(sliceName, newShards);
- slices.put(sliceName, slice);
- }
- if (updated) {
- return new CloudState(state.getLiveNodes(), newStates);
- } else {
- return state;
- }
- }
@Override
public void announceLeader(String collection, String shardId, ZkCoreNodeProps props) {
- synchronized (reader.getUpdateLock()) {
- try {
- reader.updateCloudState(true); // get fresh copy of the state
- final CloudState state = reader.getCloudState();
- final CloudState newState = setShardLeader(state, collection, shardId,
- props.getCoreUrl());
- if (state != newState) { // if same instance was returned no need to
- // update state
- log.info("Announcing new leader: coll: " + collection + " shard: " + shardId + " props:" + props);
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(newState), true);
-
- } else {
- log.debug("State was not changed.");
- }
- } catch (KeeperException e) {
- log.warn("Could not announce new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn("Could not promote new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
- }
- }
+ log.info("Leader change pooled.");
+ fifo.add(new CloudStateUpdateRequest(Op.LeaderChange, collection, shardId, props.getCoreUrl()));
}
}
\ No newline at end of file
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Sun Jan 29 12:18:50 2012
@@ -20,10 +20,8 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
@@ -38,10 +36,8 @@ import org.apache.solr.core.RequestHandl
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.request.SolrRequestHandler;
-import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateLog.RecoveryInfo;
-import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +63,7 @@ public class RecoveryStrategy extends Th
public RecoveryStrategy(SolrCore core) {
this.core = core;
this.coreName = core.getName();
-
+ setName("RecoveryThread");
zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
zkStateReader = zkController.getZkStateReader();
baseUrl = zkController.getBaseUrl();
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Sun Jan 29 12:18:50 2012
@@ -46,7 +46,8 @@ public class SyncStrategy {
public boolean sync(ZkController zkController, SolrCore core,
ZkNodeProps leaderProps) {
- zkController.publish(core, ZkStateReader.SYNC);
+ // TODO: look at our state usage of sync
+ // zkController.publish(core, ZkStateReader.SYNC);
// solrcloud_debug
// System.out.println("SYNC UP");
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Jan 29 12:18:50 2012
@@ -130,16 +130,15 @@ public final class ZkController {
zkServer.stop();
}
-
/**
- * @param coreContainer if null, recovery will not be enabled
+ * @param cc if null, recovery will not be enabled
* @param zkServerAddress
* @param zkClientTimeout
* @param zkClientConnectTimeout
* @param localHost
* @param locaHostPort
* @param localHostContext
- * @param numShards
+ * @param registerOnReconnect
* @throws InterruptedException
* @throws TimeoutException
* @throws IOException
@@ -437,13 +436,14 @@ public final class ZkController {
}
+
/**
* Register shard with ZooKeeper.
*
* @param coreName
- * @param cloudDesc
- * @return
- * @throws Exception
+ * @param desc
+ * @return the shardId for the SolrCore
+ * @throws Exception
*/
public String register(String coreName, final CoreDescriptor desc) throws Exception {
return register(coreName, desc, false);
@@ -456,7 +456,7 @@ public final class ZkController {
* @param coreName
* @param desc
* @param recoverReloadedCores
- * @return
+ * @return the shardId for the SolrCore
* @throws Exception
*/
public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores) throws Exception {
@@ -508,9 +508,33 @@ public final class ZkController {
try {
core = cc.getCore(desc.getName());
- boolean startRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
+ if (isLeader) {
+ // recover from local transaction log and wait for it to complete before
+ // going active
+ // TODO: should this be moved to another thread? To recoveryStrat?
+ // TODO: should this actually be done earlier, before (or as part of)
+ // leader election perhaps?
+ // TODO: ensure that a replica that is trying to recover waits until I'm
+ // active (or don't make me the
+ // leader until my local replay is done. But this replay is only needed
+ // on the leader - replicas
+ // will do recovery anyway
+
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ if (!core.isReloaded() && ulog != null) {
+ Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
+ .getUpdateLog().recoverFromLog();
+ if (recoveryFuture != null) {
+ recoveryFuture.get(); // NOTE: this could potentially block for
+ // minutes or more!
+ // TODO: public as recovering in the mean time?
+ }
+ }
+ }
+
+ boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
collection, coreZkNodeName, shardId, leaderProps, core, cc);
- if (!startRecovery) {
+ if (!didRecovery) {
publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
}
} finally {
@@ -539,6 +563,24 @@ public final class ZkController {
}
+ /**
+ * @param coreName
+ * @param desc
+ * @param recoverReloadedCores
+ * @param isLeader
+ * @param cloudDesc
+ * @param collection
+ * @param shardZkNodeName
+ * @param shardId
+ * @param leaderProps
+ * @param core
+ * @param cc
+ * @return whether or not a recovery was started
+ * @throws InterruptedException
+ * @throws KeeperException
+ * @throws IOException
+ * @throws ExecutionException
+ */
private boolean checkRecovery(String coreName, final CoreDescriptor desc,
boolean recoverReloadedCores, final boolean isLeader,
final CloudDescriptor cloudDesc, final String collection,
@@ -546,46 +588,18 @@ public final class ZkController {
SolrCore core, CoreContainer cc) throws InterruptedException,
KeeperException, IOException, ExecutionException {
-
boolean doRecovery = true;
-
-
- if (isLeader) {
- doRecovery = false;
-
- // recover from local transaction log and wait for it to complete before
- // going active
- // TODO: should this be moved to another thread? To recoveryStrat?
- // TODO: should this actually be done earlier, before (or as part of)
- // leader election perhaps?
- // TODO: ensure that a replica that is trying to recover waits until I'm
- // active (or don't make me the
- // leader until my local replay is done. But this replay is only needed
- // on the leader - replicas
- // will do recovery anyway
-
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- if (!core.isReloaded() && ulog != null) {
- Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
- .getUpdateLog().recoverFromLog();
- if (recoveryFuture != null) {
- recoveryFuture.get(); // NOTE: this could potentially block for
- // minutes or more!
- // TODO: public as recovering in the mean time?
- }
- }
- return false;
- } else {
+ if (!isLeader) {
if (core.isReloaded() && !recoverReloadedCores) {
doRecovery = false;
}
- }
-
- if (doRecovery && !SKIP_AUTO_RECOVERY) {
- log.info("Core needs to recover:" + core.getName());
- core.getUpdateHandler().getSolrCoreState().doRecovery(core);
- return true;
+
+ if (doRecovery && !SKIP_AUTO_RECOVERY) {
+ log.info("Core needs to recover:" + core.getName());
+ core.getUpdateHandler().getSolrCoreState().doRecovery(core);
+ return true;
+ }
}
return false;
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/CoreContainer.java Sun Jan 29 12:18:50 2012
@@ -433,9 +433,9 @@ public class CoreContainer
try {
for (SolrCore core : cores.values()) {
try {
- if (!core.isClosed()) {
- core.close();
- }
+ core.close();
+ // make sure we wait for any recoveries to stop
+ core.getUpdateHandler().getSolrCoreState().cancelRecovery();
} catch (Throwable t) {
SolrException.log(log, "Error shutting down core", t);
}
@@ -491,6 +491,9 @@ public class CoreContainer
SolrCore old = null;
synchronized (cores) {
+ if (isShutDown) {
+ throw new IllegalStateException("This CoreContainer has been shutdown");
+ }
old = cores.put(name, core);
/*
* set both the name of the descriptor and the name of the
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/SolrCore.java Sun Jan 29 12:18:50 2012
@@ -732,8 +732,17 @@ public final class SolrCore implements S
if (!searcherExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
log.error("Timeout waiting for searchExecutor to terminate");
}
+ } catch (InterruptedException e) {
+ searcherExecutor.shutdownNow();
+ try {
+ if (!searcherExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+ log.error("Timeout waiting for searchExecutor to terminate");
+ }
+ } catch (InterruptedException e2) {
+ SolrException.log(log, e2);
+ }
} catch (Exception e) {
- SolrException.log(log,e);
+ SolrException.log(log, e);
}
try {
// Since we waited for the searcherExecutor to shut down,
@@ -744,7 +753,7 @@ public final class SolrCore implements S
// then the searchExecutor will throw an exception when getSearcher()
// tries to use it, and the exception handling code should close it.
closeSearcher();
- } catch (Exception e) {
+ } catch (Throwable e) {
SolrException.log(log,e);
}
@@ -1053,14 +1062,13 @@ public final class SolrCore implements S
openSearcherLock.lock();
try {
- String newIndexDir = null;
+ String newIndexDir = getNewIndexDir();
File indexDirFile = null;
File newIndexDirFile = null;
// if it's not a normal near-realtime update, check that paths haven't changed.
if (!nrt) {
indexDirFile = new File(getIndexDir()).getCanonicalFile();
- newIndexDir = getNewIndexDir();
newIndexDirFile = new File(newIndexDir).getCanonicalFile();
}
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Sun Jan 29 12:18:50 2012
@@ -288,7 +288,7 @@ public class SnapPuller {
return true;
}
- if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
+ if (!force && commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
//master and slave are already in sync just return
LOG.info("Slave in sync with master.");
successfulInstall = true;
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/request/UnInvertedField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/request/UnInvertedField.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/request/UnInvertedField.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/request/UnInvertedField.java Sun Jan 29 12:18:50 2012
@@ -175,7 +175,7 @@ public class UnInvertedField extends Doc
final String prefix = TrieField.getMainValuePrefix(searcher.getSchema().getFieldType(field));
this.searcher = searcher;
try {
- uninvert(searcher.getIndexReader(), prefix == null ? null : new BytesRef(prefix));
+ uninvert(new SlowMultiReaderWrapper(searcher.getIndexReader()), prefix == null ? null : new BytesRef(prefix));
} catch (IllegalStateException ise) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, ise.getMessage());
}
@@ -227,7 +227,7 @@ public class UnInvertedField extends Doc
int startTerm = 0;
int endTerm = numTermsInField; // one past the end
- TermsEnum te = getOrdTermsEnum(searcher.getIndexReader());
+ TermsEnum te = getOrdTermsEnum(new SlowMultiReaderWrapper(searcher.getIndexReader()));
if (prefix != null && prefix.length() > 0) {
final BytesRef prefixBr = new BytesRef(prefix);
if (te.seekCeil(prefixBr, true) == TermsEnum.SeekStatus.END) {
@@ -497,7 +497,7 @@ public class UnInvertedField extends Doc
final int[] index = this.index;
final int[] counts = new int[numTermsInField];//keep track of the number of times we see each word in the field for all the documents in the docset
- TermsEnum te = getOrdTermsEnum(searcher.getIndexReader());
+ TermsEnum te = getOrdTermsEnum(new SlowMultiReaderWrapper(searcher.getIndexReader()));
boolean doNegative = false;
if (finfo.length == 0) {
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java Sun Jan 29 12:18:50 2012
@@ -23,8 +23,10 @@ import java.util.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.util.Base64;
import org.apache.solr.common.util.FastWriter;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
@@ -130,20 +132,29 @@ public abstract class TextResponseWriter
else {
writeStr(name, f.stringValue(), true);
}
- } else if (val instanceof Integer) {
- writeInt(name, val.toString());
+ } else if (val instanceof Number) {
+ if (val instanceof Integer) {
+ writeInt(name, val.toString());
+ } else if (val instanceof Long) {
+ writeLong(name, val.toString());
+ } else if (val instanceof Float) {
+ // we pass the float instead of using toString() because
+ // it may need special formatting. same for double.
+ writeFloat(name, ((Float)val).floatValue());
+ } else if (val instanceof Double) {
+ writeDouble(name, ((Double)val).doubleValue());
+ } else if (val instanceof Short) {
+ writeInt(name, val.toString());
+ } else if (val instanceof Byte) {
+ writeInt(name, val.toString());
+ } else {
+ // default... for debugging only
+ writeStr(name, val.getClass().getName() + ':' + val.toString(), true);
+ }
} else if (val instanceof Boolean) {
writeBool(name, val.toString());
- } else if (val instanceof Long) {
- writeLong(name, val.toString());
} else if (val instanceof Date) {
writeDate(name,(Date)val);
- } else if (val instanceof Float) {
- // we pass the float instead of using toString() because
- // it may need special formatting. same for double.
- writeFloat(name, ((Float)val).floatValue());
- } else if (val instanceof Double) {
- writeDouble(name, ((Double)val).doubleValue());
} else if (val instanceof Document) {
SolrDocument doc = toSolrDocument( (Document)val );
DocTransformer transformer = returnFields.getTransformer();
@@ -181,6 +192,12 @@ public abstract class TextResponseWriter
writeArray(name,(Object[])val);
} else if (val instanceof Iterator) {
writeArray(name,(Iterator)val);
+ } else if (val instanceof byte[]) {
+ byte[] arr = (byte[])val;
+ writeByteArr(name, arr, 0, arr.length);
+ } else if (val instanceof BytesRef) {
+ BytesRef arr = (BytesRef)val;
+ writeByteArr(name, arr.bytes, arr.offset, arr.length);
} else {
// default... for debugging only
writeStr(name, val.getClass().getName() + ':' + val.toString(), true);
@@ -334,4 +351,7 @@ public abstract class TextResponseWriter
/** if this form of the method is called, val is the Solr ISO8601 based date format */
public abstract void writeDate(String name, String val) throws IOException;
+ public void writeByteArr(String name, byte[] buf, int offset, int len) throws IOException {
+ writeStr(name, Base64.byteArrayToBase64(buf, offset, len), false);
+ }
}
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Sun Jan 29 12:18:50 2012
@@ -21,11 +21,15 @@ import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
import org.apache.solr.cloud.RecoveryStrategy;
+import org.apache.solr.common.SolrException;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class DefaultSolrCoreState extends SolrCoreState {
-
+ public static Logger log = LoggerFactory.getLogger(DefaultSolrCoreState.class);
+
private final Object recoveryLock = new Object();
private int refCnt = 1;
private SolrIndexWriter indexWriter = null;
@@ -62,10 +66,14 @@ public final class DefaultSolrCoreState
synchronized (this) {
refCnt--;
if (refCnt == 0) {
- if (closer != null) {
- closer.closeWriter(indexWriter);
- } else if (indexWriter != null) {
- indexWriter.close();
+ try {
+ if (closer != null) {
+ closer.closeWriter(indexWriter);
+ } else if (indexWriter != null) {
+ indexWriter.close();
+ }
+ } catch (Throwable t) {
+ SolrException.log(log, t);
}
directoryFactory.close();
closed = true;
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/TransactionLog.java Sun Jan 29 12:18:50 2012
@@ -55,6 +55,8 @@ import java.util.concurrent.atomic.Atomi
*/
public class TransactionLog {
public static Logger log = LoggerFactory.getLogger(TransactionLog.class);
+ final boolean debug = log.isDebugEnabled();
+ final boolean trace = log.isTraceEnabled();
public final static String END_MESSAGE="SOLR_TLOG_END";
@@ -71,7 +73,6 @@ public class TransactionLog {
AtomicInteger refcount = new AtomicInteger(1);
Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
List<String> globalStringList = new ArrayList<String>();
- final boolean debug = log.isDebugEnabled();
long snapshot_size;
int snapshot_numRecords;
@@ -156,6 +157,9 @@ public class TransactionLog {
addGlobalStrings(globalStrings);
}
} else {
+ if (start > 0) {
+ log.error("New transaction log already exists:" + tlogFile + " size=" + raf.length());
+ }
assert start==0;
if (start > 0) {
raf.setLength(0);
@@ -543,8 +547,8 @@ public class TransactionLog {
synchronized (TransactionLog.this) {
- if (debug) {
- log.debug("Reading log record. pos="+pos+" currentSize="+fos.size());
+ if (trace) {
+ log.trace("Reading log record. pos="+pos+" currentSize="+fos.size());
}
if (pos >= fos.size()) {
Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/UpdateLog.java Sun Jan 29 12:18:50 2012
@@ -48,6 +48,7 @@ import java.util.concurrent.*;
public class UpdateLog implements PluginInfoInitialized {
public static Logger log = LoggerFactory.getLogger(UpdateLog.class);
public boolean debug = log.isDebugEnabled();
+ public boolean trace = log.isTraceEnabled();
public enum SyncLevel { NONE, FLUSH, FSYNC }
@@ -141,6 +142,9 @@ public class UpdateLog implements Plugin
this.uhandler = uhandler;
if (dataDir.equals(lastDataDir)) {
+ if (debug) {
+ log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", next id=" + id, " this is a reopen... nothing else to do.");
+ }
// on a normal reopen, we currently shouldn't have to do anything
return;
}
@@ -150,6 +154,10 @@ public class UpdateLog implements Plugin
tlogFiles = getLogList(tlogDir);
id = getLastLogId() + 1; // add 1 since we will create a new log for the next update
+ if (debug) {
+ log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
+ }
+
TransactionLog oldLog = null;
for (String oldLogName : tlogFiles) {
File f = new File(tlogDir, oldLogName);
@@ -247,8 +255,8 @@ public class UpdateLog implements Plugin
map.put(cmd.getIndexedId(), ptr);
}
- if (debug) {
- log.debug("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ if (trace) {
+ log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
}
}
}
@@ -274,8 +282,8 @@ public class UpdateLog implements Plugin
oldDeletes.put(br, ptr);
}
- if (debug) {
- log.debug("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ if (trace) {
+ log.trace("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
}
}
}
@@ -312,8 +320,8 @@ public class UpdateLog implements Plugin
LogPtr ptr = new LogPtr(pos, cmd.getVersion());
- if (debug) {
- log.debug("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ if (trace) {
+ log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
}
}
}
@@ -385,6 +393,7 @@ public class UpdateLog implements Plugin
public void preSoftCommit(CommitUpdateCommand cmd) {
debug = log.isDebugEnabled(); // refresh our view of debugging occasionally
+ trace = log.isTraceEnabled();
synchronized (this) {
@@ -562,7 +571,7 @@ public class UpdateLog implements Plugin
private void ensureLog() {
if (tlog == null) {
- String newLogName = String.format("%s.%019d", TLOG_NAME, id);
+ String newLogName = String.format(Locale.ENGLISH, "%s.%019d", TLOG_NAME, id);
try {
tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
} catch (IOException e) {
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Sun Jan 29 12:18:50 2012
@@ -102,7 +102,7 @@ public abstract class AbstractZkTestCase
private static void putConfig(SolrZkClient zkClient, final String name)
throws Exception {
zkClient.makePath("/configs/conf1/" + name, getFile("solr"
- + File.separator + "conf" + File.separator + name), false, false);
+ + File.separator + "conf" + File.separator + name), false, true);
}
@Override
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java Sun Jan 29 12:18:50 2012
@@ -242,11 +242,7 @@ public class ChaosMonkey {
public JettySolrRunner getRandomJetty(String slice, boolean aggressivelyKillLeaders) throws KeeperException, InterruptedException {
- // get latest cloud state
- zkStateReader.updateCloudState(true);
-
- Slice theShards = zkStateReader.getCloudState().getSlices(collection)
- .get(slice);
+
int numRunning = 0;
int numRecovering = 0;
int numActive = 0;
@@ -254,6 +250,12 @@ public class ChaosMonkey {
for (CloudJettyRunner cloudJetty : shardToJetty.get(slice)) {
boolean running = true;
+ // get latest cloud state
+ zkStateReader.updateCloudState(true);
+
+ Slice theShards = zkStateReader.getCloudState().getSlices(collection)
+ .get(slice);
+
ZkNodeProps props = theShards.getShards().get(cloudJetty.coreNodeName);
if (props == null) {
throw new RuntimeException("shard name " + cloudJetty.coreNodeName + " not found in " + theShards.getShards().keySet());
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java Sun Jan 29 12:18:50 2012
@@ -77,78 +77,88 @@ public class ChaosMonkeyNothingIsSafeTes
@Override
public void doTest() throws Exception {
-
- handle.clear();
- handle.put("QTime", SKIPVAL);
- handle.put("timestamp", SKIPVAL);
-
- // we cannot do delete by query
- // as it's not supported for recovery
- //del("*:*");
-
- List<StopableIndexingThread> threads = new ArrayList<StopableIndexingThread>();
- int threadCount = 1;
- int i = 0;
- for (i = 0; i < threadCount; i++) {
- StopableIndexingThread indexThread = new StopableIndexingThread(i * 50000, true);
- threads.add(indexThread);
- indexThread.start();
- }
-
- FullThrottleStopableIndexingThread ftIndexThread = new FullThrottleStopableIndexingThread(
- clients, i * 50000, true);
- threads.add(ftIndexThread);
- ftIndexThread.start();
-
- chaosMonkey.startTheMonkey(true, 1500);
+ boolean testsSuccesful = false;
try {
- Thread.sleep(atLeast(6000));
+ handle.clear();
+ handle.put("QTime", SKIPVAL);
+ handle.put("timestamp", SKIPVAL);
+
+ // we cannot do delete by query
+ // as it's not supported for recovery
+ // del("*:*");
+
+ List<StopableIndexingThread> threads = new ArrayList<StopableIndexingThread>();
+ int threadCount = 1;
+ int i = 0;
+ for (i = 0; i < threadCount; i++) {
+ StopableIndexingThread indexThread = new StopableIndexingThread(
+ i * 50000, true);
+ threads.add(indexThread);
+ indexThread.start();
+ }
+
+ FullThrottleStopableIndexingThread ftIndexThread = new FullThrottleStopableIndexingThread(
+ clients, i * 50000, true);
+ threads.add(ftIndexThread);
+ ftIndexThread.start();
+
+ chaosMonkey.startTheMonkey(true, 1500);
+ try {
+ Thread.sleep(atLeast(6000));
+ } finally {
+ chaosMonkey.stopTheMonkey();
+ }
+
+ for (StopableIndexingThread indexThread : threads) {
+ indexThread.safeStop();
+ }
+
+ // wait for stop...
+ for (StopableIndexingThread indexThread : threads) {
+ indexThread.join();
+ }
+
+ // fails will happen...
+ // for (StopableIndexingThread indexThread : threads) {
+ // assertEquals(0, indexThread.getFails());
+ // }
+
+ // try and wait for any replications and what not to finish...
+
+ Thread.sleep(2000);
+
+ // wait until there are no recoveries...
+ waitForThingsToLevelOut();
+
+ // make sure we again have leaders for each shard
+ for (int j = 1; j < sliceCount; j++) {
+ zkStateReader.getLeaderProps(DEFAULT_COLLECTION, "shard" + j, 10000);
+ }
+
+ commit();
+
+ // TODO: assert we didnt kill everyone
+
+ zkStateReader.updateCloudState(true);
+ assertTrue(zkStateReader.getCloudState().getLiveNodes().size() > 0);
+
+ checkShardConsistency(false, true);
+
+ // ensure we have added more than 0 docs
+ long cloudClientDocs = cloudClient.query(new SolrQuery("*:*"))
+ .getResults().getNumFound();
+
+ assertTrue(cloudClientDocs > 0);
+
+ if (VERBOSE) System.out.println("control docs:"
+ + controlClient.query(new SolrQuery("*:*")).getResults()
+ .getNumFound() + "\n\n");
+ testsSuccesful = true;
} finally {
- chaosMonkey.stopTheMonkey();
- }
-
- for (StopableIndexingThread indexThread : threads) {
- indexThread.safeStop();
- }
-
- // wait for stop...
- for (StopableIndexingThread indexThread : threads) {
- indexThread.join();
- }
-
-
- // fails will happen...
-// for (StopableIndexingThread indexThread : threads) {
-// assertEquals(0, indexThread.getFails());
-// }
-
- // try and wait for any replications and what not to finish...
-
- Thread.sleep(2000);
-
- // wait until there are no recoveries...
- waitForThingsToLevelOut();
-
- // make sure we again have leaders for each shard
- for (int j = 1; j < sliceCount; j++) {
- zkStateReader.getLeaderProps(DEFAULT_COLLECTION, "shard" + j, 10000);
+ if (!testsSuccesful) {
+ printLayout();
+ }
}
-
- commit();
-
- // TODO: assert we didnt kill everyone
-
- zkStateReader.updateCloudState(true);
- assertTrue(zkStateReader.getCloudState().getLiveNodes().size() > 0);
-
- checkShardConsistency(false, false);
-
- // ensure we have added more than 0 docs
- long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
-
- assertTrue(cloudClientDocs > 0);
-
- if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");
}
private void waitForThingsToLevelOut() throws KeeperException,
@@ -158,7 +168,11 @@ public class ChaosMonkeyNothingIsSafeTes
do {
waitForRecoveriesToFinish(VERBOSE);
- commit();
+ try {
+ commit();
+ } catch (Exception e) {
+ // we don't care if this commit fails on some nodes
+ }
updateMappingsFromZk(jettys, clients);
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java Sun Jan 29 12:18:50 2012
@@ -30,7 +30,9 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
+@Ignore
public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
@BeforeClass
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java Sun Jan 29 12:18:50 2012
@@ -45,7 +45,6 @@ import org.junit.Ignore;
/**
* Super basic testing, no shard restarting or anything.
*/
-@Ignore
public class FullSolrCloudDistribCmdsTest extends FullSolrCloudTest {
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Sun Jan 29 12:18:50 2012
@@ -51,6 +51,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
/**
*
@@ -64,6 +65,8 @@ public class FullSolrCloudTest extends A
protected static final String DEFAULT_COLLECTION = "collection1";
+ private boolean printLayoutOnTearDown = false;
+
String t1 = "a_t";
String i1 = "a_si";
String nint = "n_i";
@@ -223,13 +226,25 @@ public class FullSolrCloudTest extends A
System.clearProperty("collection");
controlClient = createNewSolrServer(controlJetty.getLocalPort());
- createJettys(numServers);
+ createJettys(numServers, true);
}
- private List<JettySolrRunner> createJettys(int numJettys) throws Exception,
- InterruptedException, TimeoutException, IOException, KeeperException,
- URISyntaxException {
+ private List<JettySolrRunner> createJettys(int numJettys) throws Exception {
+ return createJettys(numJettys, false);
+ }
+
+
+ /**
+ * @param numJettys
+ * @param checkCreatedVsState
+ * if true, make sure the number created (numJettys) matches the
+ * number in the cluster state - if you add more jetties this may not
+ * be the case
+ * @return
+ * @throws Exception
+ */
+ private List<JettySolrRunner> createJettys(int numJettys, boolean checkCreatedVsState) throws Exception {
List<JettySolrRunner> jettys = new ArrayList<JettySolrRunner>();
List<SolrServer> clients = new ArrayList<SolrServer>();
StringBuilder sb = new StringBuilder();
@@ -247,6 +262,28 @@ public class FullSolrCloudTest extends A
this.jettys.addAll(jettys);
this.clients.addAll(clients);
+ if (checkCreatedVsState) {
+ // now wait until we see that the number of shards in the cluster state
+ // matches what we expect
+ int numShards = getNumShards(DEFAULT_COLLECTION);
+ int retries = 0;
+ while (numShards != shardCount) {
+ numShards = getNumShards(DEFAULT_COLLECTION);
+ if (numShards == shardCount) break;
+ if (retries++ == 20) {
+ printLayoutOnTearDown = true;
+ fail("Shards in the state does not match what we set:" + numShards
+ + " vs " + shardCount);
+ }
+ Thread.sleep(500);
+ }
+
+ // also make sure we have a leader for each shard
+ for (int i = 1; i <= sliceCount; i++) {
+ zkStateReader.getLeaderProps(DEFAULT_COLLECTION, "shard" + i, 10000);
+ }
+ }
+
updateMappingsFromZk(this.jettys, this.clients);
// build the shard string
@@ -261,6 +298,16 @@ public class FullSolrCloudTest extends A
return jettys;
}
+
+ private int getNumShards(String defaultCollection) {
+ Map<String,Slice> slices = this.zkStateReader.getCloudState().getSlices(defaultCollection);
+ int cnt = 0;
+ for (Map.Entry<String,Slice> entry : slices.entrySet()) {
+ cnt += entry.getValue().getShards().size();
+ }
+
+ return cnt;
+ }
public JettySolrRunner createJetty(String dataDir, String shardList,
String solrConfigOverride) throws Exception {
@@ -467,71 +514,81 @@ public class FullSolrCloudTest extends A
*/
@Override
public void doTest() throws Exception {
- handle.clear();
- handle.put("QTime", SKIPVAL);
- handle.put("timestamp", SKIPVAL);
-
- indexr(id, 1, i1, 100, tlong, 100, t1, "now is the time for all good men",
- "foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);
-
- // make sure we are in a steady state...
- waitForRecoveriesToFinish(false);
-
- commit();
-
- assertDocCounts(false);
-
- indexAbunchOfDocs();
-
- commit();
-
- assertDocCounts(VERBOSE);
- checkQueries();
-
- assertDocCounts(VERBOSE);
-
- query("q", "*:*", "sort", "n_tl1 desc");
-
- brindDownShardIndexSomeDocsAndRecover();
-
- query("q", "*:*", "sort", "n_tl1 desc");
-
- // test adding another replica to a shard - it should do a
- // recovery/replication to pick up the index from the leader
- addNewReplica();
-
- long docId = testUpdateAndDelete();
-
- // index a bad doc...
+ boolean testFinished = false;
try {
- indexr(t1, "a doc with no id");
- fail("this should fail");
- } catch (SolrException e) {
- // expected
+ handle.clear();
+ handle.put("QTime", SKIPVAL);
+ handle.put("timestamp", SKIPVAL);
+
+ indexr(id, 1, i1, 100, tlong, 100, t1,
+ "now is the time for all good men", "foo_f", 1.414f, "foo_b", "true",
+ "foo_d", 1.414d);
+
+ // make sure we are in a steady state...
+ waitForRecoveriesToFinish(false);
+
+ commit();
+
+ assertDocCounts(false);
+
+ indexAbunchOfDocs();
+
+ commit();
+
+ assertDocCounts(VERBOSE);
+ checkQueries();
+
+ assertDocCounts(VERBOSE);
+
+ query("q", "*:*", "sort", "n_tl1 desc");
+
+ brindDownShardIndexSomeDocsAndRecover();
+
+ query("q", "*:*", "sort", "n_tl1 desc");
+
+ // test adding another replica to a shard - it should do a
+ // recovery/replication to pick up the index from the leader
+ addNewReplica();
+
+ long docId = testUpdateAndDelete();
+
+ // index a bad doc...
+ try {
+ indexr(t1, "a doc with no id");
+ fail("this should fail");
+ } catch (SolrException e) {
+ // expected
+ }
+
+ // TODO: bring this to it's own method?
+ // try indexing to a leader that has no replicas up
+ ZkNodeProps leaderProps = zkStateReader.getLeaderProps(
+ DEFAULT_COLLECTION, SHARD2);
+
+ String nodeName = leaderProps.get(ZkStateReader.NODE_NAME_PROP);
+ chaosMonkey.stopShardExcept(SHARD2, nodeName);
+
+ SolrServer client = getClient(nodeName);
+
+ index_specific(client, "id", docId + 1, t1, "what happens here?");
+
+ // expire a session...
+ CloudJettyRunner cloudJetty = shardToJetty.get("shard1").get(0);
+ chaosMonkey.expireSession(cloudJetty.jetty);
+
+ indexr("id", docId + 1, t1, "slip this doc in");
+
+ waitForRecoveriesToFinish(false);
+
+ checkShardConsistency("shard1");
+
+ testFinished = true;
+ } finally {
+ if (!testFinished) {
+ printLayoutOnTearDown = true;
+ }
}
- // TODO: bring this to it's own method?
- // try indexing to a leader that has no replicas up
- ZkNodeProps leaderProps = zkStateReader.getLeaderProps(DEFAULT_COLLECTION,
- SHARD2);
-
- String nodeName = leaderProps.get(ZkStateReader.NODE_NAME_PROP);
- chaosMonkey.stopShardExcept(SHARD2, nodeName);
-
- SolrServer client = getClient(nodeName);
-
- index_specific(client, "id", docId + 1, t1, "what happens here?");
-
- // expire a session...
- CloudJettyRunner cloudJetty = shardToJetty.get("shard1").get(0);
- chaosMonkey.expireSession(cloudJetty.jetty);
-
- indexr("id", docId + 1, t1, "slip this doc in");
-
- waitForRecoveriesToFinish(false);
-
- checkShardConsistency("shard1");
-
}
private long testUpdateAndDelete() throws Exception, SolrServerException,
@@ -1182,7 +1239,7 @@ public class FullSolrCloudTest extends A
@Override
@After
public void tearDown() throws Exception {
- if (VERBOSE) {
+ if (VERBOSE || printLayoutOnTearDown) {
super.printLayout();
}
((CommonsHttpSolrServer) controlClient).shutdown();
@@ -1222,7 +1279,7 @@ public class FullSolrCloudTest extends A
+ DEFAULT_COLLECTION;
CommonsHttpSolrServer s = new CommonsHttpSolrServer(url);
s.setConnectionTimeout(100); // 1/10th sec
- s.setSoTimeout(30000);
+ s.setSoTimeout(45000);
s.setDefaultMaxConnectionsPerHost(100);
s.setMaxTotalConnections(100);
return s;
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java Sun Jan 29 12:18:50 2012
@@ -34,7 +34,6 @@ import org.apache.solr.common.cloud.ZkNo
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreContainer.Initializer;
-import org.apache.solr.core.SolrConfig;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -101,7 +100,7 @@ public class LeaderElectionIntegrationTe
AbstractZkTestCase.TIMEOUT);
reader = new ZkStateReader(zkClient);
-
+ reader.createClusterStateWatchersAndUpdate();
log.info("####SETUP_END " + getName());
}
@@ -150,12 +149,12 @@ public class LeaderElectionIntegrationTe
//printLayout(zkServer.getZkAddress());
// poll until leader change is visible
- for (int j = 0; j < 30; j++) {
+ for (int j = 0; j < 90; j++) {
String currentLeader = getLeader();
if(!leader.equals(currentLeader)) {
break;
}
- Thread.sleep(100);
+ Thread.sleep(500);
}
leader = getLeader();
@@ -216,23 +215,11 @@ public class LeaderElectionIntegrationTe
//Thread.sleep(100000);
}
- private String getLeader() throws InterruptedException {
- String leader = null;
- int tries = 30;
- while (tries-- > 0) {
- ZkNodeProps props;
- try {
- reader.updateCloudState(true);
- props = reader.getLeaderProps("collection1", "shard1", 500);
- leader = props.get(ZkStateReader.NODE_NAME_PROP);
- if (leader != null) {
- break;
- }
- } catch (KeeperException e) {
- // ignore
- }
- Thread.sleep(200);
- }
+ private String getLeader() throws InterruptedException, KeeperException {
+
+ ZkNodeProps props = reader.getLeaderProps("collection1", "shard1", 30000);
+ String leader = props.get(ZkStateReader.NODE_NAME_PROP);
+
return leader;
}
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Sun Jan 29 12:18:50 2012
@@ -32,15 +32,12 @@ import org.apache.solr.common.cloud.Solr
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.core.SolrConfig;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
-@Ignore
public class LeaderElectionTest extends SolrTestCaseJ4 {
static final int TIMEOUT = 30000;
@@ -58,8 +55,7 @@ public class LeaderElectionTest extends
@AfterClass
public static void afterClass() throws InterruptedException {
- // wait just a bit for any zk client threads to outlast timeout
- Thread.sleep(2000);
+
}
@Override
@@ -90,10 +86,18 @@ public class LeaderElectionTest extends
public ClientThread(int nodeNumber) throws Exception {
super("Thread-" + nodeNumber);
+ boolean created = false;
this.zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
- this.zkStateReader = new ZkStateReader(zkClient);
- this.nodeNumber = nodeNumber;
- props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, Integer.toString(nodeNumber), ZkStateReader.CORE_NAME_PROP, "");
+ try {
+ this.zkStateReader = new ZkStateReader(zkClient);
+ this.nodeNumber = nodeNumber;
+ props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, Integer.toString(nodeNumber), ZkStateReader.CORE_NAME_PROP, "");
+ created = true;
+ } finally {
+ if (!created) {
+ zkClient.close();
+ }
+ }
}
@Override
@@ -156,7 +160,7 @@ public class LeaderElectionTest extends
private String getLeaderUrl(final String collection, final String slice)
throws KeeperException, InterruptedException {
- int iterCount = 30;
+ int iterCount = 60;
while (iterCount-- > 0)
try {
byte[] data = zkClient.getData(
@@ -166,7 +170,7 @@ public class LeaderElectionTest extends
ZkNodeProps.load(data));
return leaderProps.getCoreUrl();
} catch (NoNodeException e) {
- Thread.sleep(100);
+ Thread.sleep(500);
}
throw new RuntimeException("Could not get leader props");
}
@@ -284,13 +288,13 @@ public class LeaderElectionTest extends
threads.add(thread1);
scheduler.schedule(thread1, 0, TimeUnit.MILLISECONDS);
- Thread.sleep(4000);
+ Thread.sleep(2000);
Thread scheduleThread = new Thread() {
@Override
public void run() {
-
- for (int i = 1; i < atLeast(15); i++) {
+ int count = atLeast(5);
+ for (int i = 1; i < count; i++) {
int launchIn = random.nextInt(500);
ClientThread thread = null;
try {
@@ -365,7 +369,7 @@ public class LeaderElectionTest extends
connLossThread.start();
killThread.start();
- Thread.sleep(6000);
+ Thread.sleep(4000);
stopStress = true;
@@ -374,14 +378,14 @@ public class LeaderElectionTest extends
killThread.interrupt();
scheduleThread.join();
+ scheduler.shutdownNow();
+
connLossThread.join();
killThread.join();
- scheduler.shutdownNow();
-
int seq = threads.get(getLeaderThread()).getSeq();
- assertFalse("seq is -1 and we may have a zombie leader", seq == -1);
+ // we have a leader we know, TODO: lets check some other things
// cleanup any threads still running
for (ClientThread thread : threads) {
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Sun Jan 29 12:18:50 2012
@@ -146,7 +146,7 @@ public class OverseerTest extends SolrTe
SolrZkClient zkClient = null;
ZkStateReader reader = null;
final ZkController[] controllers = new ZkController[nodeCount];
-
+ final ExecutorService[] nodeExecutors = new ExecutorService[nodeCount];
try {
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
@@ -174,7 +174,6 @@ public class OverseerTest extends SolrTe
.getAbsolutePath());
- final ExecutorService[] nodeExecutors = new ExecutorService[nodeCount];
for (int i = 0; i < nodeCount; i++) {
nodeExecutors[i] = Executors.newFixedThreadPool(1);
}
@@ -232,7 +231,7 @@ public class OverseerTest extends SolrTe
}
// make sure all cores have been returned a id
- for (int i = 0; i < 150; i++) {
+ for (int i = 0; i < 90; i++) {
int assignedCount = 0;
for (int j = 0; j < coreCount; j++) {
if (ids[j] != null) {
@@ -242,7 +241,7 @@ public class OverseerTest extends SolrTe
if (coreCount == assignedCount) {
break;
}
- Thread.sleep(200);
+ Thread.sleep(500);
}
final HashMap<String, AtomicInteger> counters = new HashMap<String,AtomicInteger>();
@@ -289,6 +288,9 @@ public class OverseerTest extends SolrTe
controllers[i].close();
}
server.shutdown();
+ for (int i = 0; i < nodeCount; i++) {
+ nodeExecutors[i].shutdownNow();
+ }
}
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Sun Jan 29 12:18:50 2012
@@ -102,6 +102,7 @@ public class ZkControllerTest extends So
ZkTestServer server = new ZkTestServer(zkDir);
ZkController zkController = null;
+ boolean testFinished = false;
try {
server.run();
@@ -127,8 +128,12 @@ public class ZkControllerTest extends So
if (DEBUG) {
zkController.printLayoutToStdOut();
}
-
+ testFinished = true;
} finally {
+ if (testFinished) {
+ zkController.getZkClient().printLayoutToStdOut();
+ }
+
if (zkController != null) {
zkController.close();
}
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/JSONWriterTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/JSONWriterTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/JSONWriterTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/JSONWriterTest.java Sun Jan 29 12:18:50 2012
@@ -41,7 +41,7 @@ public class JSONWriterTest extends Solr
}
@Test
- public void testNaNInf() throws IOException {
+ public void testTypes() throws IOException {
SolrQueryRequest req = req("dummy");
SolrQueryResponse rsp = new SolrQueryResponse();
QueryResponseWriter w = new PythonResponseWriter();
@@ -77,8 +77,12 @@ public class JSONWriterTest extends Solr
nl.add(null, 42);
rsp.add("nl", nl);
+ rsp.add("byte", Byte.valueOf((byte)-3));
+ rsp.add("short", Short.valueOf((short)-4));
+ rsp.add("bytes", "abc".getBytes("UTF-8"));
+
w.write(buf, req, rsp);
- assertEquals("{\"nl\":[[\"data1\",\"he\\u2028llo\\u2029!\"],[null,42]]}", buf.toString());
+ assertEquals("{\"nl\":[[\"data1\",\"he\\u2028llo\\u2029!\"],[null,42]],\"byte\":-3,\"short\":-4,\"bytes\":\"YWJj\"}", buf.toString());
req.close();
}
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/TestFaceting.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/TestFaceting.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/TestFaceting.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/TestFaceting.java Sun Jan 29 12:18:50 2012
@@ -21,6 +21,7 @@ import java.util.Locale;
import java.util.Random;
import org.apache.lucene.index.DocTermOrds;
+import org.apache.lucene.index.SlowMultiReaderWrapper;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
@@ -80,7 +81,7 @@ public class TestFaceting extends SolrTe
assertEquals(size, uif.getNumTerms());
- TermsEnum te = uif.getOrdTermsEnum(req.getSearcher().getIndexReader());
+ TermsEnum te = uif.getOrdTermsEnum(new SlowMultiReaderWrapper(req.getSearcher().getIndexReader()));
assertEquals(size == 0, te == null);
Random r = new Random(size);
Modified: lucene/dev/branches/lucene2858/solr/solrj/build.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/build.xml?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/build.xml (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/build.xml Sun Jan 29 12:18:50 2012
@@ -31,6 +31,7 @@
<mkdir dir="${dist}/solrj-lib" />
<copy todir="${dist}/solrj-lib">
<fileset dir="${common-solr.dir}/lib">
+ <include name="apache-solr-noggit-*.jar"/>
<include name="commons-codec-*.jar"/>
<include name="commons-io-*.jar"/>
<include name="commons-httpclient-*.jar"/>
Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java Sun Jan 29 12:18:50 2012
@@ -475,10 +475,10 @@ public class CommonsHttpSolrServer exten
return processor.processResponse(respBody, charset);
}
catch (HttpException e) {
- throw new SolrServerException( e );
+ throw new SolrServerException(getBaseURL(), e);
}
catch (IOException e) {
- throw new SolrServerException( e );
+ throw new SolrServerException(getBaseURL(), e);
}
finally {
method.releaseConnection();
Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java Sun Jan 29 12:18:50 2012
@@ -40,7 +40,7 @@ public class HashPartitioner {
* works up to 65537 before requested num of ranges is one short
*
* @param partitions
- * @return
+ * @return Range for each partition
*/
public List<Range> partitionRange(int partitions) {
// some hokey code to partition the int space
Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Sun Jan 29 12:18:50 2012
@@ -34,7 +34,6 @@ import javax.xml.transform.stream.Stream
import javax.xml.transform.stream.StreamSource;
import org.apache.commons.io.FileUtils;
-import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate;
import org.apache.zookeeper.CreateMode;
@@ -119,7 +118,6 @@ public class SolrZkClient {
public SolrZkClient(String zkServerAddress, int zkClientTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) throws InterruptedException,
TimeoutException, IOException {
- numOpens.incrementAndGet();
connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+ zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
strat.connect(zkServerAddress, zkClientTimeout, connManager,
@@ -142,6 +140,7 @@ public class SolrZkClient {
}
});
connManager.waitForConnected(clientConnectTimeout);
+ numOpens.incrementAndGet();
}
/**
@@ -644,9 +643,6 @@ public class SolrZkClient {
* @throws InterruptedException
*/
public void close() throws InterruptedException {
- if (isClosed) {
- throw new AlreadyClosedException("This client has already been closed");
- }
isClosed = true;
keeper.close();
numCloses.incrementAndGet();
Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java Sun Jan 29 12:18:50 2012
@@ -54,7 +54,6 @@ public class ZkCmdExecutor {
/**
* Perform the given operation, retrying if the connection fails
*
- * @return
* @throws IOException
*/
@SuppressWarnings("unchecked")
Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkOperation.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkOperation.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkOperation.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkOperation.java Sun Jan 29 12:18:50 2012
@@ -22,8 +22,7 @@ import java.io.IOException;
import org.apache.zookeeper.KeeperException;
/**
- * A callback object which can be used for implementing retry-able operations in the
- * {@link org.apache.solr.common.cloud.ZkCmdExecutor.lock.ProtocolSupport} class
+ * A callback object which can be used for implementing retry-able operations.
*
*/
public abstract class ZkOperation {