You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2013/01/25 16:28:47 UTC
svn commit: r1438555 - in /lucene/dev/branches/branch_4x: ./ dev-tools/
lucene/ lucene/analysis/
lucene/analysis/icu/src/java/org/apache/lucene/collation/ lucene/backwards/
lucene/benchmark/ lucene/codecs/ lucene/core/
lucene/core/src/test/org/apache/l...
Author: markrmiller
Date: Fri Jan 25 15:28:45 2013
New Revision: 1438555
URL: http://svn.apache.org/viewvc?rev=1438555&view=rev
Log:
SOLR-4043: Add ability to get success/failure responses from Collections API.
Added:
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerSolrResponse.java
- copied unchanged from r1438550, lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerSolrResponse.java
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/dev-tools/ (props changed)
lucene/dev/branches/branch_4x/lucene/ (props changed)
lucene/dev/branches/branch_4x/lucene/BUILD.txt (props changed)
lucene/dev/branches/branch_4x/lucene/CHANGES.txt (props changed)
lucene/dev/branches/branch_4x/lucene/JRE_VERSION_MIGRATION.txt (props changed)
lucene/dev/branches/branch_4x/lucene/LICENSE.txt (props changed)
lucene/dev/branches/branch_4x/lucene/MIGRATE.txt (props changed)
lucene/dev/branches/branch_4x/lucene/NOTICE.txt (props changed)
lucene/dev/branches/branch_4x/lucene/README.txt (props changed)
lucene/dev/branches/branch_4x/lucene/SYSTEM_REQUIREMENTS.txt (props changed)
lucene/dev/branches/branch_4x/lucene/analysis/ (props changed)
lucene/dev/branches/branch_4x/lucene/analysis/icu/src/java/org/apache/lucene/collation/ICUCollationKeyFilterFactory.java (props changed)
lucene/dev/branches/branch_4x/lucene/backwards/ (props changed)
lucene/dev/branches/branch_4x/lucene/benchmark/ (props changed)
lucene/dev/branches/branch_4x/lucene/build.xml (props changed)
lucene/dev/branches/branch_4x/lucene/codecs/ (props changed)
lucene/dev/branches/branch_4x/lucene/common-build.xml (props changed)
lucene/dev/branches/branch_4x/lucene/core/ (props changed)
lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestBackwardsCompatibility.java (props changed)
lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/index.40.cfs.zip (props changed)
lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/index.40.nocfs.zip (props changed)
lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/index.40.optimized.cfs.zip (props changed)
lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/index.40.optimized.nocfs.zip (props changed)
lucene/dev/branches/branch_4x/lucene/demo/ (props changed)
lucene/dev/branches/branch_4x/lucene/facet/ (props changed)
lucene/dev/branches/branch_4x/lucene/grouping/ (props changed)
lucene/dev/branches/branch_4x/lucene/highlighter/ (props changed)
lucene/dev/branches/branch_4x/lucene/ivy-settings.xml (props changed)
lucene/dev/branches/branch_4x/lucene/join/ (props changed)
lucene/dev/branches/branch_4x/lucene/licenses/ (props changed)
lucene/dev/branches/branch_4x/lucene/memory/ (props changed)
lucene/dev/branches/branch_4x/lucene/misc/ (props changed)
lucene/dev/branches/branch_4x/lucene/module-build.xml (props changed)
lucene/dev/branches/branch_4x/lucene/queries/ (props changed)
lucene/dev/branches/branch_4x/lucene/queryparser/ (props changed)
lucene/dev/branches/branch_4x/lucene/sandbox/ (props changed)
lucene/dev/branches/branch_4x/lucene/site/ (props changed)
lucene/dev/branches/branch_4x/lucene/spatial/ (props changed)
lucene/dev/branches/branch_4x/lucene/suggest/ (props changed)
lucene/dev/branches/branch_4x/lucene/test-framework/ (props changed)
lucene/dev/branches/branch_4x/lucene/tools/ (props changed)
lucene/dev/branches/branch_4x/solr/ (props changed)
lucene/dev/branches/branch_4x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_4x/solr/LICENSE.txt (props changed)
lucene/dev/branches/branch_4x/solr/NOTICE.txt (props changed)
lucene/dev/branches/branch_4x/solr/README.txt (props changed)
lucene/dev/branches/branch_4x/solr/SYSTEM_REQUIREMENTS.txt (props changed)
lucene/dev/branches/branch_4x/solr/build.xml (props changed)
lucene/dev/branches/branch_4x/solr/cloud-dev/ (props changed)
lucene/dev/branches/branch_4x/solr/common-build.xml (props changed)
lucene/dev/branches/branch_4x/solr/contrib/ (props changed)
lucene/dev/branches/branch_4x/solr/core/ (props changed)
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
lucene/dev/branches/branch_4x/solr/example/ (props changed)
lucene/dev/branches/branch_4x/solr/licenses/ (props changed)
lucene/dev/branches/branch_4x/solr/licenses/httpclient-LICENSE-ASL.txt (props changed)
lucene/dev/branches/branch_4x/solr/licenses/httpclient-NOTICE.txt (props changed)
lucene/dev/branches/branch_4x/solr/licenses/httpcore-LICENSE-ASL.txt (props changed)
lucene/dev/branches/branch_4x/solr/licenses/httpcore-NOTICE.txt (props changed)
lucene/dev/branches/branch_4x/solr/licenses/httpmime-LICENSE-ASL.txt (props changed)
lucene/dev/branches/branch_4x/solr/licenses/httpmime-NOTICE.txt (props changed)
lucene/dev/branches/branch_4x/solr/scripts/ (props changed)
lucene/dev/branches/branch_4x/solr/site/ (props changed)
lucene/dev/branches/branch_4x/solr/solrj/ (props changed)
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/SolrResponse.java
lucene/dev/branches/branch_4x/solr/test-framework/ (props changed)
lucene/dev/branches/branch_4x/solr/testlogging.properties (props changed)
lucene/dev/branches/branch_4x/solr/webapp/ (props changed)
Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1438555&r1=1438554&r2=1438555&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Fri Jan 25 15:28:45 2013
@@ -41,6 +41,9 @@ Detailed Change List
New Features
----------------------
+* SOLR-4043: Add ability to get success/failure responses from Collections API.
+ (Raintung Li, Mark Miller)
+
Bug Fixes
----------------------
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1438555&r1=1438554&r2=1438555&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Fri Jan 25 15:28:45 2013
@@ -48,6 +48,8 @@ public class DistributedQueue {
private final String prefix = "qn-";
+ private final String response_prefix = "qnr-" ;
+
public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl) {
this.dir = dir;
@@ -100,7 +102,7 @@ public class DistributedQueue {
*
* @return the data at the head of the queue.
*/
- public byte[] element() throws NoSuchElementException, KeeperException,
+ private QueueEvent element() throws NoSuchElementException, KeeperException,
InterruptedException {
TreeMap<Long,String> orderedChildren;
@@ -122,7 +124,7 @@ public class DistributedQueue {
for (String headNode : orderedChildren.values()) {
if (headNode != null) {
try {
- return zookeeper.getData(dir + "/" + headNode, null, null, true);
+ return new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode, null, null, true), null);
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
@@ -162,17 +164,41 @@ public class DistributedQueue {
}
}
+ /**
+ * Removes the event's node from the queue; if a matching response node exists, the event's bytes are first stored there as the response.
+ *
+ */
+ public byte[] remove(QueueEvent event) throws KeeperException,
+ InterruptedException {
+ String path = event.getId();
+ String responsePath = dir + "/" + response_prefix
+ + path.substring(path.lastIndexOf("-") + 1);
+ if (zookeeper.exists(responsePath, true)) {
+ zookeeper.setData(responsePath, event.getBytes(), true);
+ }
+ byte[] data = zookeeper.getData(path, null, null, true);
+ zookeeper.delete(path, -1, true);
+ return data;
+ }
+
+
private class LatchChildWatcher implements Watcher {
Object lock = new Object();
+ private WatchedEvent event = null;
public LatchChildWatcher() {}
+ public LatchChildWatcher(Object lock) {
+ this.lock = lock;
+ }
+
@Override
public void process(WatchedEvent event) {
LOG.info("Watcher fired on path: " + event.getPath() + " state: "
+ event.getState() + " type " + event.getType());
synchronized (lock) {
+ this.event = event;
lock.notifyAll();
}
}
@@ -182,6 +208,10 @@ public class DistributedQueue {
lock.wait(timeout);
}
}
+
+ public WatchedEvent getWatchedEvent() {
+ return event;
+ }
}
/**
@@ -225,22 +255,51 @@ public class DistributedQueue {
*/
public boolean offer(byte[] data) throws KeeperException,
InterruptedException {
+ return createData(dir + "/" + prefix, data,
+ CreateMode.PERSISTENT_SEQUENTIAL) != null;
+ }
+
+ /**
+ * Inserts data into zookeeper.
+ *
+ * @return the path of the created node, as returned by zookeeper
+ */
+ private String createData(String path, byte[] data, CreateMode mode)
+ throws KeeperException, InterruptedException {
for (;;) {
try {
- zookeeper.create(dir + "/" + prefix, data, acl,
- CreateMode.PERSISTENT_SEQUENTIAL, true);
- return true;
+ return zookeeper.create(path, data, acl, mode, true);
} catch (KeeperException.NoNodeException e) {
try {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException ne) {
- //someone created it
+ // someone created it
}
}
}
-
-
-
+ }
+
+ /**
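+ * Offers the data, then waits up to {@code timeout} milliseconds for a watch event on the matching ephemeral response node before reading and deleting it.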
+ *
+ */
+ public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
+ InterruptedException {
+ String path = createData(dir + "/" + prefix, data,
+ CreateMode.PERSISTENT_SEQUENTIAL);
+ String watchID = createData(
+ dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
+ null, CreateMode.EPHEMERAL);
+ Object lock = new Object();
+ LatchChildWatcher watcher = new LatchChildWatcher(lock);
+ synchronized (lock) {
+ if (zookeeper.exists(watchID, watcher, true) != null) {
+ watcher.await(timeout);
+ }
+ }
+ byte[] bytes = zookeeper.getData(watchID, null, null, true);
+ zookeeper.delete(watchID, -1, true);
+ return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
}
/**
@@ -251,21 +310,74 @@ public class DistributedQueue {
*/
public byte[] peek() throws KeeperException, InterruptedException {
try {
- return element();
+ return element().getBytes();
} catch (NoSuchElementException e) {
return null;
}
}
+ public static class QueueEvent {
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((id == null) ? 0 : id.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ QueueEvent other = (QueueEvent) obj;
+ if (id == null) {
+ if (other.id != null) return false;
+ } else if (!id.equals(other.id)) return false;
+ return true;
+ }
+
+ private WatchedEvent event = null;
+ private String id;
+ private byte[] bytes;
+
+ QueueEvent(String id, byte[] bytes, WatchedEvent event) {
+ this.id = id;
+ this.bytes = bytes;
+ this.event = event;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setBytes(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ public WatchedEvent getWatchedEvent() {
+ return event;
+ }
+
+ }
+
/**
* Returns the data at the first element of the queue, or null if the queue is
* empty.
*
* @return data at the first element of the queue, or null.
*/
- public byte[] peek(boolean block) throws KeeperException, InterruptedException {
+ public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
if (!block) {
- return peek();
+ return element();
}
TreeMap<Long,String> orderedChildren;
@@ -286,7 +398,7 @@ public class DistributedQueue {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, null, null, true);
- return data;
+ return new QueueEvent(path, data, childWatcher.getWatchedEvent());
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1438555&r1=1438554&r2=1438555&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Fri Jan 25 15:28:45 2013
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClosableThread;
@@ -36,6 +38,7 @@ import org.apache.solr.common.cloud.ZooK
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
@@ -94,47 +97,33 @@ public class OverseerCollectionProcessor
@Override
public void run() {
- log.info("Process current queue of collection messages");
- while (amILeader() && !isClosed) {
- try {
- byte[] head = workQueue.peek(true);
-
- //if (head != null) { // should not happen since we block above
- final ZkNodeProps message = ZkNodeProps.load(head);
- final String operation = message.getStr(QUEUE_OPERATION);
- try {
- boolean success = processMessage(message, operation);
- if (!success) {
- // TODO: what to do on failure / partial failure
- // if we fail, do we clean up then ?
- SolrException.log(log,
- "Collection " + operation + " of " + message.getStr("name")
- + " failed");
- }
- } catch(Throwable t) {
- SolrException.log(log,
- "Collection " + operation + " of " + message.getStr("name")
- + " failed", t);
- }
- //}
-
-
- workQueue.poll();
-
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("Overseer cannot talk to ZK");
- return;
- }
- SolrException.log(log, "", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
- e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
- }
- }
+ log.info("Process current queue of collection creations");
+ while (amILeader() && !isClosed) {
+ try {
+ QueueEvent head = workQueue.peek(true);
+ final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+ log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
+ final String operation = message.getStr(QUEUE_OPERATION);
+ SolrResponse response = processMessage(message, operation);
+ head.setBytes(SolrResponse.serializable(response));
+ workQueue.remove(head);
+ log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("Overseer cannot talk to ZK");
+ return;
+ }
+ SolrException.log(log, "", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Throwable e) {
+ SolrException.log(log, "", e);
+ }
+ }
}
public void close() {
@@ -157,21 +146,49 @@ public class OverseerCollectionProcessor
return false;
}
- protected boolean processMessage(ZkNodeProps message, String operation) {
- if (CREATECOLLECTION.equals(operation)) {
- return createCollection(zkStateReader.getClusterState(), message);
- } else if (DELETECOLLECTION.equals(operation)) {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
- params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
- return collectionCmd(zkStateReader.getClusterState(), message, params);
- } else if (RELOADCOLLECTION.equals(operation)) {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
- return collectionCmd(zkStateReader.getClusterState(), message, params);
+
+ protected SolrResponse processMessage(ZkNodeProps message, String operation) {
+
+ NamedList results = new NamedList();
+ try {
+ if (CREATECOLLECTION.equals(operation)) {
+ createCollection(zkStateReader.getClusterState(), message);
+ } else if (DELETECOLLECTION.equals(operation)) {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+ params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
+ collectionCmd(zkStateReader.getClusterState(), message, params);
+ } else if (RELOADCOLLECTION.equals(operation)) {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
+ collectionCmd(zkStateReader.getClusterState(), message, params);
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Unknow the operation:"
+ + operation);
+ }
+ int failed = 0;
+ ShardResponse srsp;
+
+ do {
+ srsp = shardHandler.takeCompletedIncludingErrors();
+ if (srsp != null) {
+ Throwable e = srsp.getException();
+ if (e != null) {
+ failed++;
+ log.error("Error talking to shard: " + srsp.getShard(), e);
+ results.add(srsp.getShard(), e);
+ } else {
+ results.add(srsp.getShard(), srsp.getSolrResponse().getResponse());
+ }
+ }
+ } while (srsp != null);
+ } catch (SolrException ex) {
+ SolrException.log(log, "Collection " + operation + " of " + operation
+ + " failed");
+ results.add("Operation " + operation + " cause exception:", ex);
+ } finally {
+ return new OverseerSolrResponse(results);
}
- // unknown command, toss it from our queue
- return true;
}
private boolean createCollection(ClusterState clusterState, ZkNodeProps message) {
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1438555&r1=1438554&r2=1438555&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Fri Jan 25 15:28:45 2013
@@ -21,10 +21,12 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
+import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.common.SolrException;
@@ -127,7 +129,35 @@ public class CollectionsHandler extends
rsp.setHttpCaching(false);
}
-
+
+ public static long DEFAULT_ZK_TIMEOUT = 60*1000;
+
+ private void handleResponse(String operation, ZkNodeProps m,
+ SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+ long time = System.currentTimeMillis();
+ QueueEvent event = coreContainer.getZkController()
+ .getOverseerCollectionQueue()
+ .offer(ZkStateReader.toJSON(m), DEFAULT_ZK_TIMEOUT);
+ if (event.getBytes() != null) {
+ SolrResponse response = SolrResponse.deserialize(event.getBytes());
+ rsp.getValues().addAll(response.getResponse());
+ } else {
+ if (System.currentTimeMillis() - time >= DEFAULT_ZK_TIMEOUT) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ + " the collection time out:" + DEFAULT_ZK_TIMEOUT / 1000 + "s");
+ } else if (event.getWatchedEvent() != null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ + " the collection error [Watcher fired on path: "
+ + event.getWatchedEvent().getPath() + " state: "
+ + event.getWatchedEvent().getState() + " type "
+ + event.getWatchedEvent().getType() + "]");
+ } else {
+ throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ + " the collection unkown case");
+ }
+ }
+ }
+
private void handleReloadAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
log.info("Reloading Collection : " + req.getParamString());
String name = req.getParams().required().get("name");
@@ -135,8 +165,7 @@ public class CollectionsHandler extends
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.RELOADCOLLECTION, "name", name);
- // TODO: what if you want to block until the collection is available?
- coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
+ handleResponse(OverseerCollectionProcessor.RELOADCOLLECTION, m, rsp);
}
private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException {
@@ -168,8 +197,7 @@ public class CollectionsHandler extends
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.DELETECOLLECTION, "name", name);
- // TODO: what if you want to block until the collection is available?
- coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
+ handleResponse(OverseerCollectionProcessor.DELETECOLLECTION, m, rsp);
}
@@ -208,8 +236,7 @@ public class CollectionsHandler extends
ZkNodeProps m = new ZkNodeProps(props);
- // TODO: what if you want to block until the collection is available?
- coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
+ handleResponse(OverseerCollectionProcessor.CREATECOLLECTION, m, rsp);
}
public static ModifiableSolrParams params(String... params) {
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1438555&r1=1438554&r2=1438555&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java Fri Jan 25 15:28:45 2013
@@ -19,10 +19,11 @@ package org.apache.solr.cloud;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import java.util.ArrayList;
@@ -36,6 +37,8 @@ import java.util.Queue;
import java.util.Set;
import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -43,11 +46,13 @@ import org.apache.solr.common.cloud.ZkSt
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.easymock.Capture;
+import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.junit.After;
@@ -71,12 +76,12 @@ public class OverseerCollectionProcessor
private OverseerCollectionProcessorToBeTested underTest;
private Thread thread;
- private Queue<byte[]> queue = new BlockingArrayQueue<byte[]>();
+ private Queue<QueueEvent> queue = new BlockingArrayQueue<QueueEvent>();
private class OverseerCollectionProcessorToBeTested extends
OverseerCollectionProcessor {
- private boolean lastProcessMessageResult = true;
+ private SolrResponse lastProcessMessageResult;
public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
String myId, ShardHandler shardHandler, String adminPath,
@@ -85,7 +90,7 @@ public class OverseerCollectionProcessor
}
@Override
- protected boolean processMessage(ZkNodeProps message, String operation) {
+ protected SolrResponse processMessage(ZkNodeProps message, String operation) {
lastProcessMessageResult = super.processMessage(message, operation);
return lastProcessMessageResult;
}
@@ -147,11 +152,12 @@ public class OverseerCollectionProcessor
}
}).anyTimes();
- workQueueMock.remove();
+ workQueueMock.remove(anyObject(QueueEvent.class));
expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
- return queue.poll();
+ queue.remove((QueueEvent)EasyMock.getCurrentArguments()[0]);
+ return null;
}
}).anyTimes();
@@ -273,7 +279,8 @@ public class OverseerCollectionProcessor
OverseerCollectionProcessor.MAX_SHARDS_PER_NODE,
maxShardsPerNode.toString());
}
- queue.add(ZkStateReader.toJSON(props));
+ QueueEvent qe = new QueueEvent("id", ZkStateReader.toJSON(props), null);
+ queue.add(qe);
}
protected void verifySubmitCaptures(List<SubmitCapture> submitCaptures,
@@ -443,7 +450,9 @@ public class OverseerCollectionProcessor
waitForEmptyQueue(10000);
- assertEquals(collectionExceptedToBeCreated, underTest.lastProcessMessageResult);
+ if (collectionExceptedToBeCreated) {
+ assertNotNull(underTest.lastProcessMessageResult.getResponse().toString(), underTest.lastProcessMessageResult);
+ }
verify(shardHandlerMock);
if (collectionExceptedToBeCreated) {
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/SolrResponse.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/SolrResponse.java?rev=1438555&r1=1438554&r2=1438555&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/SolrResponse.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/SolrResponse.java Fri Jan 25 15:28:45 2013
@@ -17,19 +17,47 @@
package org.apache.solr.client.solrj;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.NamedList;
/**
*
- *
+ *
* @since solr 1.3
*/
-public abstract class SolrResponse implements Serializable
-{
+public abstract class SolrResponse implements Serializable {
public abstract long getElapsedTime();
- public abstract void setResponse( NamedList<Object> rsp );
+
+ public abstract void setResponse(NamedList<Object> rsp);
+
public abstract NamedList<Object> getResponse();
+
+ public static byte[] serializable(SolrResponse response) {
+ try {
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ ObjectOutputStream outputStream = new ObjectOutputStream(byteStream);
+ outputStream.writeObject(response);
+ return byteStream.toByteArray();
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ public static SolrResponse deserialize(byte[] bytes) {
+ try {
+ ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+ ObjectInputStream inputStream = new ObjectInputStream(byteStream);
+ return (SolrResponse) inputStream.readObject();
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ }
}