You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2013/01/25 16:14:51 UTC

svn commit: r1438550 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/client/solrj/

Author: markrmiller
Date: Fri Jan 25 15:14:51 2013
New Revision: 1438550

URL: http://svn.apache.org/viewvc?rev=1438550&view=rev
Log:
SOLR-4043: Add ability to get success/failure responses from Collections API.

Added:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerSolrResponse.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/SolrResponse.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1438550&r1=1438549&r2=1438550&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri Jan 25 15:14:51 2013
@@ -63,6 +63,9 @@ Detailed Change List
 New Features
 ----------------------
 
+* SOLR-4043: Add ability to get success/failure responses from Collections API.
+  (Raintung Li, Mark Miller)
+
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1438550&r1=1438549&r2=1438550&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Fri Jan 25 15:14:51 2013
@@ -48,6 +48,8 @@ public class DistributedQueue {
   
   private final String prefix = "qn-";
   
+  private final String response_prefix = "qnr-" ;
+  
   public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl) {
     this.dir = dir;
     
@@ -100,7 +102,7 @@ public class DistributedQueue {
    * 
    * @return the data at the head of the queue.
    */
-  public byte[] element() throws NoSuchElementException, KeeperException,
+  private QueueEvent element() throws NoSuchElementException, KeeperException,
       InterruptedException {
     TreeMap<Long,String> orderedChildren;
     
@@ -122,7 +124,7 @@ public class DistributedQueue {
       for (String headNode : orderedChildren.values()) {
         if (headNode != null) {
           try {
-            return zookeeper.getData(dir + "/" + headNode, null, null, true);
+            return new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode, null, null, true), null);
           } catch (KeeperException.NoNodeException e) {
             // Another client removed the node first, try next
           }
@@ -162,17 +164,41 @@ public class DistributedQueue {
     }
   }
   
+  /**
+   * Remove the event's node from the queue and, if a response node exists for it, save the event's bytes there as the response.
+   * 
+   */
+  public byte[] remove(QueueEvent event) throws KeeperException,
+      InterruptedException {
+    String path = event.getId();
+    String responsePath = dir + "/" + response_prefix
+        + path.substring(path.lastIndexOf("-") + 1);
+    if (zookeeper.exists(responsePath, true)) {
+      zookeeper.setData(responsePath, event.getBytes(), true);
+    }
+    byte[] data = zookeeper.getData(path, null, null, true);
+    zookeeper.delete(path, -1, true);
+    return data;
+  }
+  
+  
   private class LatchChildWatcher implements Watcher {
     
     Object lock = new Object();
+    private WatchedEvent event = null;
     
     public LatchChildWatcher() {}
     
+    public LatchChildWatcher(Object lock) {
+      this.lock = lock;
+    }
+    
     @Override
     public void process(WatchedEvent event) {
       LOG.info("Watcher fired on path: " + event.getPath() + " state: "
           + event.getState() + " type " + event.getType());
       synchronized (lock) {
+        this.event = event;
         lock.notifyAll();
       }
     }
@@ -182,6 +208,10 @@ public class DistributedQueue {
         lock.wait(timeout);
       }
     }
+    
+    public WatchedEvent getWatchedEvent() {
+      return event;
+    }
   }
   
   /**
@@ -225,22 +255,51 @@ public class DistributedQueue {
    */
   public boolean offer(byte[] data) throws KeeperException,
       InterruptedException {
+    return createData(dir + "/" + prefix, data,
+        CreateMode.PERSISTENT_SEQUENTIAL) != null;
+  }
+  
+  /**
+   * Inserts data into zookeeper.
+   * 
+   * @return the path of the created node, as returned by zookeeper
+   */
+  private String createData(String path, byte[] data, CreateMode mode)
+      throws KeeperException, InterruptedException {
     for (;;) {
       try {
-        zookeeper.create(dir + "/" + prefix, data, acl,
-            CreateMode.PERSISTENT_SEQUENTIAL, true);
-        return true;
+        return zookeeper.create(path, data, acl, mode, true);
       } catch (KeeperException.NoNodeException e) {
         try {
           zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
         } catch (KeeperException.NodeExistsException ne) {
-        //someone created it
+          // someone created it
         }
       }
     }
-
-    
-    
+  }
+  
+  /**
+   * Offer the data and wait for the response
+   * 
+   */
+  public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
+      InterruptedException {
+    String path = createData(dir + "/" + prefix, data,
+        CreateMode.PERSISTENT_SEQUENTIAL);
+    String watchID = createData(
+        dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
+        null, CreateMode.EPHEMERAL);
+    Object lock = new Object();
+    LatchChildWatcher watcher = new LatchChildWatcher(lock);
+    synchronized (lock) {
+      if (zookeeper.exists(watchID, watcher, true) != null) {
+        watcher.await(timeout);
+      }
+    }
+    byte[] bytes = zookeeper.getData(watchID, null, null, true);
+    zookeeper.delete(watchID, -1, true);
+    return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
   }
   
   /**
@@ -251,21 +310,74 @@ public class DistributedQueue {
    */
   public byte[] peek() throws KeeperException, InterruptedException {
     try {
-      return element();
+      return element().getBytes();
     } catch (NoSuchElementException e) {
       return null;
     }
   }
   
+  public static class QueueEvent {
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((id == null) ? 0 : id.hashCode());
+      return result;
+    }
+    
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      QueueEvent other = (QueueEvent) obj;
+      if (id == null) {
+        if (other.id != null) return false;
+      } else if (!id.equals(other.id)) return false;
+      return true;
+    }
+    
+    private WatchedEvent event = null;
+    private String id;
+    private byte[] bytes;
+    
+    QueueEvent(String id, byte[] bytes, WatchedEvent event) {
+      this.id = id;
+      this.bytes = bytes;
+      this.event = event;
+    }
+    
+    public void setId(String id) {
+      this.id = id;
+    }
+    
+    public String getId() {
+      return id;
+    }
+    
+    public void setBytes(byte[] bytes) {
+      this.bytes = bytes;
+    }
+    
+    public byte[] getBytes() {
+      return bytes;
+    }
+    
+    public WatchedEvent getWatchedEvent() {
+      return event;
+    }
+    
+  }
+  
   /**
    * Returns the data at the first element of the queue, or null if the queue is
    * empty.
    * 
    * @return data at the first element of the queue, or null.
    */
-  public byte[] peek(boolean block) throws KeeperException, InterruptedException {
+  public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
     if (!block) {
-      return peek();
+      return element();
     }
     
     TreeMap<Long,String> orderedChildren;
@@ -286,7 +398,7 @@ public class DistributedQueue {
         String path = dir + "/" + headNode;
         try {
           byte[] data = zookeeper.getData(path, null, null, true);
-          return data;
+          return new QueueEvent(path, data, childWatcher.getWatchedEvent());
         } catch (KeeperException.NoNodeException e) {
           // Another client deleted the node first.
         }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1438550&r1=1438549&r2=1438550&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Fri Jan 25 15:14:51 2013
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.cloud.DistributedQueue.QueueEvent;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClosableThread;
@@ -36,6 +38,7 @@ import org.apache.solr.common.cloud.ZooK
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardRequest;
@@ -94,47 +97,33 @@ public class OverseerCollectionProcessor
   
   @Override
   public void run() {
-    log.info("Process current queue of collection messages");
-    while (amILeader() && !isClosed) {
-      try {
-        byte[] head = workQueue.peek(true);
-        
-        //if (head != null) {    // should not happen since we block above
-          final ZkNodeProps message = ZkNodeProps.load(head);
-          final String operation = message.getStr(QUEUE_OPERATION);
-        try {
-          boolean success = processMessage(message, operation);
-          if (!success) {
-            // TODO: what to do on failure / partial failure
-            // if we fail, do we clean up then ?
-            SolrException.log(log,
-                "Collection " + operation + " of " + message.getStr("name")
-                    + " failed");
-          }
-        } catch(Throwable t) {
-          SolrException.log(log,
-              "Collection " + operation + " of " + message.getStr("name")
-                  + " failed", t);
-        }
-        //}
-        
-        
-        workQueue.poll();
-       
-      } catch (KeeperException e) {
-        if (e.code() == KeeperException.Code.SESSIONEXPIRED
-            || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-          log.warn("Overseer cannot talk to ZK");
-          return;
-        }
-        SolrException.log(log, "", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
-            e);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return;
-      }
-    }
+       log.info("Process current queue of collection creations");
+       while (amILeader() && !isClosed) {
+         try {
+           QueueEvent head = workQueue.peek(true);
+           final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+           log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
+           final String operation = message.getStr(QUEUE_OPERATION);
+           SolrResponse response = processMessage(message, operation);
+           head.setBytes(SolrResponse.serializable(response));
+           workQueue.remove(head);
+          log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
+        } catch (KeeperException e) {
+          if (e.code() == KeeperException.Code.SESSIONEXPIRED
+              || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+             log.warn("Overseer cannot talk to ZK");
+             return;
+           }
+           SolrException.log(log, "", e);
+           throw new ZooKeeperException(
+               SolrException.ErrorCode.SERVER_ERROR, "", e);
+         } catch (InterruptedException e) {
+           Thread.currentThread().interrupt();
+           return;
+         } catch (Throwable e) {
+           SolrException.log(log, "", e);
+         }
+       }
   }
   
   public void close() {
@@ -157,21 +146,49 @@ public class OverseerCollectionProcessor
     return false;
   }
   
-  protected boolean processMessage(ZkNodeProps message, String operation) {
-    if (CREATECOLLECTION.equals(operation)) {
-      return createCollection(zkStateReader.getClusterState(), message);
-    } else if (DELETECOLLECTION.equals(operation)) {
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
-      params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
-      return collectionCmd(zkStateReader.getClusterState(), message, params);
-    } else if (RELOADCOLLECTION.equals(operation)) {
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
-      return collectionCmd(zkStateReader.getClusterState(), message, params);
+  
+  protected SolrResponse processMessage(ZkNodeProps message, String operation) {
+    
+    NamedList results = new NamedList();
+    try {
+      if (CREATECOLLECTION.equals(operation)) {
+        createCollection(zkStateReader.getClusterState(), message);
+      } else if (DELETECOLLECTION.equals(operation)) {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+        params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
+        collectionCmd(zkStateReader.getClusterState(), message, params);
+      } else if (RELOADCOLLECTION.equals(operation)) {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
+        collectionCmd(zkStateReader.getClusterState(), message, params);
+      } else {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Unknow the operation:"
+            + operation);
+      }
+      int failed = 0;
+      ShardResponse srsp;
+      
+      do {
+        srsp = shardHandler.takeCompletedIncludingErrors();
+        if (srsp != null) {
+          Throwable e = srsp.getException();
+          if (e != null) {
+            failed++;
+            log.error("Error talking to shard: " + srsp.getShard(), e);
+            results.add(srsp.getShard(), e);
+          } else {
+            results.add(srsp.getShard(), srsp.getSolrResponse().getResponse());
+          }
+        }
+      } while (srsp != null);
+    } catch (SolrException ex) {
+      SolrException.log(log, "Collection " + operation + " of " + operation
+          + " failed");
+      results.add("Operation " + operation + " cause exception:", ex);
+    } finally {
+      return new OverseerSolrResponse(results);
     }
-    // unknown command, toss it from our queue
-    return true;
   }
 
   private boolean createCollection(ClusterState clusterState, ZkNodeProps message) {

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerSolrResponse.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerSolrResponse.java?rev=1438550&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerSolrResponse.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerSolrResponse.java Fri Jan 25 15:14:51 2013
@@ -0,0 +1,47 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.common.util.NamedList;
+
+public class OverseerSolrResponse extends SolrResponse {
+  
+  NamedList responseList = null;
+  
+  public OverseerSolrResponse(NamedList list) {
+    responseList = list;
+  }
+  
+  @Override
+  public long getElapsedTime() {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+  
+  @Override
+  public void setResponse(NamedList<Object> rsp) {
+    this.responseList = rsp;
+  }
+  
+  @Override
+  public NamedList<Object> getResponse() {
+    return responseList;
+  }
+  
+}

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1438550&r1=1438549&r2=1438550&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Fri Jan 25 15:14:51 2013
@@ -21,10 +21,12 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
+import org.apache.solr.cloud.DistributedQueue.QueueEvent;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerCollectionProcessor;
 import org.apache.solr.common.SolrException;
@@ -127,7 +129,35 @@ public class CollectionsHandler extends 
 
     rsp.setHttpCaching(false);
   }
-
+  
+  public static long DEFAULT_ZK_TIMEOUT = 60*1000;
+  
+  private void handleResponse(String operation, ZkNodeProps m,
+      SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+    long time = System.currentTimeMillis();
+    QueueEvent event = coreContainer.getZkController()
+        .getOverseerCollectionQueue()
+        .offer(ZkStateReader.toJSON(m), DEFAULT_ZK_TIMEOUT);
+    if (event.getBytes() != null) {
+      SolrResponse response = SolrResponse.deserialize(event.getBytes());
+      rsp.getValues().addAll(response.getResponse());
+    } else {
+      if (System.currentTimeMillis() - time >= DEFAULT_ZK_TIMEOUT) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, operation
+            + " the collection time out:" + DEFAULT_ZK_TIMEOUT / 1000 + "s");
+      } else if (event.getWatchedEvent() != null) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, operation
+            + " the collection error [Watcher fired on path: "
+            + event.getWatchedEvent().getPath() + " state: "
+            + event.getWatchedEvent().getState() + " type "
+            + event.getWatchedEvent().getType() + "]");
+      } else {
+        throw new SolrException(ErrorCode.SERVER_ERROR, operation
+            + " the collection unkown case");
+      }
+    }
+  }
+  
   private void handleReloadAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
     log.info("Reloading Collection : " + req.getParamString());
     String name = req.getParams().required().get("name");
@@ -135,8 +165,7 @@ public class CollectionsHandler extends 
     ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
         OverseerCollectionProcessor.RELOADCOLLECTION, "name", name);
 
-    // TODO: what if you want to block until the collection is available?
-    coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
+    handleResponse(OverseerCollectionProcessor.RELOADCOLLECTION, m, rsp);
   }
   
   private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException {
@@ -168,8 +197,7 @@ public class CollectionsHandler extends 
     ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
         OverseerCollectionProcessor.DELETECOLLECTION, "name", name);
 
-    // TODO: what if you want to block until the collection is available?
-    coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
+    handleResponse(OverseerCollectionProcessor.DELETECOLLECTION, m, rsp);
   }
 
 
@@ -208,8 +236,7 @@ public class CollectionsHandler extends 
     
     ZkNodeProps m = new ZkNodeProps(props);
 
-    // TODO: what if you want to block until the collection is available?
-    coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
+    handleResponse(OverseerCollectionProcessor.CREATECOLLECTION, m, rsp);
   }
 
   public static ModifiableSolrParams params(String... params) {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1438550&r1=1438549&r2=1438550&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java Fri Jan 25 15:14:51 2013
@@ -19,10 +19,11 @@ package org.apache.solr.cloud;
 
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 
 import java.util.ArrayList;
@@ -36,6 +37,8 @@ import java.util.Queue;
 import java.util.Set;
 
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.cloud.DistributedQueue.QueueEvent;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -43,11 +46,13 @@ import org.apache.solr.common.cloud.ZkSt
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
 import org.easymock.Capture;
+import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.junit.After;
@@ -71,12 +76,12 @@ public class OverseerCollectionProcessor
   private OverseerCollectionProcessorToBeTested underTest;
   
   private Thread thread;
-  private Queue<byte[]> queue = new BlockingArrayQueue<byte[]>();
+  private Queue<QueueEvent> queue = new BlockingArrayQueue<QueueEvent>();
   
   private class OverseerCollectionProcessorToBeTested extends
       OverseerCollectionProcessor {
     
-    private boolean lastProcessMessageResult = true;
+    private SolrResponse lastProcessMessageResult;
     
     public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
         String myId, ShardHandler shardHandler, String adminPath,
@@ -85,7 +90,7 @@ public class OverseerCollectionProcessor
     }
     
     @Override
-    protected boolean processMessage(ZkNodeProps message, String operation) {
+    protected SolrResponse processMessage(ZkNodeProps message, String operation) {
       lastProcessMessageResult = super.processMessage(message, operation);
       return lastProcessMessageResult;
     }
@@ -147,11 +152,12 @@ public class OverseerCollectionProcessor
       }
     }).anyTimes();
     
-    workQueueMock.remove();
+    workQueueMock.remove(anyObject(QueueEvent.class));
     expectLastCall().andAnswer(new IAnswer<Object>() {
       @Override
       public Object answer() throws Throwable {
-        return queue.poll();
+        queue.remove((QueueEvent)EasyMock.getCurrentArguments()[0]);
+        return null;
       }
     }).anyTimes();
     
@@ -273,7 +279,8 @@ public class OverseerCollectionProcessor
           OverseerCollectionProcessor.MAX_SHARDS_PER_NODE,
           maxShardsPerNode.toString());
     }
-    queue.add(ZkStateReader.toJSON(props));
+    QueueEvent qe = new QueueEvent("id", ZkStateReader.toJSON(props), null);
+    queue.add(qe);
   }
   
   protected void verifySubmitCaptures(List<SubmitCapture> submitCaptures,
@@ -443,7 +450,9 @@ public class OverseerCollectionProcessor
     
     waitForEmptyQueue(10000);
     
-    assertEquals(collectionExceptedToBeCreated, underTest.lastProcessMessageResult);
+    if (collectionExceptedToBeCreated) {
+      assertNotNull(underTest.lastProcessMessageResult.getResponse().toString(), underTest.lastProcessMessageResult);
+    }
     verify(shardHandlerMock);
     
     if (collectionExceptedToBeCreated) {

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/SolrResponse.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/SolrResponse.java?rev=1438550&r1=1438549&r2=1438550&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/SolrResponse.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/SolrResponse.java Fri Jan 25 15:14:51 2013
@@ -17,19 +17,47 @@
 
 package org.apache.solr.client.solrj;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.util.NamedList;
 
 
 /**
  * 
- *
+ * 
  * @since solr 1.3
  */
-public abstract class SolrResponse implements Serializable
-{
+public abstract class SolrResponse implements Serializable {
   public abstract long getElapsedTime();
-  public abstract void setResponse(  NamedList<Object> rsp );
+  
+  public abstract void setResponse(NamedList<Object> rsp);
+  
   public abstract NamedList<Object> getResponse();
+  
+  public static byte[] serializable(SolrResponse response) {
+    try {
+      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+      ObjectOutputStream outputStream = new ObjectOutputStream(byteStream);
+      outputStream.writeObject(response);
+      return byteStream.toByteArray();
+    } catch (Exception e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    }
+  }
+  
+  public static SolrResponse deserialize(byte[] bytes) {
+    try {
+      ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+      ObjectInputStream inputStream = new ObjectInputStream(byteStream);
+      return (SolrResponse) inputStream.readObject();
+    } catch (Exception e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    }
+  }
 }