You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by rm...@apache.org on 2013/10/21 20:58:44 UTC
svn commit: r1534320 [37/39] - in /lucene/dev/branches/lucene4956: ./
dev-tools/ dev-tools/idea/.idea/ dev-tools/idea/lucene/expressions/
dev-tools/idea/solr/contrib/velocity/ dev-tools/maven/
dev-tools/maven/lucene/ dev-tools/maven/lucene/expressions/...
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Mon Oct 21 18:58:24 2013
@@ -30,18 +30,31 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -49,7 +62,9 @@ import org.apache.solr.common.cloud.ZkSt
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException;
@@ -58,6 +73,10 @@ import org.apache.zookeeper.KeeperExcept
* Instances of this class communicate with Zookeeper to discover
* Solr endpoints for SolrCloud collections, and then use the
* {@link LBHttpSolrServer} to issue requests.
+ *
+ * This class assumes the id field for your documents is called
+ * 'id' - if this is not the case, you must set the right name
+ * with {@link #setIdField(String)}.
*/
public class CloudSolrServer extends SolrServer {
private volatile ZkStateReader zkStateReader;
@@ -65,22 +84,37 @@ public class CloudSolrServer extends Sol
private int zkConnectTimeout = 10000;
private int zkClientTimeout = 10000;
private volatile String defaultCollection;
- private LBHttpSolrServer lbServer;
+ private final LBHttpSolrServer lbServer;
+ private final boolean shutdownLBHttpSolrServer;
private HttpClient myClient;
Random rand = new Random();
- private Object cachLock = new Object();
- // since the state shouldn't change often, should be very cheap reads
- private Map<String,List<String>> urlLists = new HashMap<String,List<String>>();
- private Map<String,List<String>> leaderUrlLists = new HashMap<String,List<String>>();
-
- private Map<String,List<String>> replicasLists = new HashMap<String,List<String>>();
-
- private volatile int lastClusterStateHashCode;
-
private final boolean updatesToLeaders;
+ private boolean parallelUpdates = true;
+ private ExecutorService threadPool = Executors
+ .newCachedThreadPool(new SolrjNamedThreadFactory(
+ "CloudSolrServer ThreadPool"));
+ private String idField = "id";
+ private final Set<String> NON_ROUTABLE_PARAMS;
+ {
+ NON_ROUTABLE_PARAMS = new HashSet<String>();
+ NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
+
+ NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
+
+ // Not supported via SolrCloud
+ // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
+
+ }
+
+
-
/**
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
* in the form HOST:PORT.
@@ -89,15 +123,22 @@ public class CloudSolrServer extends Sol
this.zkHost = zkHost;
this.myClient = HttpClientUtil.createClient(null);
this.lbServer = new LBHttpSolrServer(myClient);
+ this.lbServer.setRequestWriter(new BinaryRequestWriter());
+ this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = true;
+ shutdownLBHttpSolrServer = true;
}
- public CloudSolrServer(String zkHost, boolean updatesToLeaders) throws MalformedURLException {
+ public CloudSolrServer(String zkHost, boolean updatesToLeaders)
+ throws MalformedURLException {
this.zkHost = zkHost;
this.myClient = HttpClientUtil.createClient(null);
this.lbServer = new LBHttpSolrServer(myClient);
+ this.lbServer.setRequestWriter(new BinaryRequestWriter());
+ this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = updatesToLeaders;
-}
+ shutdownLBHttpSolrServer = true;
+ }
/**
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
@@ -108,6 +149,7 @@ public class CloudSolrServer extends Sol
this.zkHost = zkHost;
this.lbServer = lbServer;
this.updatesToLeaders = true;
+ shutdownLBHttpSolrServer = false;
}
/**
@@ -120,11 +162,50 @@ public class CloudSolrServer extends Sol
this.zkHost = zkHost;
this.lbServer = lbServer;
this.updatesToLeaders = updatesToLeaders;
+ shutdownLBHttpSolrServer = false;
+ }
+
+ public ResponseParser getParser() {
+ return lbServer.getParser();
+ }
+
+ /**
+ * Note: This setter method is <b>not thread-safe</b>.
+ *
+ * @param processor
+ * Default Response Parser chosen to parse the response if the parser
+ * were not specified as part of the request.
+ * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+ */
+ public void setParser(ResponseParser processor) {
+ lbServer.setParser(processor);
+ }
+
+ public RequestWriter getRequestWriter() {
+ return lbServer.getRequestWriter();
+ }
+
+ public void setRequestWriter(RequestWriter requestWriter) {
+ lbServer.setRequestWriter(requestWriter);
}
public ZkStateReader getZkStateReader() {
return zkStateReader;
}
+
+ /**
+ * @param idField the field to route documents on.
+ */
+ public void setIdField(String idField) {
+ this.idField = idField;
+ }
+
+ /**
+ * @return the field that updates are routed on.
+ */
+ public String getIdField() {
+ return idField;
+ }
/** Sets the default collection for request */
public void setDefaultCollection(String collection) {
@@ -156,8 +237,8 @@ public class CloudSolrServer extends Sol
synchronized (this) {
if (zkStateReader == null) {
try {
- ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout,
- zkClientTimeout);
+ ZkStateReader zk = new ZkStateReader(zkHost, zkClientTimeout,
+ zkConnectTimeout);
zk.createClusterStateWatchersAndUpdate();
zkStateReader = zk;
} catch (InterruptedException e) {
@@ -179,18 +260,259 @@ public class CloudSolrServer extends Sol
}
}
+ public void setParallelUpdates(boolean parallelUpdates) {
+ this.parallelUpdates = parallelUpdates;
+ }
+
+ private NamedList directUpdate(AbstractUpdateRequest request, ClusterState clusterState) throws SolrServerException {
+ UpdateRequest updateRequest = (UpdateRequest) request;
+ ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+ ModifiableSolrParams routableParams = new ModifiableSolrParams();
+ ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
+
+ if(params != null) {
+ nonRoutableParams.add(params);
+ routableParams.add(params);
+ for(String param : NON_ROUTABLE_PARAMS) {
+ routableParams.remove(param);
+ }
+ }
+
+ String collection = nonRoutableParams.get("collection", defaultCollection);
+ if (collection == null) {
+ throw new SolrServerException("No collection param specified on request and no default collection has been set.");
+ }
+
+
+ //Check to see if the collection is an alias.
+ Aliases aliases = zkStateReader.getAliases();
+ if(aliases != null) {
+ Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
+ if(collectionAliases != null && collectionAliases.containsKey(collection)) {
+ collection = collectionAliases.get(collection);
+ }
+ }
+
+ DocCollection col = clusterState.getCollection(collection);
+
+ DocRouter router = col.getRouter();
+
+ if (router instanceof ImplicitDocRouter) {
+ // short circuit as optimization
+ return null;
+ }
+
+ //Create the URL map, which is keyed on slice name.
+ //The value is a list of URLs for each replica in the slice.
+ //The first value in the list is the leader for the slice.
+ Map<String,List<String>> urlMap = buildUrlMap(col);
+ if (urlMap == null) {
+ // we could not find a leader yet - use unoptimized general path
+ return null;
+ }
+
+ NamedList exceptions = new NamedList();
+ NamedList shardResponses = new NamedList();
+
+ Map<String, LBHttpSolrServer.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
+ if (routes == null) {
+ return null;
+ }
+
+ long start = System.nanoTime();
+
+ if (parallelUpdates) {
+ final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<String, Future<NamedList<?>>>(routes.size());
+ for (final Map.Entry<String, LBHttpSolrServer.Req> entry : routes.entrySet()) {
+ final String url = entry.getKey();
+ final LBHttpSolrServer.Req lbRequest = entry.getValue();
+ responseFutures.put(url, threadPool.submit(new Callable<NamedList<?>>() {
+ @Override
+ public NamedList<?> call() throws Exception {
+ return lbServer.request(lbRequest).getResponse();
+ }
+ }));
+ }
+
+ for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
+ final String url = entry.getKey();
+ final Future<NamedList<?>> responseFuture = entry.getValue();
+ try {
+ shardResponses.add(url, responseFuture.get());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ exceptions.add(url, e.getCause());
+ }
+ }
+
+ if (exceptions.size() > 0) {
+ throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
+ }
+ } else {
+ for (Map.Entry<String, LBHttpSolrServer.Req> entry : routes.entrySet()) {
+ String url = entry.getKey();
+ LBHttpSolrServer.Req lbRequest = entry.getValue();
+ try {
+ NamedList rsp = lbServer.request(lbRequest).getResponse();
+ shardResponses.add(url, rsp);
+ } catch (Exception e) {
+ throw new SolrServerException(e);
+ }
+ }
+ }
+
+ UpdateRequest nonRoutableRequest = null;
+ List<String> deleteQuery = updateRequest.getDeleteQuery();
+ if (deleteQuery != null && deleteQuery.size() > 0) {
+ UpdateRequest deleteQueryRequest = new UpdateRequest();
+ deleteQueryRequest.setDeleteQuery(deleteQuery);
+ nonRoutableRequest = deleteQueryRequest;
+ }
+
+ Set<String> paramNames = nonRoutableParams.getParameterNames();
+
+ Set<String> intersection = new HashSet<String>(paramNames);
+ intersection.retainAll(NON_ROUTABLE_PARAMS);
+
+ if (nonRoutableRequest != null || intersection.size() > 0) {
+ if (nonRoutableRequest == null) {
+ nonRoutableRequest = new UpdateRequest();
+ }
+ nonRoutableRequest.setParams(nonRoutableParams);
+ List<String> urlList = new ArrayList<String>();
+ urlList.addAll(routes.keySet());
+ Collections.shuffle(urlList, rand);
+ LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(nonRoutableRequest, urlList);
+ try {
+ LBHttpSolrServer.Rsp rsp = lbServer.request(req);
+ shardResponses.add(urlList.get(0), rsp.getResponse());
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e);
+ }
+ }
+
+ long end = System.nanoTime();
+
+ RouteResponse rr = condenseResponse(shardResponses, (long)((end - start)/1000000));
+ rr.setRouteResponses(shardResponses);
+ rr.setRoutes(routes);
+ return rr;
+ }
+
+ private Map<String,List<String>> buildUrlMap(DocCollection col) {
+ Map<String, List<String>> urlMap = new HashMap<String, List<String>>();
+ Collection<Slice> slices = col.getActiveSlices();
+ Iterator<Slice> sliceIterator = slices.iterator();
+ while (sliceIterator.hasNext()) {
+ Slice slice = sliceIterator.next();
+ String name = slice.getName();
+ List<String> urls = new ArrayList<String>();
+ Replica leader = slice.getLeader();
+ if (leader == null) {
+ // take unoptimized general path - we cannot find a leader yet
+ return null;
+ }
+ ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+ String url = zkProps.getBaseUrl() + "/" + col.getName();
+ urls.add(url);
+ Collection<Replica> replicas = slice.getReplicas();
+ Iterator<Replica> replicaIterator = replicas.iterator();
+ while (replicaIterator.hasNext()) {
+ Replica replica = replicaIterator.next();
+ if (!replica.getNodeName().equals(leader.getNodeName()) &&
+ !replica.getName().equals(leader.getName())) {
+ ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
+ String url1 = zkProps1.getBaseUrl() + "/" + col.getName();
+ urls.add(url1);
+ }
+ }
+ urlMap.put(name, urls);
+ }
+ return urlMap;
+ }
+
+ public RouteResponse condenseResponse(NamedList response, long timeMillis) {
+ RouteResponse condensed = new RouteResponse();
+ int status = 0;
+ for(int i=0; i<response.size(); i++) {
+ NamedList shardResponse = (NamedList)response.getVal(i);
+ NamedList header = (NamedList)shardResponse.get("responseHeader");
+ Integer shardStatus = (Integer)header.get("status");
+ int s = shardStatus.intValue();
+ if(s > 0) {
+ status = s;
+ }
+ }
+
+ NamedList cheader = new NamedList();
+ cheader.add("status", status);
+ cheader.add("QTime", timeMillis);
+ condensed.add("responseHeader", cheader);
+ return condensed;
+ }
+
+ class RouteResponse extends NamedList {
+ private NamedList routeResponses;
+ private Map<String, LBHttpSolrServer.Req> routes;
+
+ public void setRouteResponses(NamedList routeResponses) {
+ this.routeResponses = routeResponses;
+ }
+
+ public NamedList getRouteResponses() {
+ return routeResponses;
+ }
+
+ public void setRoutes(Map<String, LBHttpSolrServer.Req> routes) {
+ this.routes = routes;
+ }
+
+ public Map<String, LBHttpSolrServer.Req> getRoutes() {
+ return routes;
+ }
+
+ }
+
+ class RouteException extends SolrException {
+
+ private NamedList exceptions;
+ private Map<String, LBHttpSolrServer.Req> routes;
+
+ public RouteException(ErrorCode errorCode, NamedList exceptions, Map<String, LBHttpSolrServer.Req> routes){
+ super(errorCode, ((Exception)exceptions.getVal(0)).getMessage(), (Exception)exceptions.getVal(0));
+ this.exceptions = exceptions;
+ this.routes = routes;
+ }
+
+ public NamedList getExceptions() {
+ return exceptions;
+ }
+
+ public Map<String, LBHttpSolrServer.Req> getRoutes() {
+ return this.routes;
+ }
+ }
+
@Override
public NamedList<Object> request(SolrRequest request)
throws SolrServerException, IOException {
connect();
- // TODO: if you can hash here, you could favor the shard leader
-
ClusterState clusterState = zkStateReader.getClusterState();
+
boolean sendToLeaders = false;
List<String> replicas = null;
- if (request instanceof IsUpdateRequest && updatesToLeaders) {
+ if (request instanceof IsUpdateRequest) {
+ if (request instanceof UpdateRequest) {
+ NamedList response = directUpdate((AbstractUpdateRequest) request,
+ clusterState);
+ if (response != null) {
+ return response;
+ }
+ }
sendToLeaders = true;
replicas = new ArrayList<String>();
}
@@ -200,13 +522,16 @@ public class CloudSolrServer extends Sol
reqParams = new ModifiableSolrParams();
}
List<String> theUrlList = new ArrayList<String>();
- if (request.getPath().equals("/admin/collections") || request.getPath().equals("/admin/cores")) {
+ if (request.getPath().equals("/admin/collections")
+ || request.getPath().equals("/admin/cores")) {
Set<String> liveNodes = clusterState.getLiveNodes();
for (String liveNode : liveNodes) {
int splitPointBetweenHostPortAndContext = liveNode.indexOf("_");
theUrlList.add("http://"
- + liveNode.substring(0, splitPointBetweenHostPortAndContext) + "/"
- + URLDecoder.decode(liveNode, "UTF-8").substring(splitPointBetweenHostPortAndContext + 1));
+ + liveNode.substring(0, splitPointBetweenHostPortAndContext)
+ + "/"
+ + URLDecoder.decode(liveNode, "UTF-8").substring(
+ splitPointBetweenHostPortAndContext + 1));
}
} else {
String collection = reqParams.get("collection", defaultCollection);
@@ -218,14 +543,15 @@ public class CloudSolrServer extends Sol
Set<String> collectionsList = getCollectionList(clusterState, collection);
if (collectionsList.size() == 0) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection: " + collection);
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Could not find collection: " + collection);
}
collection = collectionsList.iterator().next();
StringBuilder collectionString = new StringBuilder();
Iterator<String> it = collectionsList.iterator();
for (int i = 0; i < collectionsList.size(); i++) {
- String col = it.next();
+ String col = it.next();
collectionString.append(col);
if (i < collectionsList.size() - 1) {
collectionString.append(",");
@@ -240,75 +566,67 @@ public class CloudSolrServer extends Sol
// add it to the Map of slices.
Map<String,Slice> slices = new HashMap<String,Slice>();
for (String collectionName : collectionsList) {
- Collection<Slice> colSlices = clusterState.getActiveSlices(collectionName);
+ Collection<Slice> colSlices = clusterState
+ .getActiveSlices(collectionName);
if (colSlices == null) {
- throw new SolrServerException("Could not find collection:" + collectionName);
+ throw new SolrServerException("Could not find collection:"
+ + collectionName);
}
ClientUtils.addSlices(slices, collectionName, colSlices, true);
}
Set<String> liveNodes = clusterState.getLiveNodes();
- synchronized (cachLock) {
- List<String> leaderUrlList = leaderUrlLists.get(collection);
- List<String> urlList = urlLists.get(collection);
- List<String> replicasList = replicasLists.get(collection);
-
- if ((sendToLeaders && leaderUrlList == null)
- || (!sendToLeaders && urlList == null)
- || clusterState.hashCode() != this.lastClusterStateHashCode) {
- // build a map of unique nodes
- // TODO: allow filtering by group, role, etc
- Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
- List<String> urlList2 = new ArrayList<String>();
- for (Slice slice : slices.values()) {
- for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
- ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
- String node = coreNodeProps.getNodeName();
- if (!liveNodes.contains(coreNodeProps.getNodeName())
- || !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
- if (nodes.put(node, nodeProps) == null) {
- if (!sendToLeaders
- || (sendToLeaders && coreNodeProps.isLeader())) {
- String url = coreNodeProps.getCoreUrl();
- urlList2.add(url);
- } else if (sendToLeaders) {
- String url = coreNodeProps.getCoreUrl();
- replicas.add(url);
- }
- }
+ List<String> leaderUrlList = null;
+ List<String> urlList = null;
+ List<String> replicasList = null;
+
+ // build a map of unique nodes
+ // TODO: allow filtering by group, role, etc
+ Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
+ List<String> urlList2 = new ArrayList<String>();
+ for (Slice slice : slices.values()) {
+ for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
+ ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+ String node = coreNodeProps.getNodeName();
+ if (!liveNodes.contains(coreNodeProps.getNodeName())
+ || !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
+ if (nodes.put(node, nodeProps) == null) {
+ if (!sendToLeaders || (sendToLeaders && coreNodeProps.isLeader())) {
+ String url = coreNodeProps.getCoreUrl();
+ urlList2.add(url);
+ } else if (sendToLeaders) {
+ String url = coreNodeProps.getCoreUrl();
+ replicas.add(url);
}
}
-
- if (sendToLeaders) {
- this.leaderUrlLists.put(collection, urlList2);
- leaderUrlList = urlList2;
- this.replicasLists.put(collection, replicas);
- replicasList = replicas;
- } else {
- this.urlLists.put(collection, urlList2);
- urlList = urlList2;
- }
- this.lastClusterStateHashCode = clusterState.hashCode();
- }
-
- if (sendToLeaders) {
- theUrlList = new ArrayList<String>(leaderUrlList.size());
- theUrlList.addAll(leaderUrlList);
- } else {
- theUrlList = new ArrayList<String>(urlList.size());
- theUrlList.addAll(urlList);
- }
- Collections.shuffle(theUrlList, rand);
- if (sendToLeaders) {
- ArrayList<String> theReplicas = new ArrayList<String>(
- replicasList.size());
- theReplicas.addAll(replicasList);
- Collections.shuffle(theReplicas, rand);
- // System.out.println("leaders:" + theUrlList);
- // System.out.println("replicas:" + theReplicas);
- theUrlList.addAll(theReplicas);
}
}
+
+ if (sendToLeaders) {
+ leaderUrlList = urlList2;
+ replicasList = replicas;
+ } else {
+ urlList = urlList2;
+ }
+
+ if (sendToLeaders) {
+ theUrlList = new ArrayList<String>(leaderUrlList.size());
+ theUrlList.addAll(leaderUrlList);
+ } else {
+ theUrlList = new ArrayList<String>(urlList.size());
+ theUrlList.addAll(urlList);
+ }
+ Collections.shuffle(theUrlList, rand);
+ if (sendToLeaders) {
+ ArrayList<String> theReplicas = new ArrayList<String>(
+ replicasList.size());
+ theReplicas.addAll(replicasList);
+ Collections.shuffle(theReplicas, rand);
+ // System.out.println("leaders:" + theUrlList);
+ // System.out.println("replicas:" + theReplicas);
+ theUrlList.addAll(theReplicas);
+ }
+
}
// System.out.println("########################## MAKING REQUEST TO " +
@@ -352,9 +670,18 @@ public class CloudSolrServer extends Sol
zkStateReader = null;
}
}
+
+ if (shutdownLBHttpSolrServer) {
+ lbServer.shutdown();
+ }
+
if (myClient!=null) {
myClient.getConnectionManager().shutdown();
}
+
+ if(this.threadPool != null && !this.threadPool.isShutdown()) {
+ this.threadPool.shutdown();
+ }
}
public LBHttpSolrServer getLbServer() {
@@ -365,19 +692,4 @@ public class CloudSolrServer extends Sol
return updatesToLeaders;
}
- // for tests
- Map<String,List<String>> getUrlLists() {
- return urlLists;
- }
-
- //for tests
- Map<String,List<String>> getLeaderUrlLists() {
- return leaderUrlLists;
- }
-
- //for tests
- Map<String,List<String>> getReplicasLists() {
- return replicasLists;
- }
-
}
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java Mon Oct 21 18:58:24 2013
@@ -44,6 +44,8 @@ import org.apache.solr.client.solrj.Solr
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -72,14 +74,15 @@ public class ConcurrentUpdateSolrServer
.getLogger(ConcurrentUpdateSolrServer.class);
private HttpSolrServer server;
final BlockingQueue<UpdateRequest> queue;
- final ExecutorService scheduler = Executors.newCachedThreadPool(
- new SolrjNamedThreadFactory("concurrentUpdateScheduler"));
+ final ExecutorService scheduler;
final Queue<Runner> runners;
volatile CountDownLatch lock = null; // used to block everything
final int threadCount;
+ boolean shutdownExecutor = false;
+ int pollQueueTime = 250;
/**
- * Uses an internaly managed HttpClient instance.
+ * Uses an internally managed HttpClient instance.
*
* @param solrServerUrl
* The Solr server URL
@@ -91,18 +94,27 @@ public class ConcurrentUpdateSolrServer
public ConcurrentUpdateSolrServer(String solrServerUrl, int queueSize,
int threadCount) {
this(solrServerUrl, null, queueSize, threadCount);
+ shutdownExecutor = true;
+ }
+
+ public ConcurrentUpdateSolrServer(String solrServerUrl,
+ HttpClient client, int queueSize, int threadCount) {
+ this(solrServerUrl, null, queueSize, threadCount, Executors.newCachedThreadPool(
+ new SolrjNamedThreadFactory("concurrentUpdateScheduler")));
+ shutdownExecutor = true;
}
/**
* Uses the supplied HttpClient to send documents to the Solr server.
*/
public ConcurrentUpdateSolrServer(String solrServerUrl,
- HttpClient client, int queueSize, int threadCount) {
+ HttpClient client, int queueSize, int threadCount, ExecutorService es) {
this.server = new HttpSolrServer(solrServerUrl, client);
this.server.setFollowRedirects(false);
queue = new LinkedBlockingQueue<UpdateRequest>(queueSize);
this.threadCount = threadCount;
runners = new LinkedList<Runner>();
+ scheduler = es;
}
/**
@@ -115,8 +127,7 @@ public class ConcurrentUpdateSolrServer
public void run() {
runnerLock.lock();
- // info is ok since this should only happen once for each thread
- log.info("starting runner: {}", this);
+ log.debug("starting runner: {}", this);
HttpPost method = null;
HttpResponse response = null;
try {
@@ -169,14 +180,15 @@ public class ConcurrentUpdateSolrServer
}
}
out.flush();
- req = queue.poll(250, TimeUnit.MILLISECONDS);
+ req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
}
if (isXml) {
out.write("</stream>".getBytes("UTF-8"));
}
} catch (InterruptedException e) {
- e.printStackTrace();
+ Thread.currentThread().interrupt();
+ log.warn("", e);
}
}
});
@@ -196,16 +208,13 @@ public class ConcurrentUpdateSolrServer
response = server.getHttpClient().execute(method);
int statusCode = response.getStatusLine().getStatusCode();
- log.info("Status for: "
- + updateRequest.getDocuments().get(0).getFieldValue("id")
- + " is " + statusCode);
if (statusCode != HttpStatus.SC_OK) {
StringBuilder msg = new StringBuilder();
msg.append(response.getStatusLine().getReasonPhrase());
msg.append("\n\n");
msg.append("\n\n");
msg.append("request: ").append(method.getURI());
- handleError(new Exception(msg.toString()));
+ handleError(new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString()));
}
} finally {
try {
@@ -213,6 +222,7 @@ public class ConcurrentUpdateSolrServer
response.getEntity().getContent().close();
}
} catch (Exception ex) {
+ log.warn("", ex);
}
}
}
@@ -236,7 +246,7 @@ public class ConcurrentUpdateSolrServer
}
}
- log.info("finished: {}", this);
+ log.debug("finished: {}", this);
runnerLock.unlock();
}
}
@@ -357,16 +367,18 @@ public class ConcurrentUpdateSolrServer
@Override
public void shutdown() {
server.shutdown();
- scheduler.shutdown();
- try {
- if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+ if (shutdownExecutor) {
+ scheduler.shutdown();
+ try {
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
+ .error("ExecutorService did not terminate");
+ }
+ } catch (InterruptedException ie) {
scheduler.shutdownNow();
- if (!scheduler.awaitTermination(60, TimeUnit.SECONDS))
- log.error("ExecutorService did not terminate");
+ Thread.currentThread().interrupt();
}
- } catch (InterruptedException ie) {
- scheduler.shutdownNow();
- Thread.currentThread().interrupt();
}
}
@@ -384,19 +396,30 @@ public class ConcurrentUpdateSolrServer
public void shutdownNow() {
server.shutdown();
- scheduler.shutdownNow(); // Cancel currently executing tasks
- try {
- if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
- log.error("ExecutorService did not terminate");
- } catch (InterruptedException ie) {
- scheduler.shutdownNow();
- Thread.currentThread().interrupt();
+ if (shutdownExecutor) {
+ scheduler.shutdownNow(); // Cancel currently executing tasks
+ try {
+ if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) log
+ .error("ExecutorService did not terminate");
+ } catch (InterruptedException ie) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
}
}
public void setParser(ResponseParser responseParser) {
server.setParser(responseParser);
}
+
+
+ /**
+ * @param pollQueueTime time for an open connection to wait for updates when
+ * the queue is empty.
+ */
+ public void setPollQueueTime(int pollQueueTime) {
+ this.pollQueueTime = pollQueueTime;
+ }
public void setRequestWriter(RequestWriter requestWriter) {
server.setRequestWriter(requestWriter);
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java Mon Oct 21 18:58:24 2013
@@ -100,7 +100,9 @@ public class HttpClientUtil {
*/
public static HttpClient createClient(final SolrParams params) {
final ModifiableSolrParams config = new ModifiableSolrParams(params);
- logger.info("Creating new http client, config:" + config);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Creating new http client, config:" + config);
+ }
final DefaultHttpClient httpClient = new SystemDefaultHttpClient();
configureClient(httpClient, config);
return httpClient;
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java Mon Oct 21 18:58:24 2013
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
@@ -98,14 +99,14 @@ public class HttpSolrServer extends Solr
*
* @see org.apache.solr.client.solrj.impl.BinaryResponseParser
*/
- protected ResponseParser parser;
+ protected volatile ResponseParser parser;
/**
* The RequestWriter used to write all requests to Solr
*
* @see org.apache.solr.client.solrj.request.RequestWriter
*/
- protected RequestWriter requestWriter = new RequestWriter();
+ protected volatile RequestWriter requestWriter = new RequestWriter();
private final HttpClient httpClient;
@@ -265,7 +266,7 @@ public class HttpSolrServer extends Solr
for (ContentStream content : streams) {
String contentType = content.getContentType();
if(contentType==null) {
- contentType = "application/octet-stream"; // default
+ contentType = BinaryResponseParser.BINARY_CONTENT_TYPE; // default
}
String name = content.getName();
if(name==null) {
@@ -359,7 +360,7 @@ public class HttpSolrServer extends Solr
InputStream respBody = null;
boolean shouldClose = true;
-
+ boolean success = false;
try {
// Execute the method.
final HttpResponse response = httpClient.execute(method);
@@ -367,6 +368,13 @@ public class HttpSolrServer extends Solr
// Read the contents
respBody = response.getEntity().getContent();
+ Header ctHeader = response.getLastHeader("content-type");
+ String contentType;
+ if (ctHeader != null) {
+ contentType = ctHeader.getValue();
+ } else {
+ contentType = "";
+ }
// handle some http level checks before trying to parse the response
switch (httpStatus) {
@@ -382,19 +390,46 @@ public class HttpSolrServer extends Solr
}
break;
default:
- throw new RemoteSolrException(httpStatus, "Server at " + getBaseURL()
- + " returned non ok status:" + httpStatus + ", message:"
- + response.getStatusLine().getReasonPhrase(), null);
+ if (processor == null) {
+ throw new RemoteSolrException(httpStatus, "Server at "
+ + getBaseURL() + " returned non ok status:" + httpStatus
+ + ", message:" + response.getStatusLine().getReasonPhrase(),
+ null);
+ }
}
if (processor == null) {
+
// no processor specified, return raw stream
NamedList<Object> rsp = new NamedList<Object>();
rsp.add("stream", respBody);
// Only case where stream should not be closed
shouldClose = false;
+ success = true;
return rsp;
}
+ String procCt = processor.getContentType();
+ if (procCt != null) {
+ if (!contentType.equals(procCt)) {
+ // unexpected content type
+ String msg = "Expected content type " + procCt + " but got " + contentType + ".";
+ Header encodingHeader = response.getEntity().getContentEncoding();
+ String encoding;
+ if (encodingHeader != null) {
+ encoding = encodingHeader.getValue();
+ } else {
+ encoding = "UTF-8"; // try UTF-8
+ }
+ try {
+ msg = msg + " " + IOUtils.toString(respBody, encoding);
+ } catch (IOException e) {
+ throw new RemoteSolrException(httpStatus, "Could not parse response with encoding " + encoding, e);
+ }
+ RemoteSolrException e = new RemoteSolrException(httpStatus, msg, null);
+ throw e;
+ }
+ }
+
// if(true) {
// ByteArrayOutputStream copy = new ByteArrayOutputStream();
// IOUtils.copy(respBody, copy);
@@ -403,8 +438,13 @@ public class HttpSolrServer extends Solr
// respBody = new ByteArrayInputStream(copy.toByteArray());
// }
+ NamedList<Object> rsp = null;
String charset = EntityUtils.getContentCharSet(response.getEntity());
- NamedList<Object> rsp = processor.processResponse(respBody, charset);
+ try {
+ rsp = processor.processResponse(respBody, charset);
+ } catch (Exception e) {
+ throw new RemoteSolrException(httpStatus, e.getMessage(), e);
+ }
if (httpStatus != HttpStatus.SC_OK) {
String reason = null;
try {
@@ -423,6 +463,7 @@ public class HttpSolrServer extends Solr
}
throw new RemoteSolrException(httpStatus, reason, null);
}
+ success = true;
return rsp;
} catch (ConnectException e) {
throw new SolrServerException("Server refused connection at: "
@@ -439,6 +480,9 @@ public class HttpSolrServer extends Solr
try {
respBody.close();
} catch (Throwable t) {} // ignore
+ if (!success) {
+ method.abort();
+ }
}
}
}
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java Mon Oct 21 18:58:24 2013
@@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.imp
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.*;
+import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
@@ -93,7 +94,8 @@ public class LBHttpSolrServer extends So
private final AtomicInteger counter = new AtomicInteger(-1);
private static final SolrQuery solrQuery = new SolrQuery("*:*");
- private final ResponseParser parser;
+ private volatile ResponseParser parser;
+ private volatile RequestWriter requestWriter;
static {
solrQuery.setRows(0);
@@ -219,11 +221,13 @@ public class LBHttpSolrServer extends So
}
protected HttpSolrServer makeServer(String server) throws MalformedURLException {
- return new HttpSolrServer(server, httpClient, parser);
+ HttpSolrServer s = new HttpSolrServer(server, httpClient, parser);
+ if (requestWriter != null) {
+ s.setRequestWriter(requestWriter);
+ }
+ return s;
}
-
-
/**
* Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
* If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
@@ -590,6 +594,22 @@ public class LBHttpSolrServer extends So
return httpClient;
}
+ public ResponseParser getParser() {
+ return parser;
+ }
+
+ public void setParser(ResponseParser parser) {
+ this.parser = parser;
+ }
+
+ public void setRequestWriter(RequestWriter requestWriter) {
+ this.requestWriter = requestWriter;
+ }
+
+ public RequestWriter getRequestWriter() {
+ return requestWriter;
+ }
+
@Override
protected void finalize() throws Throwable {
try {
@@ -603,4 +623,5 @@ public class LBHttpSolrServer extends So
// defaults
private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
+
}
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/XMLResponseParser.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/XMLResponseParser.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/XMLResponseParser.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/XMLResponseParser.java Mon Oct 21 18:58:24 2013
@@ -32,6 +32,7 @@ import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
+
import java.io.InputStream;
import java.io.Reader;
import java.util.ArrayList;
@@ -46,6 +47,7 @@ import java.util.Locale;
*/
public class XMLResponseParser extends ResponseParser
{
+ public static final String XML_CONTENT_TYPE = "application/xml; charset=UTF-8";
public static Logger log = LoggerFactory.getLogger(XMLResponseParser.class);
private static final XMLErrorLogger xmllog = new XMLErrorLogger(log);
@@ -78,6 +80,11 @@ public class XMLResponseParser extends R
{
return "xml";
}
+
+ @Override
+ public String getContentType() {
+ return XML_CONTENT_TYPE;
+ }
@Override
public NamedList<Object> processResponse(Reader in) {
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java Mon Oct 21 18:58:24 2013
@@ -59,6 +59,7 @@ public class CoreAdminRequest extends So
private String coreNodeName;
private Boolean loadOnStartup;
private Boolean isTransient;
+ private String collectionConfigName;
public Create() {
action = CoreAdminAction.CREATE;
@@ -76,6 +77,7 @@ public class CoreAdminRequest extends So
public void setCoreNodeName(String coreNodeName) {this.coreNodeName = coreNodeName;}
public void setIsTransient(Boolean isTransient) { this.isTransient = isTransient; }
public void setIsLoadOnStartup(Boolean loadOnStartup) { this.loadOnStartup = loadOnStartup;}
+ public void setCollectionConfigName(String name) { this.collectionConfigName = name;}
public String getInstanceDir() { return instanceDir; }
public String getSchemaName() { return schemaName; }
@@ -88,7 +90,8 @@ public class CoreAdminRequest extends So
public String getCoreNodeName() { return coreNodeName; }
public Boolean getIsLoadOnStartup() { return loadOnStartup; }
public Boolean getIsTransient() { return isTransient; }
-
+ public String getCollectionConfigName() { return collectionConfigName;}
+
@Override
public SolrParams getParams() {
if( action == null ) {
@@ -137,6 +140,11 @@ public class CoreAdminRequest extends So
if (loadOnStartup != null) {
params.set(CoreAdminParams.LOAD_ON_STARTUP, loadOnStartup);
}
+
+ if (collectionConfigName != null) {
+ params.set("collection.configName", collectionConfigName);
+ }
+
return params;
}
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java Mon Oct 21 18:58:24 2013
@@ -23,6 +23,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -63,11 +66,14 @@ public class JavaBinUpdateRequestCodec {
if(updateRequest.getDocIterator() != null){
docIter = updateRequest.getDocIterator();
}
+
+ Map<SolrInputDocument,Map<String,Object>> docMap = updateRequest.getDocumentsMap();
nl.add("params", params);// 0: params
- nl.add("delById", updateRequest.getDeleteById());
+ nl.add("delByIdMap", updateRequest.getDeleteByIdMap());
nl.add("delByQ", updateRequest.getDeleteQuery());
nl.add("docs", docIter);
+ nl.add("docsMap", docMap);
JavaBinCodec codec = new JavaBinCodec();
codec.marshal(nl, os);
}
@@ -86,7 +92,9 @@ public class JavaBinUpdateRequestCodec {
public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException {
final UpdateRequest updateRequest = new UpdateRequest();
List<List<NamedList>> doclist;
+ Map<SolrInputDocument,Map<String,Object>> docMap;
List<String> delById;
+ Map<String,Map<String,Object>> delByIdMap;
List<String> delByQ;
final NamedList[] namedList = new NamedList[1];
JavaBinCodec codec = new JavaBinCodec() {
@@ -158,9 +166,11 @@ public class JavaBinUpdateRequestCodec {
}
}
delById = (List<String>) namedList[0].get("delById");
+ delByIdMap = (Map<String,Map<String,Object>>) namedList[0].get("delByIdMap");
delByQ = (List<String>) namedList[0].get("delByQ");
doclist = (List) namedList[0].get("docs");
-
+ docMap = (Map<SolrInputDocument,Map<String,Object>>) namedList[0].get("docsMap");
+
if (doclist != null && !doclist.isEmpty()) {
List<SolrInputDocument> solrInputDocs = new ArrayList<SolrInputDocument>();
for (Object o : doclist) {
@@ -172,11 +182,36 @@ public class JavaBinUpdateRequestCodec {
}
updateRequest.add(solrInputDocs);
}
+ if (docMap != null && !docMap.isEmpty()) {
+ Set<Entry<SolrInputDocument,Map<String,Object>>> entries = docMap.entrySet();
+ for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+ Map<String,Object> map = entry.getValue();
+ Boolean overwrite = null;
+ Integer commitWithin = null;
+ if (map != null) {
+ overwrite = (Boolean) map.get(UpdateRequest.OVERWRITE);
+ commitWithin = (Integer) map.get(UpdateRequest.COMMIT_WITHIN);
+ }
+ updateRequest.add(entry.getKey(), commitWithin, overwrite);
+ }
+ }
if (delById != null) {
for (String s : delById) {
updateRequest.deleteById(s);
}
}
+ if (delByIdMap != null) {
+ for (Map.Entry<String,Map<String,Object>> entry : delByIdMap.entrySet()) {
+ Map<String,Object> params = entry.getValue();
+ if (params != null) {
+ Long version = (Long) params.get("ver");
+ updateRequest.deleteById(entry.getKey(), version);
+ } else {
+ updateRequest.deleteById(entry.getKey());
+ }
+
+ }
+ }
if (delByQ != null) {
for (String s : delByQ) {
updateRequest.deleteByQuery(s);
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java Mon Oct 21 18:58:24 2013
@@ -26,6 +26,7 @@ import java.io.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.nio.charset.Charset;
/**
@@ -52,7 +53,7 @@ public class RequestWriter {
private boolean isEmpty(UpdateRequest updateRequest) {
return isNull(updateRequest.getDocuments()) &&
- isNull(updateRequest.getDeleteById()) &&
+ isNull(updateRequest.getDeleteByIdMap()) &&
isNull(updateRequest.getDeleteQuery()) &&
updateRequest.getDocIterator() == null;
}
@@ -137,4 +138,8 @@ public class RequestWriter {
protected boolean isNull(List l) {
return l == null || l.isEmpty();
}
+
+ protected boolean isNull(Map l) {
+ return l == null || l.isEmpty();
+ }
}
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java Mon Oct 21 18:58:24 2013
@@ -22,203 +22,412 @@ import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.LinkedHashMap;
+import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.XML;
/**
*
- *
+ *
* @since solr 1.3
*/
public class UpdateRequest extends AbstractUpdateRequest {
- private List<SolrInputDocument> documents = null;
+ private static final String VER = "ver";
+ public static final String OVERWRITE = "ow";
+ public static final String COMMIT_WITHIN = "cw";
+ private Map<SolrInputDocument,Map<String,Object>> documents = null;
private Iterator<SolrInputDocument> docIterator = null;
- private List<String> deleteById = null;
+ private Map<String,Map<String,Object>> deleteById = null;
private List<String> deleteQuery = null;
-
- public UpdateRequest()
- {
- super( METHOD.POST, "/update" );
+
+ public UpdateRequest() {
+ super(METHOD.POST, "/update");
}
-
+
public UpdateRequest(String url) {
super(METHOD.POST, url);
}
-
- //---------------------------------------------------------------------------
- //---------------------------------------------------------------------------
+
+ // ---------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------
/**
* clear the pending documents and delete commands
*/
- public void clear()
- {
- if( documents != null ) {
+ public void clear() {
+ if (documents != null) {
documents.clear();
}
- if( deleteById != null ) {
+ if (deleteById != null) {
deleteById.clear();
}
- if( deleteQuery != null ) {
+ if (deleteQuery != null) {
deleteQuery.clear();
}
}
- //---------------------------------------------------------------------------
- //---------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------
+
+ public UpdateRequest add(final SolrInputDocument doc) {
+ if (documents == null) {
+ documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+ }
+ documents.put(doc, null);
+ return this;
+ }
+
+ public UpdateRequest add(final SolrInputDocument doc, Boolean overwrite) {
+ return add(doc, null, overwrite);
+ }
+
+ public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin) {
+ return add(doc, commitWithin, null);
+ }
- public UpdateRequest add( final SolrInputDocument doc )
- {
- if( documents == null ) {
- documents = new ArrayList<SolrInputDocument>( 2 );
+ public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin,
+ Boolean overwrite) {
+ if (documents == null) {
+ documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
}
- documents.add( doc );
+ Map<String,Object> params = new HashMap<String,Object>(2);
+ if (commitWithin != null) params.put(COMMIT_WITHIN, commitWithin);
+ if (overwrite != null) params.put(OVERWRITE, overwrite);
+
+ documents.put(doc, params);
+
return this;
}
- public UpdateRequest add( final Collection<SolrInputDocument> docs )
- {
- if( documents == null ) {
- documents = new ArrayList<SolrInputDocument>( docs.size()+1 );
+ public UpdateRequest add(final Collection<SolrInputDocument> docs) {
+ if (documents == null) {
+ documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+ }
+ for (SolrInputDocument doc : docs) {
+ documents.put(doc, null);
}
- documents.addAll( docs );
return this;
}
- public UpdateRequest deleteById( String id )
- {
- if( deleteById == null ) {
- deleteById = new ArrayList<String>();
+ public UpdateRequest deleteById(String id) {
+ if (deleteById == null) {
+ deleteById = new LinkedHashMap<String,Map<String,Object>>();
}
- deleteById.add( id );
+ deleteById.put(id, null);
return this;
}
- public UpdateRequest deleteById( List<String> ids )
- {
- if( deleteById == null ) {
- deleteById = new ArrayList<String>(ids);
- } else {
- deleteById.addAll(ids);
+
+ public UpdateRequest deleteById(List<String> ids) {
+ if (deleteById == null) {
+ deleteById = new LinkedHashMap<String,Map<String,Object>>();
}
+
+ for (String id : ids) {
+ deleteById.put(id, null);
+ }
+
return this;
}
- public UpdateRequest deleteByQuery( String q )
- {
- if( deleteQuery == null ) {
+ public UpdateRequest deleteById(String id, Long version) {
+ if (deleteById == null) {
+ deleteById = new LinkedHashMap<String,Map<String,Object>>();
+ }
+ Map<String,Object> params = new HashMap<String,Object>(1);
+ params.put(VER, version);
+ deleteById.put(id, params);
+ return this;
+ }
+
+ public UpdateRequest deleteByQuery(String q) {
+ if (deleteQuery == null) {
deleteQuery = new ArrayList<String>();
}
- deleteQuery.add( q );
+ deleteQuery.add(q);
return this;
}
+
+ /**
+ * @param router to route updates with
+ * @param col DocCollection for the updates
+ * @param urlMap of the cluster
+ * @param params params to use
+ * @param idField the id field
+ * @return a Map of urls to requests
+ */
+ public Map<String,LBHttpSolrServer.Req> getRoutes(DocRouter router,
+ DocCollection col, Map<String,List<String>> urlMap,
+ ModifiableSolrParams params, String idField) {
+
+ if ((documents == null || documents.size() == 0)
+ && (deleteById == null || deleteById.size() == 0)) {
+ return null;
+ }
+
+ Map<String,LBHttpSolrServer.Req> routes = new HashMap<String,LBHttpSolrServer.Req>();
+ if (documents != null) {
+ Set<Entry<SolrInputDocument,Map<String,Object>>> entries = documents.entrySet();
+ for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+ SolrInputDocument doc = entry.getKey();
+ Object id = doc.getFieldValue(idField);
+ if (id == null) {
+ return null;
+ }
+ Slice slice = router.getTargetSlice(id
+ .toString(), doc, null, col);
+ if (slice == null) {
+ return null;
+ }
+ List<String> urls = urlMap.get(slice.getName());
+ String leaderUrl = urls.get(0);
+ LBHttpSolrServer.Req request = (LBHttpSolrServer.Req) routes
+ .get(leaderUrl);
+ if (request == null) {
+ UpdateRequest updateRequest = new UpdateRequest();
+ updateRequest.setMethod(getMethod());
+ updateRequest.setCommitWithin(getCommitWithin());
+ updateRequest.setParams(params);
+ updateRequest.setPath(getPath());
+ request = new LBHttpSolrServer.Req(updateRequest, urls);
+ routes.put(leaderUrl, request);
+ }
+ UpdateRequest urequest = (UpdateRequest) request.getRequest();
+ urequest.add(doc);
+ }
+ }
+
+ // Route the deleteById's
+
+ if (deleteById != null) {
+
+ Iterator<Map.Entry<String,Map<String,Object>>> entries = deleteById.entrySet()
+ .iterator();
+ while (entries.hasNext()) {
+
+ Map.Entry<String,Map<String,Object>> entry = entries.next();
+
+ String deleteId = entry.getKey();
+ Map<String,Object> map = entry.getValue();
+ Long version = null;
+ if (map != null) {
+ version = (Long) map.get(VER);
+ }
+ Slice slice = router.getTargetSlice(deleteId, null, null, col);
+ if (slice == null) {
+ return null;
+ }
+ List<String> urls = urlMap.get(slice.getName());
+ String leaderUrl = urls.get(0);
+ LBHttpSolrServer.Req request = routes.get(leaderUrl);
+ if (request != null) {
+ UpdateRequest urequest = (UpdateRequest) request.getRequest();
+ urequest.deleteById(deleteId, version);
+ } else {
+ UpdateRequest urequest = new UpdateRequest();
+ urequest.deleteById(deleteId, version);
+ request = new LBHttpSolrServer.Req(urequest, urls);
+ routes.put(leaderUrl, request);
+ }
+ }
+ }
+ return routes;
+ }
+
public void setDocIterator(Iterator<SolrInputDocument> docIterator) {
this.docIterator = docIterator;
}
-
- //--------------------------------------------------------------------------
- //--------------------------------------------------------------------------
-
+
+ public void setDeleteQuery(List<String> deleteQuery) {
+ this.deleteQuery = deleteQuery;
+ }
+
+ // --------------------------------------------------------------------------
+ // --------------------------------------------------------------------------
+
@Override
public Collection<ContentStream> getContentStreams() throws IOException {
- return ClientUtils.toContentStreams( getXML(), ClientUtils.TEXT_XML );
+ return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML);
}
-
+
public String getXML() throws IOException {
StringWriter writer = new StringWriter();
- writeXML( writer );
+ writeXML(writer);
writer.flush();
-
+
// If action is COMMIT or OPTIMIZE, it is sent with params
String xml = writer.toString();
- //System.out.println( "SEND:"+xml );
+ // System.out.println( "SEND:"+xml );
return (xml.length() > 0) ? xml : null;
}
+ private List<Map<SolrInputDocument,Map<String,Object>>> getDocLists(Map<SolrInputDocument,Map<String,Object>> documents) {
+ List<Map<SolrInputDocument,Map<String,Object>>> docLists = new ArrayList<Map<SolrInputDocument,Map<String,Object>>>();
+ Map<SolrInputDocument,Map<String,Object>> docList = null;
+ if (this.documents != null) {
+
+ Boolean lastOverwrite = true;
+ Integer lastCommitWithin = -1;
+
+ Set<Entry<SolrInputDocument,Map<String,Object>>> entries = this.documents
+ .entrySet();
+ for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+ Map<String,Object> map = entry.getValue();
+ Boolean overwrite = null;
+ Integer commitWithin = null;
+ if (map != null) {
+ overwrite = (Boolean) entry.getValue().get(OVERWRITE);
+ commitWithin = (Integer) entry.getValue().get(COMMIT_WITHIN);
+ }
+ if (overwrite != lastOverwrite || commitWithin != lastCommitWithin
+ || docLists.size() == 0) {
+ docList = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+ docLists.add(docList);
+ }
+ docList.put(entry.getKey(), entry.getValue());
+ lastCommitWithin = commitWithin;
+ lastOverwrite = overwrite;
+ }
+ }
+
+ if (docIterator != null) {
+ docList = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+ docLists.add(docList);
+ while (docIterator.hasNext()) {
+ SolrInputDocument doc = docIterator.next();
+ if (doc != null) {
+ docList.put(doc, null);
+ }
+ }
+
+ }
+
+ return docLists;
+ }
+
/**
* @since solr 1.4
*/
- public void writeXML( Writer writer ) throws IOException {
- if( (documents != null && documents.size() > 0) || docIterator != null) {
- if( commitWithin > 0 ) {
- writer.write("<add commitWithin=\""+commitWithin+"\">");
- }
- else {
- writer.write("<add>");
- }
- if(documents != null) {
- for (SolrInputDocument doc : documents) {
- if (doc != null) {
- ClientUtils.writeXML(doc, writer);
- }
+ public void writeXML(Writer writer) throws IOException {
+ List<Map<SolrInputDocument,Map<String,Object>>> getDocLists = getDocLists(documents);
+
+ for (Map<SolrInputDocument,Map<String,Object>> docs : getDocLists) {
+
+ if ((docs != null && docs.size() > 0)) {
+ Entry<SolrInputDocument,Map<String,Object>> firstDoc = docs.entrySet()
+ .iterator().next();
+ Map<String,Object> map = firstDoc.getValue();
+ Integer cw = null;
+ Boolean ow = null;
+ if (map != null) {
+ cw = (Integer) firstDoc.getValue().get(COMMIT_WITHIN);
+ ow = (Boolean) firstDoc.getValue().get(OVERWRITE);
}
- }
- if (docIterator != null) {
- while (docIterator.hasNext()) {
- SolrInputDocument doc = docIterator.next();
- if (doc != null) {
- ClientUtils.writeXML(doc, writer);
- }
+ if (ow == null) ow = true;
+ int commitWithin = (cw != null && cw != -1) ? cw : this.commitWithin;
+ boolean overwrite = ow;
+ if (commitWithin > -1 || overwrite != true) {
+ writer.write("<add commitWithin=\"" + commitWithin + "\" "
+ + "overwrite=\"" + overwrite + "\">");
+ } else {
+ writer.write("<add>");
+ }
+
+ Set<Entry<SolrInputDocument,Map<String,Object>>> entries = docs
+ .entrySet();
+ for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+ ClientUtils.writeXML(entry.getKey(), writer);
}
+
+ writer.write("</add>");
}
- writer.write("</add>");
}
// Add the delete commands
boolean deleteI = deleteById != null && deleteById.size() > 0;
boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0;
- if( deleteI || deleteQ ) {
- if(commitWithin>0) {
- writer.append( "<delete commitWithin=\"" + commitWithin + "\">" );
+ if (deleteI || deleteQ) {
+ if (commitWithin > 0) {
+ writer.append("<delete commitWithin=\"" + commitWithin + "\">");
} else {
- writer.append( "<delete>" );
+ writer.append("<delete>");
}
- if( deleteI ) {
- for( String id : deleteById ) {
- writer.append( "<id>" );
- XML.escapeCharData( id, writer );
- writer.append( "</id>" );
+ if (deleteI) {
+ for (Map.Entry<String,Map<String,Object>> entry : deleteById.entrySet()) {
+ writer.append("<id");
+ Map<String,Object> map = entry.getValue();
+ if (map != null) {
+ Long version = (Long) map.get(VER);
+ if (version != null) {
+ writer.append(" version=\"" + version + "\"");
+ }
+ }
+ writer.append(">");
+
+ XML.escapeCharData(entry.getKey(), writer);
+ writer.append("</id>");
}
}
- if( deleteQ ) {
- for( String q : deleteQuery ) {
- writer.append( "<query>" );
- XML.escapeCharData( q, writer );
- writer.append( "</query>" );
+ if (deleteQ) {
+ for (String q : deleteQuery) {
+ writer.append("<query>");
+ XML.escapeCharData(q, writer);
+ writer.append("</query>");
}
}
- writer.append( "</delete>" );
+ writer.append("</delete>");
}
}
-
-
- //--------------------------------------------------------------------------
- //--------------------------------------------------------------------------
-
- //--------------------------------------------------------------------------
- //
- //--------------------------------------------------------------------------
-
+
+ // --------------------------------------------------------------------------
+ // --------------------------------------------------------------------------
+
+ // --------------------------------------------------------------------------
+ //
+ // --------------------------------------------------------------------------
+
public List<SolrInputDocument> getDocuments() {
+ if (documents == null) return null;
+ List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(documents.size());
+ docs.addAll(documents.keySet());
+ return docs;
+ }
+
+ public Map<SolrInputDocument,Map<String,Object>> getDocumentsMap() {
return documents;
}
-
+
public Iterator<SolrInputDocument> getDocIterator() {
return docIterator;
}
-
+
public List<String> getDeleteById() {
+ if (deleteById == null) return null;
+ List<String> deletes = new ArrayList<String>(deleteById.keySet());
+ return deletes;
+ }
+
+ public Map<String,Map<String,Object>> getDeleteByIdMap() {
return deleteById;
}
-
+
public List<String> getDeleteQuery() {
return deleteQuery;
}
-
+
}
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java Mon Oct 21 18:58:24 2013
@@ -133,9 +133,11 @@ public class ClientUtils
}
}
}
-
- for (SolrInputDocument childDocument : doc.getChildDocuments()) {
- writeXML(childDocument, writer);
+
+ if (doc.hasChildDocuments()) {
+ for (SolrInputDocument childDocument : doc.getChildDocuments()) {
+ writeXML(childDocument, writer);
+ }
}
writer.write("</doc>");
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java Mon Oct 21 18:58:24 2013
@@ -43,12 +43,10 @@ public class SolrInputDocument implement
public SolrInputDocument() {
_fields = new LinkedHashMap<String,SolrInputField>();
- _childDocuments = new ArrayList<SolrInputDocument>();
}
public SolrInputDocument(Map<String,SolrInputField> fields) {
_fields = fields;
- _childDocuments = new ArrayList<SolrInputDocument>();
}
/**
@@ -60,9 +58,7 @@ public class SolrInputDocument implement
if( _fields != null ) {
_fields.clear();
}
- if (_childDocuments != null) {
- _childDocuments.clear();
- }
+ _childDocuments = null;
}
///////////////////////////////////////////////////////////////////
@@ -198,7 +194,9 @@ public class SolrInputDocument implement
@Override
public String toString()
{
- return "SolrInputDocument(fields: " + _fields.values() + ", childs: " + _childDocuments + ")";
+ return "SolrInputDocument(fields: " + _fields.values()
+ + ( _childDocuments == null ? "" : (", children: " + _childDocuments) )
+ + ")";
}
public SolrInputDocument deepCopy() {
@@ -208,11 +206,13 @@ public class SolrInputDocument implement
clone._fields.put(fieldEntry.getKey(), fieldEntry.getValue().deepCopy());
}
clone._documentBoost = _documentBoost;
-
- clone._childDocuments = new ArrayList<SolrInputDocument>(_childDocuments.size());
- for (SolrInputDocument child : _childDocuments) {
- clone._childDocuments.add(child.deepCopy());
- }
+
+ if (_childDocuments != null) {
+ clone._childDocuments = new ArrayList<SolrInputDocument>(_childDocuments.size());
+ for (SolrInputDocument child : _childDocuments) {
+ clone._childDocuments.add(child.deepCopy());
+ }
+ }
return clone;
}
@@ -277,6 +277,9 @@ public class SolrInputDocument implement
}
public void addChildDocument(SolrInputDocument child) {
+ if (_childDocuments == null) {
+ _childDocuments = new ArrayList<SolrInputDocument>();
+ }
_childDocuments.add(child);
}
@@ -285,7 +288,8 @@ public class SolrInputDocument implement
addChildDocument(child);
}
}
-
+
+ /** Returns the list of child documents, or null if none. */
public List<SolrInputDocument> getChildDocuments() {
return _childDocuments;
}
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Mon Oct 21 18:58:24 2013
@@ -260,7 +260,18 @@ public class ClusterState implements JSO
objs.remove(DocCollection.SHARDS);
}
- DocRouter router = DocRouter.getDocRouter(props.get(DocCollection.DOC_ROUTER));
+ Object routerObj = props.get(DocCollection.DOC_ROUTER);
+ DocRouter router;
+ if (routerObj == null) {
+ router = DocRouter.DEFAULT;
+ } else if (routerObj instanceof String) {
+ // back compat with Solr4.4
+ router = DocRouter.getDocRouter((String)routerObj);
+ } else {
+ Map routerProps = (Map)routerObj;
+ router = DocRouter.getDocRouter(routerProps.get("name"));
+ }
+
return new DocCollection(name, slices, props, router);
}
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java Mon Oct 21 18:58:24 2013
@@ -17,6 +17,7 @@ package org.apache.solr.common.cloud;
* limitations under the License.
*/
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.Hash;
@@ -33,10 +34,10 @@ import java.util.List;
public class CompositeIdRouter extends HashBasedRouter {
public static final String NAME = "compositeId";
- private int separator = '!';
+ public static final int separator = '!';
// separator used to optionally specify number of bits to allocate toward first part.
- private int bitsSeparator = '/';
+ public static final int bitsSeparator = '/';
private int bits = 16;
private int mask1 = 0xffff0000;
private int mask2 = 0x0000ffff;
@@ -59,17 +60,23 @@ public class CompositeIdRouter extends H
}
@Override
- public int sliceHash(String id, SolrInputDocument doc, SolrParams params) {
+ public int sliceHash(String id, SolrInputDocument doc, SolrParams params, DocCollection collection) {
+ String shardFieldName = getRouteField(collection);
+ if (shardFieldName != null && doc != null) {
+ Object o = doc.getFieldValue(shardFieldName);
+ if (o == null)
+ throw new SolrException (SolrException.ErrorCode.BAD_REQUEST, "No value for :"+shardFieldName + ". Unable to identify shard");
+ id = o.toString();
+ }
int idx = id.indexOf(separator);
if (idx < 0) {
return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
}
-
+ String part1 = id.substring(0, idx);
+ int commaIdx = part1.indexOf(bitsSeparator);
int m1 = mask1;
int m2 = mask2;
- String part1 = id.substring(0,idx);
- int commaIdx = part1.indexOf(bitsSeparator);
if (commaIdx > 0) {
int firstBits = getBits(part1, commaIdx);
if (firstBits >= 0) {
@@ -86,6 +93,32 @@ public class CompositeIdRouter extends H
return (hash1 & m1) | (hash2 & m2);
}
+ public Range keyHashRange(String routeKey) {
+ int idx = routeKey.indexOf(separator);
+ if (idx < 0) {
+ int hash = sliceHash(routeKey, null, null, null);
+ return new Range(hash, hash);
+ }
+ String part1 = routeKey.substring(0, idx);
+ int commaIdx = part1.indexOf(bitsSeparator);
+ int m1 = mask1;
+ int m2 = mask2;
+
+ if (commaIdx > 0) {
+ int firstBits = getBits(part1, commaIdx);
+ if (firstBits >= 0) {
+ m1 = firstBits==0 ? 0 : (-1 << (32-firstBits));
+ m2 = firstBits==32 ? 0 : (-1 >>> firstBits);
+ part1 = part1.substring(0, commaIdx);
+ }
+ }
+
+ int hash = Hash.murmurhash3_x86_32(part1, 0, part1.length(), 0);
+ int min = hash & m1;
+ int max = min | m2;
+ return new Range(min, max);
+ }
+
@Override
public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
if (shardKey == null) {
@@ -142,6 +175,27 @@ public class CompositeIdRouter extends H
return targetSlices;
}
+ public List<Range> partitionRangeByKey(String key, Range range) {
+ List<Range> result = new ArrayList<Range>(3);
+ Range keyRange = keyHashRange(key);
+ if (!keyRange.overlaps(range)) {
+ throw new IllegalArgumentException("Key range does not overlap given range");
+ }
+ if (keyRange.equals(range)) {
+ return Collections.singletonList(keyRange);
+ } else if (keyRange.isSubsetOf(range)) {
+ result.add(new Range(range.min, keyRange.min - 1));
+ result.add(keyRange);
+ result.add((new Range(keyRange.max + 1, range.max)));
+ } else if (range.includes(keyRange.max)) {
+ result.add(new Range(range.min, keyRange.max));
+ result.add(new Range(keyRange.max + 1, range.max));
+ } else {
+ result.add(new Range(range.min, keyRange.min - 1));
+ result.add(new Range(keyRange.min, range.max));
+ }
+ return result;
+ }
@Override
public List<Range> partitionRange(int partitions, Range range) {
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java Mon Oct 21 18:58:24 2013
@@ -37,16 +37,14 @@ class ConnectionManager implements Watch
private boolean connected;
private final ZkClientConnectionStrategy connectionStrategy;
-
- private Object connectionUpdateLock = new Object();
- private String zkServerAddress;
+ private final String zkServerAddress;
- private int zkClientTimeout;
+ private final int zkClientTimeout;
- private SolrZkClient client;
+ private final SolrZkClient client;
- private OnReconnect onReconnect;
+ private final OnReconnect onReconnect;
private volatile boolean isClosed = false;
@@ -92,37 +90,35 @@ class ConnectionManager implements Watch
new ZkClientConnectionStrategy.ZkUpdate() {
@Override
public void update(SolrZooKeeper keeper) {
- // if keeper does not replace oldKeeper we must be sure to close it
- synchronized (connectionUpdateLock) {
- try {
- waitForConnected(Long.MAX_VALUE);
- } catch (Exception e1) {
- closeKeeper(keeper);
- throw new RuntimeException(e1);
- }
- log.info("Connection with ZooKeeper reestablished.");
- try {
- client.updateKeeper(keeper);
- } catch (InterruptedException e) {
- closeKeeper(keeper);
- Thread.currentThread().interrupt();
- // we must have been asked to stop
- throw new RuntimeException(e);
- } catch(Throwable t) {
- closeKeeper(keeper);
- throw new RuntimeException(t);
- }
-
- if (onReconnect != null) {
- onReconnect.command();
- }
- synchronized (ConnectionManager.this) {
- ConnectionManager.this.connected = true;
- }
+ try {
+ waitForConnected(Long.MAX_VALUE);
+ } catch (Exception e1) {
+ closeKeeper(keeper);
+ throw new RuntimeException(e1);
+ }
+
+ log.info("Connection with ZooKeeper reestablished.");
+ try {
+ client.updateKeeper(keeper);
+ } catch (InterruptedException e) {
+ closeKeeper(keeper);
+ Thread.currentThread().interrupt();
+ // we must have been asked to stop
+ throw new RuntimeException(e);
+ } catch (Throwable t) {
+ closeKeeper(keeper);
+ throw new RuntimeException(t);
+ }
+
+ if (onReconnect != null) {
+ onReconnect.command();
+ }
+
+ synchronized (ConnectionManager.this) {
+ ConnectionManager.this.connected = true;
}
}
-
});
} catch (Exception e) {
SolrException.log(log, "", e);
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java Mon Oct 21 18:58:24 2013
@@ -34,21 +34,40 @@ public class DefaultConnectionStrategy e
@Override
public void connect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
- updater.update(new SolrZooKeeper(serverAddress, timeout, watcher));
+ SolrZooKeeper zk = new SolrZooKeeper(serverAddress, timeout, watcher);
+ boolean success = false;
+ try {
+ updater.update(zk);
+ success = true;
+ } finally {
+ if (!success) {
+ zk.close();
+ }
+ }
}
@Override
public void reconnect(final String serverAddress, final int zkClientTimeout,
final Watcher watcher, final ZkUpdate updater) throws IOException {
log.info("Connection expired - starting a new one...");
-
+ SolrZooKeeper zk = new SolrZooKeeper(serverAddress, zkClientTimeout, watcher);
+ boolean success = false;
try {
updater
- .update(new SolrZooKeeper(serverAddress, zkClientTimeout, watcher));
+ .update(zk);
+ success = true;
log.info("Reconnected to ZooKeeper");
} catch (Exception e) {
SolrException.log(log, "Reconnect to ZooKeeper failed", e);
log.info("Reconnect to ZooKeeper failed");
+ } finally {
+ if (!success) {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
}
Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java Mon Oct 21 18:58:24 2013
@@ -45,7 +45,7 @@ public class DocCollection extends ZkNod
* @param props The properties of the slice. This is used directly and a copy is not made.
*/
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
- super( props==null ? Collections.<String,Object>emptyMap() : props);
+ super( props==null ? props = new HashMap<String,Object>() : props);
this.name = name;
this.slices = slices;