You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/25 20:49:30 UTC
svn commit: r1235888 [10/12] - in /lucene/dev/trunk: dev-tools/eclipse/
dev-tools/maven/ solr/ solr/cloud-dev/
solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/
solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/d...
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,167 @@
+package org.apache.solr.update;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.BaseDistributedSearchTestCase;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.update.SolrCmdDistributor.Node;
+import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.StdNode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class PeerSyncTest extends BaseDistributedSearchTestCase {
+ private static int numVersions = 100; // number of versions to use when syncing
+ private ModifiableSolrParams seenLeader = params("leader","true");
+
+ public PeerSyncTest() {
+ fixShardCount = true;
+ shardCount = 3;
+ stress = 0;
+
+ // TODO: a better way to do this?
+ configString = "solrconfig-tlog.xml";
+ schemaString = "schema.xml";
+ }
+
+
+ @Override
+ public void doTest() throws Exception {
+ handle.clear();
+ handle.put("QTime", SKIPVAL);
+ handle.put("timestamp", SKIPVAL);
+ handle.put("score", SKIPVAL);
+ handle.put("maxScore", SKIPVAL);
+
+ SolrServer client0 = clients.get(0);
+ SolrServer client1 = clients.get(1);
+ SolrServer client2 = clients.get(2);
+
+ long v = 0;
+ add(client0, seenLeader, sdoc("id","1","_version_",++v));
+
+ // this fails because client0 has no context (i.e. no updates of it's own to judge if applying the updates
+ // from client1 will bring it into sync with client1)
+ assertSync(client1, numVersions, false, shardsArr[0]);
+
+ // bring client1 back into sync with client0 by adding the doc
+ add(client1, seenLeader, sdoc("id","1","_version_",v));
+
+ // both have the same version list, so sync should now return true
+ assertSync(client1, numVersions, true, shardsArr[0]);
+ // TODO: test that updates weren't necessary
+
+ client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*"), client0, client1);
+
+ add(client0, seenLeader, addRandFields(sdoc("id","2","_version_",++v)));
+
+ // now client1 has the context to sync
+ assertSync(client1, numVersions, true, shardsArr[0]);
+
+ client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*"), client0, client1);
+
+ add(client0, seenLeader, addRandFields(sdoc("id","3","_version_",++v)));
+ add(client0, seenLeader, addRandFields(sdoc("id","4","_version_",++v)));
+ add(client0, seenLeader, addRandFields(sdoc("id","5","_version_",++v)));
+ add(client0, seenLeader, addRandFields(sdoc("id","6","_version_",++v)));
+ add(client0, seenLeader, addRandFields(sdoc("id","7","_version_",++v)));
+ add(client0, seenLeader, addRandFields(sdoc("id","8","_version_",++v)));
+ add(client0, seenLeader, addRandFields(sdoc("id","9","_version_",++v)));
+ add(client0, seenLeader, addRandFields(sdoc("id","10","_version_",++v)));
+
+ assertSync(client1, numVersions, true, shardsArr[0]);
+
+ client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*"), client0, client1);
+
+ int toAdd = (int)(numVersions *.95);
+ for (int i=0; i<toAdd; i++) {
+ add(client0, seenLeader, sdoc("id",Integer.toString(i+11),"_version_",v+i+1));
+ }
+
+ // sync should fail since there's not enough overlap to give us confidence
+ assertSync(client1, numVersions, false, shardsArr[0]);
+
+ // add some of the docs that were missing... just enough to give enough overlap
+ int toAdd2 = (int)(numVersions * .25);
+ for (int i=0; i<toAdd2; i++) {
+ add(client1, seenLeader, sdoc("id",Integer.toString(i+11),"_version_",v+i+1));
+ }
+
+ assertSync(client1, numVersions, true, shardsArr[0]);
+ client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+
+
+
+ // test delete and deleteByQuery
+ v=1000;
+ add(client0, seenLeader, sdoc("id","1000","_version_",++v));
+ add(client0, seenLeader, sdoc("id","1001","_version_",++v));
+ delQ(client0, params("leader","true","_version_",Long.toString(-++v)), "id:1001 OR id:1002");
+ add(client0, seenLeader, sdoc("id","1002","_version_",++v));
+ del(client0, params("leader","true","_version_",Long.toString(-++v)), "1000");
+
+ assertSync(client1, numVersions, true, shardsArr[0]);
+ client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+
+ // test that delete by query is returned even if not requested, and that it doesn't delete newer stuff than it should
+ v=2000;
+ SolrServer client = client0;
+ add(client, seenLeader, sdoc("id","2000","_version_",++v));
+ add(client, seenLeader, sdoc("id","2001","_version_",++v));
+ delQ(client, params("leader","true","_version_",Long.toString(-++v)), "id:2001 OR id:2002");
+ add(client, seenLeader, sdoc("id","2002","_version_",++v));
+ del(client, params("leader","true","_version_",Long.toString(-++v)), "2000");
+
+ v=2000;
+ client = client1;
+ add(client, seenLeader, sdoc("id","2000","_version_",++v));
+ ++v; // pretend we missed the add of 2001. peersync should retrieve it, but should also retrieve any deleteByQuery objects after it
+ // add(client, seenLeader, sdoc("id","2001","_version_",++v));
+ delQ(client, params("leader","true","_version_",Long.toString(-++v)), "id:2001 OR id:2002");
+ add(client, seenLeader, sdoc("id","2002","_version_",++v));
+ del(client, params("leader","true","_version_",Long.toString(-++v)), "2000");
+
+ // assertSync(client1, numVersions, true, shardsArr[0]);
+ client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+ }
+
+
+ void assertSync(SolrServer server, int numVersions, boolean expectedResult, String... syncWith) throws IOException, SolrServerException {
+ QueryRequest qr = new QueryRequest(params("qt","/get", "getVersions",Integer.toString(numVersions), "sync", StrUtils.join(Arrays.asList(syncWith), ',')));
+ NamedList rsp = server.request(qr);
+ assertEquals(expectedResult, (Boolean) rsp.get("sync"));
+ }
+
+}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java Wed Jan 25 19:49:26 2012
@@ -96,8 +96,8 @@ public class SoftAutoCommitTest extends
assertNotNull("soft529 wasn't fast enough", soft529);
monitor.assertSaneOffers();
- // check for the searcher, should have happend right after soft commit
- Long searcher529 = monitor.searcher.poll(softCommitWaitMillis, MILLISECONDS);
+ // check for the searcher, should have happened right after soft commit
+ Long searcher529 = monitor.searcher.poll(softCommitWaitMillis * 3, MILLISECONDS);
assertNotNull("searcher529 wasn't fast enough", searcher529);
monitor.assertSaneOffers();
@@ -118,7 +118,7 @@ public class SoftAutoCommitTest extends
// however slow the machine was to do the soft commit compared to expected,
// assume newSearcher had some magnitude of that much overhead as well
- long slowTestFudge = Math.max(100, 6 * (soft529 - add529 - softCommitWaitMillis));
+ long slowTestFudge = Math.max(200, 12 * (soft529 - add529 - softCommitWaitMillis));
assertTrue("searcher529 wasn't soon enough after soft529: " +
searcher529 + " !< " + soft529 + " + " + slowTestFudge + " (fudge)",
searcher529 < soft529 + slowTestFudge );
@@ -197,10 +197,10 @@ public class SoftAutoCommitTest extends
assertNotNull("manCommit wasn't fast enough", manCommit);
assertTrue("forced manCommit didn't happen when it should have: " +
- manCommit + " !< " + postAdd529,
- manCommit < postAdd529);
+ manCommit + " !<= " + postAdd529,
+ manCommit <= postAdd529);
- Long hard529 = monitor.hard.poll(hardCommitWaitMillis, MILLISECONDS);
+ Long hard529 = monitor.hard.poll(hardCommitWaitMillis * 2, MILLISECONDS);
assertNotNull("hard529 wasn't fast enough", hard529);
monitor.assertSaneOffers();
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,169 @@
+package org.apache.solr.update;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.solr.BaseDistributedSearchTestCase;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.update.SolrCmdDistributor.Node;
+import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.StdNode;
+
+public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
+
+
+ public SolrCmdDistributorTest() {
+ fixShardCount = true;
+ shardCount = 1;
+ stress = 0;
+ }
+
+
+ public static String getSchemaFile() {
+ return "schema.xml";
+ }
+
+ public static String getSolrConfigFile() {
+ // use this because it has /update and is minimal
+ return "solrconfig-tlog.xml";
+ }
+
+ // TODO: for now we redefine this method so that it pulls from the above
+ // we don't get helpful override behavior due to the method being static
+ protected void createServers(int numShards) throws Exception {
+ controlJetty = createJetty(testDir, testDir + "/control/data", null, getSolrConfigFile(), getSchemaFile());
+
+ controlClient = createNewSolrServer(controlJetty.getLocalPort());
+
+ shardsArr = new String[numShards];
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < numShards; i++) {
+ if (sb.length() > 0) sb.append(',');
+ JettySolrRunner j = createJetty(testDir,
+ testDir + "/shard" + i + "/data", null, getSolrConfigFile(),
+ getSchemaFile());
+ jettys.add(j);
+ clients.add(createNewSolrServer(j.getLocalPort()));
+ String shardStr = "localhost:" + j.getLocalPort() + context;
+ shardsArr[i] = shardStr;
+ sb.append(shardStr);
+ }
+
+ shards = sb.toString();
+ }
+
+ @Override
+ public void doTest() throws Exception {
+ //del("*:*");
+
+ SolrCmdDistributor cmdDistrib = new SolrCmdDistributor();
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ List<Node> nodes = new ArrayList<Node>();
+
+ ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
+ ((CommonsHttpSolrServer) controlClient).getBaseURL(),
+ ZkStateReader.CORE_NAME_PROP, "");
+ nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
+
+ // add one doc to controlClient
+
+ AddUpdateCommand cmd = new AddUpdateCommand(null);
+ cmd.solrDoc = sdoc("id", 1);
+ cmdDistrib.distribAdd(cmd, nodes, params);
+
+ CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
+ cmdDistrib.distribCommit(ccmd, nodes, params);
+ cmdDistrib.finish();
+
+ Response response = cmdDistrib.getResponse();
+
+ assertEquals(response.errors.toString(), 0, response.errors.size());
+
+ long numFound = controlClient.query(new SolrQuery("*:*")).getResults()
+ .getNumFound();
+ assertEquals(1, numFound);
+
+ CommonsHttpSolrServer client = (CommonsHttpSolrServer) clients.get(0);
+ nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
+ client.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
+ nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
+
+ // add another 2 docs to control and 3 to client
+ cmdDistrib = new SolrCmdDistributor();
+ cmd.solrDoc = sdoc("id", 2);
+ cmdDistrib.distribAdd(cmd, nodes, params);
+
+ AddUpdateCommand cmd2 = new AddUpdateCommand(null);
+ cmd2.solrDoc = sdoc("id", 3);
+
+ cmdDistrib.distribAdd(cmd2, nodes, params);
+
+ AddUpdateCommand cmd3 = new AddUpdateCommand(null);
+ cmd3.solrDoc = sdoc("id", 4);
+
+ cmdDistrib.distribAdd(cmd3, Collections.singletonList(nodes.get(1)), params);
+
+ cmdDistrib.distribCommit(ccmd, nodes, params);
+ cmdDistrib.finish();
+ response = cmdDistrib.getResponse();
+
+ assertEquals(response.errors.toString(), 0, response.errors.size());
+
+ SolrDocumentList results = controlClient.query(new SolrQuery("*:*")).getResults();
+ numFound = results.getNumFound();
+ assertEquals(results.toString(), 3, numFound);
+
+ numFound = client.query(new SolrQuery("*:*")).getResults()
+ .getNumFound();
+ assertEquals(3, numFound);
+
+ // now delete doc 2 which is on both control and client1
+
+ DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
+ dcmd.id = "2";
+
+ cmdDistrib = new SolrCmdDistributor();
+ cmdDistrib.distribDelete(dcmd, nodes, params);
+
+ cmdDistrib.distribCommit(ccmd, nodes, params);
+ cmdDistrib.finish();
+
+ response = cmdDistrib.getResponse();
+
+ assertEquals(response.errors.toString(), 0, response.errors.size());
+
+ results = controlClient.query(new SolrQuery("*:*")).getResults();
+ numFound = results.getNumFound();
+ assertEquals(results.toString(), 2, numFound);
+
+ numFound = client.query(new SolrQuery("*:*")).getResults()
+ .getNumFound();
+ assertEquals(results.toString(), 2, numFound);
+ }
+}
Modified: lucene/dev/trunk/solr/example/solr/conf/schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/example/solr/conf/schema.xml?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/example/solr/conf/schema.xml (original)
+++ lucene/dev/trunk/solr/example/solr/conf/schema.xml Wed Jan 25 19:49:26 2012
@@ -532,6 +532,9 @@
<field name="payloads" type="payloads" indexed="true" stored="true"/>
+
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+
<!-- Uncommenting the following will create a "timestamp" field using
a default value of "NOW" to indicate when each document was indexed.
-->
Modified: lucene/dev/trunk/solr/example/solr/conf/solrconfig.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/example/solr/conf/solrconfig.xml?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/example/solr/conf/solrconfig.xml (original)
+++ lucene/dev/trunk/solr/example/solr/conf/solrconfig.xml Wed Jan 25 19:49:26 2012
@@ -344,11 +344,9 @@
<!-- Enables a transaction log, currently used for real-time get.
"dir" - the target directory for transaction logs, defaults to the
solr data directory. -->
- <!--
- <updateLog class="solr.FSUpdateLog">
+ <updateLog>
<str name="dir">${solr.data.dir:}</str>
</updateLog>
- -->
</updateHandler>
@@ -1072,6 +1070,13 @@
</lst>
</requestHandler>
-->
+
+ <!-- Solr Replication for SolrCloud Recovery
+
+ This is the config need for SolrCloud's recovery replication.
+ -->
+ <requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy" />
+
<!-- Search Components
Modified: lucene/dev/trunk/solr/example/solr/solr.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/example/solr/solr.xml?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/example/solr/solr.xml (original)
+++ lucene/dev/trunk/solr/example/solr/solr.xml Wed Jan 25 19:49:26 2012
@@ -28,7 +28,7 @@
adminPath: RequestHandler path to manage cores.
If 'null' (or absent), cores will not be manageable via request handler
-->
- <cores adminPath="/admin/cores" defaultCoreName="collection1">
- <core name="collection1" instanceDir="." shard="shard1"/>
+ <cores adminPath="/admin/cores" defaultCoreName="collection1" host="${host:}" hostPort="${jetty.port:}">
+ <core name="collection1" instanceDir="." />
</cores>
</solr>
Modified: lucene/dev/trunk/solr/lib/apache-solr-noggit-pom.xml.template
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/lib/apache-solr-noggit-pom.xml.template?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/lib/apache-solr-noggit-pom.xml.template (original)
+++ lucene/dev/trunk/solr/lib/apache-solr-noggit-pom.xml.template Wed Jan 25 19:49:26 2012
@@ -31,6 +31,6 @@
<artifactId>solr-noggit</artifactId>
<name>Solr Specific Noggit</name>
<version>@version@</version>
- <description>Solr Specific Noggit r1209632</description>
+ <description>Solr Specific Noggit r1211150</description>
<packaging>jar</packaging>
</project>
Added: lucene/dev/trunk/solr/lib/apache-solr-noggit-r1211150.jar
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/lib/apache-solr-noggit-r1211150.jar?rev=1235888&view=auto
==============================================================================
Binary file - no diff available.
Added: lucene/dev/trunk/solr/lib/zookeeper-3.3.4.jar
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/lib/zookeeper-3.3.4.jar?rev=1235888&view=auto
==============================================================================
Binary file - no diff available.
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Wed Jan 25 19:49:26 2012
@@ -28,16 +28,23 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException;
public class CloudSolrServer extends SolrServer {
@@ -48,12 +55,14 @@ public class CloudSolrServer extends Sol
private String defaultCollection;
private LBHttpSolrServer lbServer;
Random rand = new Random();
-
+ private MultiThreadedHttpConnectionManager connManager;
/**
* @param zkHost The address of the zookeeper quorum containing the cloud state
*/
public CloudSolrServer(String zkHost) throws MalformedURLException {
- this(zkHost, new LBHttpSolrServer());
+ connManager = new MultiThreadedHttpConnectionManager();
+ this.zkHost = zkHost;
+ this.lbServer = new LBHttpSolrServer(new HttpClient(connManager));
}
/**
@@ -88,42 +97,58 @@ public class CloudSolrServer extends Sol
* @throws InterruptedException
*/
public void connect() {
- if (zkStateReader != null) return;
- synchronized(this) {
- if (zkStateReader != null) return;
- try {
- ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout, zkClientTimeout);
- zk.makeCollectionsNodeWatches();
- zk.makeShardZkNodeWatches(false);
- zk.updateCloudState(true);
- zkStateReader = zk;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (KeeperException e) {
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-
- } catch (IOException e) {
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-
- } catch (TimeoutException e) {
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ if (zkStateReader == null) {
+ synchronized (this) {
+ if (zkStateReader == null) {
+ try {
+ ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout,
+ zkClientTimeout);
+ zk.createClusterStateWatchersAndUpdate();
+ zkStateReader = zk;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (KeeperException e) {
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (TimeoutException e) {
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ }
}
}
}
-
@Override
public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
connect();
+ // TODO: if you can hash here, you could favor the shard leader
+
CloudState cloudState = zkStateReader.getCloudState();
- String collection = request.getParams().get("collection", defaultCollection);
-
- // TODO: allow multiple collections to be specified via comma separated list
+ SolrParams reqParams = request.getParams();
+ if (reqParams == null) {
+ reqParams = new ModifiableSolrParams();
+ }
+ String collection = reqParams.get("collection", defaultCollection);
+
+ // Extract each comma separated collection name and store in a List.
+ List<String> collectionList = StrUtils.splitSmart(collection, ",", true);
+
+ // Retrieve slices from the cloud state and, for each collection specified,
+ // add it to the Map of slices.
+ Map<String,Slice> slices = new HashMap<String,Slice>();
+ for (int i = 0; i < collectionList.size(); i++) {
+ String coll= collectionList.get(i);
+ ClientUtils.appendMap(coll, slices, cloudState.getSlices(coll));
+ }
- Map<String,Slice> slices = cloudState.getSlices(collection);
Set<String> liveNodes = cloudState.getLiveNodes();
// IDEA: have versions on various things... like a global cloudState version
@@ -136,18 +161,21 @@ public class CloudSolrServer extends Sol
List<String> urlList = new ArrayList<String>();
for (Slice slice : slices.values()) {
for (ZkNodeProps nodeProps : slice.getShards().values()) {
- String node = nodeProps.get(ZkStateReader.NODE_NAME);
- if (!liveNodes.contains(node)) continue;
+ ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+ String node = coreNodeProps.getNodeName();
+ if (!liveNodes.contains(coreNodeProps.getNodeName())
+ || !coreNodeProps.getState().equals(
+ ZkStateReader.ACTIVE)) continue;
if (nodes.put(node, nodeProps) == null) {
- String url = nodeProps.get(ZkStateReader.URL_PROP);
+ String url = coreNodeProps.getCoreUrl();
urlList.add(url);
}
}
}
Collections.shuffle(urlList, rand);
- // System.out.println("########################## MAKING REQUEST TO " + urlList);
- // TODO: set distrib=true if we detected more than one shard?
+ //System.out.println("########################## MAKING REQUEST TO " + urlList);
+
LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, urlList);
LBHttpSolrServer.Rsp rsp = lbServer.request(req);
return rsp.getResponse();
@@ -161,5 +189,12 @@ public class CloudSolrServer extends Sol
zkStateReader = null;
}
}
+ if (connManager != null) {
+ connManager.shutdown();
+ }
+ }
+
+ public LBHttpSolrServer getLbServer() {
+ return lbServer;
}
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java Wed Jan 25 19:49:26 2012
@@ -139,6 +139,8 @@ public class CommonsHttpSolrServer exten
* with single-part requests.
*/
private boolean useMultiPartPost;
+
+ private boolean shutdownHttpClient = false;
/**
* @param solrServerUrl The URL of the Solr server. For
@@ -204,6 +206,7 @@ public class CommonsHttpSolrServer exten
}
if (client == null) {
+ shutdownHttpClient = true;
_httpClient = new HttpClient(new MultiThreadedHttpConnectionManager()) ;
// prevent retries (note: this didn't work when set on mgr.. needed to be set on client)
@@ -669,4 +672,12 @@ public class CommonsHttpSolrServer exten
req.setCommitWithin(commitWithinMs);
return req.process(this);
}
+
+ public void shutdown() {
+ if (shutdownHttpClient && _httpClient != null
+ && _httpClient.getHttpConnectionManager() instanceof MultiThreadedHttpConnectionManager) {
+ ((MultiThreadedHttpConnectionManager) _httpClient
+ .getHttpConnectionManager()).shutdown();
+ }
+ }
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java Wed Jan 25 19:49:26 2012
@@ -28,6 +28,8 @@ import org.apache.solr.common.SolrExcept
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.MalformedURLException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -249,16 +251,26 @@ public class LBHttpSolrServer extends So
rsp.rsp = server.request(req.getRequest());
return rsp; // SUCCESS
} catch (SolrException e) {
- // Server is alive but the request was malformed or invalid
- throw e;
+ // we retry on 404 or 403 or 503 - you can see this on solr shutdown
+ if (e.code() == 404 || e.code() == 403 || e.code() == 503 || e.code() == 500) {
+ ex = addZombie(server, e);
+ } else {
+ // Server is alive but the request was likely malformed or invalid
+ throw e;
+ }
+
+ // TODO: consider using below above - currently does cause a problem with distrib updates:
+ // seems to match up against a failed forward to leader exception as well...
+ // || e.getMessage().contains("java.net.SocketException")
+ // || e.getMessage().contains("java.net.ConnectException")
+ } catch (SocketException e) {
+ ex = addZombie(server, e);
+ } catch (SocketTimeoutException e) {
+ ex = addZombie(server, e);
} catch (SolrServerException e) {
- if (e.getRootCause() instanceof IOException) {
- ex = e;
- wrapper = new ServerWrapper(server);
- wrapper.lastUsed = System.currentTimeMillis();
- wrapper.standard = false;
- zombieServers.put(wrapper.getKey(), wrapper);
- startAliveCheckExecutor();
+ Throwable rootCause = e.getRootCause();
+ if (rootCause instanceof IOException) {
+ ex = addZombie(server, e);
} else {
throw e;
}
@@ -274,11 +286,23 @@ public class LBHttpSolrServer extends So
zombieServers.remove(wrapper.getKey());
return rsp; // SUCCESS
} catch (SolrException e) {
- // Server is alive but the request was malformed or invalid
- zombieServers.remove(wrapper.getKey());
- throw e;
+ // we retry on 404 or 403 or 503 - you can see this on solr shutdown
+ if (e.code() == 404 || e.code() == 403 || e.code() == 503 || e.code() == 500) {
+ ex = e;
+ // already a zombie, no need to re-add
+ } else {
+ // Server is alive but the request was malformed or invalid
+ zombieServers.remove(wrapper.getKey());
+ throw e;
+ }
+
+ } catch (SocketException e) {
+ ex = e;
+ } catch (SocketTimeoutException e) {
+ ex = e;
} catch (SolrServerException e) {
- if (e.getRootCause() instanceof IOException) {
+ Throwable rootCause = e.getRootCause();
+ if (rootCause instanceof IOException) {
ex = e;
// already a zombie, no need to re-add
} else {
@@ -293,9 +317,22 @@ public class LBHttpSolrServer extends So
if (ex == null) {
throw new SolrServerException("No live SolrServers available to handle this request");
} else {
- throw new SolrServerException("No live SolrServers available to handle this request", ex);
+ throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
}
+ }
+
+ private Exception addZombie(CommonsHttpSolrServer server,
+ Exception e) {
+
+ ServerWrapper wrapper;
+
+ wrapper = new ServerWrapper(server);
+ wrapper.lastUsed = System.currentTimeMillis();
+ wrapper.standard = false;
+ zombieServers.put(wrapper.getKey(), wrapper);
+ startAliveCheckExecutor();
+ return e;
}
@@ -363,6 +400,12 @@ public class LBHttpSolrServer extends So
public void setSoTimeout(int timeout) {
httpClient.getParams().setSoTimeout(timeout);
}
+
+ public void shutdown() {
+ if (aliveCheckExecutor != null) {
+ aliveCheckExecutor.shutdownNow();
+ }
+ }
/**
* Tries to query a live server. A SolrServerException is thrown if all servers are dead.
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java Wed Jan 25 19:49:26 2012
@@ -20,8 +20,14 @@ package org.apache.solr.client.solrj.imp
import java.io.IOException;
import java.io.OutputStream;
import java.net.MalformedURLException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -31,13 +37,10 @@ import org.apache.commons.httpclient.met
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
-import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -300,4 +303,33 @@ public class StreamingUpdateSolrServer e
{
log.error( "error", ex );
}
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ scheduler.shutdown();
+ try {
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
+ .error("ExecutorService did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+
+
+ public void shutdownNow() {
+ super.shutdown();
+ scheduler.shutdownNow(); // Cancel currently executing tasks
+ try {
+ if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) log
+ .error("ExecutorService did not terminate");
+ } catch (InterruptedException ie) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java Wed Jan 25 19:49:26 2012
@@ -89,6 +89,74 @@ public class CoreAdminRequest extends So
return params;
}
}
+
+ public static class PrepRecovery extends CoreAdminRequest {
+ protected String nodeName;
+ protected String coreNodeName;
+
+ public PrepRecovery() {
+ action = CoreAdminAction.PREPRECOVERY;
+ }
+
+ public void setNodeName(String nodeName) {
+ this.nodeName = nodeName;
+ }
+
+ public String getNodeName() {
+ return nodeName;
+ }
+
+ public String getCoreNodeName() {
+ return coreNodeName;
+ }
+
+ public void setCoreNodeName(String coreNodeName) {
+ this.coreNodeName = coreNodeName;
+ }
+
+ @Override
+ public SolrParams getParams() {
+ if( action == null ) {
+ throw new RuntimeException( "no action specified!" );
+ }
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set( CoreAdminParams.ACTION, action.toString() );
+
+ params.set( CoreAdminParams.CORE, core );
+
+ if (nodeName != null) {
+ params.set( "nodeName", nodeName);
+ }
+
+ if (coreNodeName != null) {
+ params.set( "coreNodeName", coreNodeName);
+ }
+
+ return params;
+ }
+
+ }
+
+ public static class RequestRecovery extends CoreAdminRequest {
+
+ public RequestRecovery() {
+ action = CoreAdminAction.REQUESTRECOVERY;
+ }
+
+ @Override
+ public SolrParams getParams() {
+ if( action == null ) {
+ throw new RuntimeException( "no action specified!" );
+ }
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set( CoreAdminParams.ACTION, action.toString() );
+
+ params.set( CoreAdminParams.CORE, core );
+
+ return params;
+ }
+ }
+
//a persist core request
public static class Persist extends CoreAdminRequest {
protected String fileName = null;
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,252 @@
+package org.apache.solr.client.solrj.request;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.XML;
+
+// TODO: bake this into UpdateRequest
+public class UpdateRequestExt extends AbstractUpdateRequest {
+
+ private List<SolrDoc> documents = null;
+ private Map<String,Long> deleteById = null;
+ private List<String> deleteQuery = null;
+
+ private class SolrDoc {
+ @Override
+ public String toString() {
+ return "SolrDoc [document=" + document + ", commitWithin=" + commitWithin
+ + ", overwrite=" + overwrite + "]";
+ }
+ SolrInputDocument document;
+ int commitWithin;
+ boolean overwrite;
+ }
+
+ public UpdateRequestExt() {
+ super(METHOD.POST, "/update");
+ }
+
+ public UpdateRequestExt(String url) {
+ super(METHOD.POST, url);
+ }
+
+ // ---------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------
+
+ /**
+ * clear the pending documents and delete commands
+ */
+ public void clear() {
+ if (documents != null) {
+ documents.clear();
+ }
+ if (deleteById != null) {
+ deleteById.clear();
+ }
+ if (deleteQuery != null) {
+ deleteQuery.clear();
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------
+
+ public UpdateRequestExt add(final SolrInputDocument doc) {
+ if (documents == null) {
+ documents = new ArrayList<SolrDoc>(2);
+ }
+ SolrDoc solrDoc = new SolrDoc();
+ solrDoc.document = doc;
+ solrDoc.commitWithin = -1;
+ solrDoc.overwrite = true;
+ documents.add(solrDoc);
+
+ return this;
+ }
+
+ public UpdateRequestExt add(final SolrInputDocument doc, int commitWithin,
+ boolean overwrite) {
+ if (documents == null) {
+ documents = new ArrayList<SolrDoc>(2);
+ }
+ SolrDoc solrDoc = new SolrDoc();
+ solrDoc.document = doc;
+ solrDoc.commitWithin = commitWithin;
+ solrDoc.overwrite = overwrite;
+ documents.add(solrDoc);
+
+ return this;
+ }
+
+ public UpdateRequestExt deleteById(String id) {
+ if (deleteById == null) {
+ deleteById = new HashMap<String,Long>();
+ }
+ deleteById.put(id, null);
+ return this;
+ }
+
+ public UpdateRequestExt deleteById(String id, Long version) {
+ if (deleteById == null) {
+ deleteById = new HashMap<String,Long>();
+ }
+ deleteById.put(id, version);
+ return this;
+ }
+
+ public UpdateRequestExt deleteById(List<String> ids) {
+ if (deleteById == null) {
+ deleteById = new HashMap<String,Long>();
+ } else {
+ for (String id : ids) {
+ deleteById.put(id, null);
+ }
+ }
+ return this;
+ }
+
+ public UpdateRequestExt deleteByQuery(String q) {
+ if (deleteQuery == null) {
+ deleteQuery = new ArrayList<String>();
+ }
+ deleteQuery.add(q);
+ return this;
+ }
+
+ // --------------------------------------------------------------------------
+ // --------------------------------------------------------------------------
+
+ @Override
+ public Collection<ContentStream> getContentStreams() throws IOException {
+ return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML);
+ }
+
+ public String getXML() throws IOException {
+ StringWriter writer = new StringWriter();
+ writeXML(writer);
+ writer.flush();
+
+ String xml = writer.toString();
+
+ return (xml.length() > 0) ? xml : null;
+ }
+
+ public void writeXML(Writer writer) throws IOException {
+ List<List<SolrDoc>> getDocLists = getDocLists(documents);
+
+ for (List<SolrDoc> docs : getDocLists) {
+
+ if ((docs != null && docs.size() > 0)) {
+ SolrDoc firstDoc = docs.get(0);
+ int commitWithin = firstDoc.commitWithin != -1 ? firstDoc.commitWithin : this.commitWithin;
+ boolean overwrite = firstDoc.overwrite;
+ if (commitWithin > -1 || overwrite != true) {
+ writer.write("<add commitWithin=\"" + commitWithin + "\" " + "overwrite=\"" + overwrite + "\">");
+ } else {
+ writer.write("<add>");
+ }
+ if (documents != null) {
+ for (SolrDoc doc : documents) {
+ if (doc != null) {
+ ClientUtils.writeXML(doc.document, writer);
+ }
+ }
+ }
+
+ writer.write("</add>");
+ }
+ }
+
+ // Add the delete commands
+ boolean deleteI = deleteById != null && deleteById.size() > 0;
+ boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0;
+ if (deleteI || deleteQ) {
+ writer.append("<delete>");
+ if (deleteI) {
+ for (Map.Entry<String,Long> entry : deleteById.entrySet()) {
+ writer.append("<id");
+ Long version = entry.getValue();
+ if (version != null) {
+ writer.append(" version=\"" + version + "\"");
+ }
+ writer.append(">");
+
+ XML.escapeCharData(entry.getKey(), writer);
+ writer.append("</id>");
+ }
+ }
+ if (deleteQ) {
+ for (String q : deleteQuery) {
+ writer.append("<query>");
+ XML.escapeCharData(q, writer);
+ writer.append("</query>");
+ }
+ }
+ writer.append("</delete>");
+ }
+ }
+
+ private List<List<SolrDoc>> getDocLists(List<SolrDoc> documents) {
+ List<List<SolrDoc>> docLists = new ArrayList<List<SolrDoc>>();
+ if (this.documents == null) {
+ return docLists;
+ }
+ boolean lastOverwrite = true;
+ int lastCommitWithin = -1;
+ List<SolrDoc> docList = null;
+ for (SolrDoc doc : this.documents) {
+ if (doc.overwrite != lastOverwrite
+ || doc.commitWithin != lastCommitWithin || docLists.size() == 0) {
+ docList = new ArrayList<SolrDoc>();
+ docLists.add(docList);
+ }
+ docList.add(doc);
+ lastCommitWithin = doc.commitWithin;
+ lastOverwrite = doc.overwrite;
+ }
+
+ return docLists;
+ }
+
+ public Map<String,Long> getDeleteById() {
+ return deleteById;
+ }
+
+ public List<String> getDeleteQuery() {
+ return deleteQuery;
+ }
+
+ @Override
+ public String toString() {
+ return "UpdateRequestExt [documents=" + documents + ", deleteById="
+ + deleteById + ", deleteQuery=" + deleteQuery + "]";
+ }
+
+}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java Wed Jan 25 19:49:26 2012
@@ -27,7 +27,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
import java.util.TimeZone;
+import java.util.Map.Entry;
import java.nio.ByteBuffer;
import org.apache.commons.httpclient.util.DateParseException;
@@ -35,6 +38,7 @@ import org.apache.commons.httpclient.uti
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.*;
@@ -229,4 +233,11 @@ public class ClientUtils
catch (IOException e) {throw new RuntimeException(e);} // can't happen
return sb.toString();
}
+
+ public static void appendMap(String collection, Map<String,Slice> map1, Map<String,Slice> map2) {
+ Set<Entry<String,Slice>> entrySet = map2.entrySet();
+ for (Entry<String,Slice> entry : entrySet) {
+ map1.put(collection + "_" + entry.getKey(), entry.getValue());
+ }
+ }
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/SolrException.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/SolrException.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/SolrException.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/SolrException.java Wed Jan 25 19:49:26 2012
@@ -17,14 +17,14 @@
package org.apache.solr.common;
-import org.slf4j.Logger;
-
import java.io.CharArrayWriter;
import java.io.PrintWriter;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.slf4j.Logger;
+
/**
*
*/
@@ -74,7 +74,7 @@ public class SolrException extends Runti
public void log(Logger log) { log(log,this); }
public static void log(Logger log, Throwable e) {
String stackTrace = toStr(e);
- String ignore = doIgnore(stackTrace);
+ String ignore = doIgnore(e, stackTrace);
if (ignore != null) {
log.info(ignore);
return;
@@ -85,14 +85,23 @@ public class SolrException extends Runti
public static void log(Logger log, String msg, Throwable e) {
String stackTrace = msg + ':' + toStr(e);
- String ignore = doIgnore(stackTrace);
+ String ignore = doIgnore(e, stackTrace);
+ if (ignore != null) {
+ log.info(ignore);
+ return;
+ }
+ log.error(stackTrace);
+ }
+
+ public static void log(Logger log, String msg) {
+ String stackTrace = msg;
+ String ignore = doIgnore(null, stackTrace);
if (ignore != null) {
log.info(ignore);
return;
}
log.error(stackTrace);
}
-
// public String toString() { return toStr(this); } // oops, inf loop
@Override
@@ -120,17 +129,30 @@ public class SolrException extends Runti
public static Set<String> ignorePatterns;
/** Returns null if this exception does not match any ignore patterns, or a message string to use if it does. */
- public static String doIgnore(String m) {
+ public static String doIgnore(Throwable t, String m) {
if (ignorePatterns == null || m == null) return null;
+ if (t != null && t instanceof AssertionError) return null;
for (String regex : ignorePatterns) {
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(m);
+
if (matcher.find()) return "Ignoring exception matching " + regex;
}
return null;
}
-
+
+ public static Throwable getRootCause(Throwable t) {
+ while (true) {
+ Throwable cause = t.getCause();
+ if (cause!=null) {
+ t = cause;
+ } else {
+ break;
+ }
+ }
+ return t;
+ }
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java Wed Jan 25 19:49:26 2012
@@ -17,149 +17,222 @@ package org.apache.solr.common.cloud;
* limitations under the License.
*/
-import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import org.apache.noggit.JSONWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.HashPartitioner.Range;
import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-// immutable
-public class CloudState {
- protected static Logger log = LoggerFactory.getLogger(CloudState.class);
-
- private final Map<String,Map<String,Slice>> collectionStates;
- private final Set<String> liveNodes;
-
- public CloudState(Set<String> liveNodes, Map<String,Map<String,Slice>> collectionStates) {
- this.liveNodes = liveNodes;
- this.collectionStates = collectionStates;
- }
-
- public Map<String,Slice> getSlices(String collection) {
- Map<String,Slice> collectionState = collectionStates.get(collection);
- if(collectionState == null) {
- return null;
- }
- return Collections.unmodifiableMap(collectionState);
- }
-
- public Set<String> getCollections() {
- return Collections.unmodifiableSet(collectionStates.keySet());
- }
-
- public Map<String,Map<String,Slice>> getCollectionStates() {
- return Collections.unmodifiableMap(collectionStates);
- }
-
- public Set<String> getLiveNodes() {
- return Collections.unmodifiableSet(liveNodes);
- }
-
- public boolean liveNodesContain(String name) {
- return liveNodes.contains(name);
- }
-
- public static CloudState buildCloudState(SolrZkClient zkClient, CloudState oldCloudState, boolean onlyLiveNodes) throws KeeperException, InterruptedException, IOException {
- Map<String,Map<String,Slice>> collectionStates;
- if (!onlyLiveNodes) {
- List<String> collections = zkClient.getChildren(
- ZkStateReader.COLLECTIONS_ZKNODE, null);
-
- collectionStates = new HashMap<String,Map<String,Slice>>();
- for (String collection : collections) {
- String shardIdPaths = ZkStateReader.COLLECTIONS_ZKNODE + "/"
- + collection + ZkStateReader.SHARDS_ZKNODE;
- List<String> shardIdNames;
- try {
- shardIdNames = zkClient.getChildren(shardIdPaths, null);
- } catch (KeeperException.NoNodeException e) {
- // node is not valid currently
- continue;
- }
- Map<String,Slice> slices = new HashMap<String,Slice>();
- for (String shardIdZkPath : shardIdNames) {
- Slice oldSlice = null;
- if (oldCloudState.getCollectionStates().containsKey(collection)
- && oldCloudState.getCollectionStates().get(collection)
- .containsKey(shardIdZkPath)) {
- oldSlice = oldCloudState.getCollectionStates().get(collection)
- .get(shardIdZkPath);
+public class CloudState implements JSONWriter.Writable {
+ private final Map<String, Map<String,Slice>> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
+ private final Set<String> liveNodes;
+
+ private final HashPartitioner hp = new HashPartitioner();
+
+ private final Map<String,RangeInfo> rangeInfos = new HashMap<String,RangeInfo>();
+ public Map<String,Map<String,ZkNodeProps>> leaders = new HashMap<String,Map<String,ZkNodeProps>>();
+
+
+ public CloudState() {
+ this.liveNodes = new HashSet<String>();
+ this.collectionStates = new HashMap<String,Map<String,Slice>>(0);
+ addRangeInfos(collectionStates.keySet());
+ getShardLeaders();
+ }
+
+ public CloudState(Set<String> liveNodes,
+ Map<String, Map<String,Slice>> collectionStates) {
+ this.liveNodes = new HashSet<String>(liveNodes.size());
+ this.liveNodes.addAll(liveNodes);
+ this.collectionStates = new HashMap<String, Map<String,Slice>>(collectionStates.size());
+ this.collectionStates.putAll(collectionStates);
+ addRangeInfos(collectionStates.keySet());
+ getShardLeaders();
+ }
+
+ private void getShardLeaders() {
+ Set<Entry<String,Map<String,Slice>>> collections = collectionStates.entrySet();
+ for (Entry<String,Map<String,Slice>> collection : collections) {
+ Map<String,Slice> state = collection.getValue();
+ Set<Entry<String,Slice>> slices = state.entrySet();
+ for (Entry<String,Slice> sliceEntry : slices) {
+ Slice slice = sliceEntry.getValue();
+ Map<String,ZkNodeProps> shards = slice.getShards();
+ Set<Entry<String,ZkNodeProps>> shardsEntries = shards.entrySet();
+ for (Entry<String,ZkNodeProps> shardEntry : shardsEntries) {
+ ZkNodeProps props = shardEntry.getValue();
+ if (props.containsKey(ZkStateReader.LEADER_PROP)) {
+ Map<String,ZkNodeProps> leadersForCollection = leaders.get(collection.getKey());
+ if (leadersForCollection == null) {
+ leadersForCollection = new HashMap<String,ZkNodeProps>();
+
+ leaders.put(collection.getKey(), leadersForCollection);
+ }
+ leadersForCollection.put(sliceEntry.getKey(), props);
}
-
- Map<String,ZkNodeProps> shardsMap = readShards(zkClient, shardIdPaths
- + "/" + shardIdZkPath, oldSlice);
- Slice slice = new Slice(shardIdZkPath, shardsMap);
- slices.put(shardIdZkPath, slice);
}
- collectionStates.put(collection, slices);
}
- } else {
- collectionStates = oldCloudState.getCollectionStates();
+ }
+ }
+
+ public ZkNodeProps getLeader(String collection, String shard) {
+ Map<String,ZkNodeProps> collectionLeaders = leaders.get(collection);
+ if (collectionLeaders == null) return null;
+ return collectionLeaders.get(shard);
+ }
+
+ private void addRangeInfos(Set<String> collections) {
+ for (String collection : collections) {
+ addRangeInfo(collection);
+ }
+ }
+
+ public Slice getSlice(String collection, String slice) {
+ if (collectionStates.containsKey(collection)
+ && collectionStates.get(collection).containsKey(slice))
+ return collectionStates.get(collection).get(slice);
+ return null;
+ }
+
+ public Map<String, Slice> getSlices(String collection) {
+ if(!collectionStates.containsKey(collection))
+ return null;
+ return Collections.unmodifiableMap(collectionStates.get(collection));
+ }
+
+ public Set<String> getCollections() {
+ return Collections.unmodifiableSet(collectionStates.keySet());
+ }
+
+ public Map<String, Map<String, Slice>> getCollectionStates() {
+ return Collections.unmodifiableMap(collectionStates);
+ }
+
+ public Set<String> getLiveNodes() {
+ return Collections.unmodifiableSet(liveNodes);
+ }
+
+ public String getShardId(String coreNodeName) {
+ for (Entry<String, Map<String, Slice>> states: collectionStates.entrySet()){
+ for(Entry<String, Slice> slices: states.getValue().entrySet()) {
+ for(Entry<String, ZkNodeProps> shards: slices.getValue().getShards().entrySet()){
+ if(coreNodeName.equals(shards.getKey())) {
+ return slices.getKey();
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public boolean liveNodesContain(String name) {
+ return liveNodes.contains(name);
+ }
+
+ public RangeInfo getRanges(String collection) {
+ // TODO: store this in zk
+ RangeInfo rangeInfo = rangeInfos.get(collection);
+
+ return rangeInfo;
+ }
+
+ private RangeInfo addRangeInfo(String collection) {
+ List<Range> ranges;
+ RangeInfo rangeInfo;
+ rangeInfo = new RangeInfo();
+
+ Map<String,Slice> slices = getSlices(collection);
+
+ if (slices == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find collection "
+ + collection + " in " + this);
}
- CloudState cloudInfo = new CloudState(getLiveNodes(zkClient), collectionStates);
+ Set<String> shards = slices.keySet();
+ ArrayList<String> shardList = new ArrayList<String>(shards.size());
+ shardList.addAll(shards);
+ Collections.sort(shardList);
- return cloudInfo;
- }
-
- /**
- * @param zkClient
- * @param shardsZkPath
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- * @throws IOException
- */
- private static Map<String,ZkNodeProps> readShards(SolrZkClient zkClient, String shardsZkPath, Slice oldSlice)
- throws KeeperException, InterruptedException, IOException {
-
- Map<String,ZkNodeProps> shardNameToProps = new HashMap<String,ZkNodeProps>();
-
- if (zkClient.exists(shardsZkPath, null) == null) {
- throw new IllegalStateException("Cannot find zk shards node that should exist:"
- + shardsZkPath);
- }
-
- List<String> shardZkPaths = zkClient.getChildren(shardsZkPath, null);
-
- for (String shardPath : shardZkPaths) {
- ZkNodeProps props;
- if (oldSlice != null && oldSlice.getShards().containsKey(shardPath)) {
- props = oldSlice.getShards().get(shardPath);
- } else {
- byte[] data = zkClient.getData(shardsZkPath + "/" + shardPath, null,
- null);
-
- props = new ZkNodeProps();
- props.load(data);
+ ranges = hp.partitionRange(shards.size());
+
+ rangeInfo.ranges = ranges;
+ rangeInfo.shardList = shardList;
+ rangeInfos.put(collection, rangeInfo);
+ return rangeInfo;
+ }
+
+ public String getShard(int hash, String collection) {
+ RangeInfo rangInfo = getRanges(collection);
+
+ int cnt = 0;
+ for (Range range : rangInfo.ranges) {
+ if (hash < range.max) {
+ return rangInfo.shardList.get(cnt);
}
-
- shardNameToProps.put(shardPath, props);
+ cnt++;
}
-
- return Collections.unmodifiableMap(shardNameToProps);
+
+ throw new IllegalStateException("The HashPartitioner failed");
}
-
- private static Set<String> getLiveNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
- List<String> liveNodes = zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null);
- Set<String> liveNodesSet = new HashSet<String>(liveNodes.size());
- liveNodesSet.addAll(liveNodes);
- return liveNodesSet;
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("live nodes:" + liveNodes);
+ sb.append(" collections:" + collectionStates);
+ return sb.toString();
+ }
+
+ public static CloudState load(SolrZkClient zkClient, Set<String> liveNodes) throws KeeperException, InterruptedException {
+ byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE,
+ null, null, true);
+ return load(state, liveNodes);
+ }
+
+ public static CloudState load(byte[] bytes, Set<String> liveNodes) throws KeeperException, InterruptedException {
+ if (bytes == null || bytes.length == 0) {
+ return new CloudState(liveNodes, Collections.<String, Map<String,Slice>>emptyMap());
+ }
+
+ LinkedHashMap<String, Object> stateMap = (LinkedHashMap<String, Object>) ZkStateReader.fromJSON(bytes);
+ HashMap<String,Map<String, Slice>> state = new HashMap<String,Map<String,Slice>>();
+
+ for(String collectionName: stateMap.keySet()){
+ Map<String, Object> collection = (Map<String, Object>)stateMap.get(collectionName);
+ Map<String, Slice> slices = new LinkedHashMap<String,Slice>();
+ for(String sliceName: collection.keySet()) {
+ Map<String, Map<String, String>> sliceMap = (Map<String, Map<String, String>>)collection.get(sliceName);
+ Map<String, ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
+ for(String shardName: sliceMap.keySet()) {
+ shards.put(shardName, new ZkNodeProps(sliceMap.get(shardName)));
+ }
+ Slice slice = new Slice(sliceName, shards);
+ slices.put(sliceName, slice);
+ }
+ state.put(collectionName, slices);
+ }
+ return new CloudState(liveNodes, state);
+ }
+
+ @Override
+ public void write(JSONWriter jsonWriter) {
+ jsonWriter.write(collectionStates);
}
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("live nodes:" + liveNodes);
- sb.append(" collections:" + collectionStates);
- return sb.toString();
+ class RangeInfo {
+ private List<Range> ranges;
+ private ArrayList<String> shardList;
}
+
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java Wed Jan 25 19:49:26 2012
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.SolrZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -36,7 +38,7 @@ class ConnectionManager implements Watch
private KeeperState state;
private boolean connected;
- private ZkClientConnectionStrategy connectionStrategy;
+ private final ZkClientConnectionStrategy connectionStrategy;
private String zkServerAddress;
@@ -73,30 +75,33 @@ class ConnectionManager implements Watch
connected = true;
clientConnected.countDown();
} else if (state == KeeperState.Expired) {
-
connected = false;
- log.info("Attempting to reconnect to ZooKeeper...");
+ log.info("Attempting to reconnect to recover relationship with ZooKeeper...");
try {
- connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate() {
- @Override
- public void update(SolrZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
- waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
- client.updateKeeper(keeper);
- if(onReconnect != null) {
- onReconnect.command();
- }
- ConnectionManager.this.connected = true;
- }
- });
+ connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
+ new ZkClientConnectionStrategy.ZkUpdate() {
+ @Override
+ public void update(SolrZooKeeper keeper)
+ throws InterruptedException, TimeoutException, IOException {
+ synchronized (connectionStrategy) {
+ waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
+ client.updateKeeper(keeper);
+ if (onReconnect != null) {
+ onReconnect.command();
+ }
+ synchronized (ConnectionManager.this) {
+ ConnectionManager.this.connected = true;
+ }
+ }
+
+ }
+ });
} catch (Exception e) {
- log.error("", e);
+ SolrException.log(log, "", e);
}
-
log.info("Connected:" + connected);
} else if (state == KeeperState.Disconnected) {
- // ZooKeeper client will recover when it can
- // TODO: this needs to be investigated more
connected = false;
} else {
connected = false;
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,96 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.noggit.JSONWriter;
+
+public class CoreState implements JSONWriter.Writable {
+
+ private final Map<String, String> properties;
+
+ private CoreState(Map<String, String> props) {
+ this.properties = Collections.unmodifiableMap(props);
+ }
+
+ public CoreState(String coreName, String collectionName, Map<String,String> properties) {
+ HashMap<String,String> props = new HashMap<String,String>();
+ props.putAll(properties);
+ props.put(ZkStateReader.COLLECTION_PROP, collectionName);
+ props.put(ZkStateReader.CORE_NAME_PROP, coreName);
+ this.properties = Collections.unmodifiableMap(props);
+ }
+
+ public String getCoreName() {
+ return properties.get(ZkStateReader.CORE_NAME_PROP);
+ }
+
+ public String getCoreNodeName() {
+ return properties.get(ZkStateReader.NODE_NAME_PROP) + "_" + getCoreName();
+ }
+
+ public String getCollectionName() {
+ return properties.get(ZkStateReader.COLLECTION_PROP);
+ }
+
+ public Map<String,String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public void write(JSONWriter jsonWriter) {
+ jsonWriter.write(properties);
+ }
+
+
+ public static CoreState[] load(byte[] bytes) {
+ List<Map<String, String>> stateMaps = (List<Map<String, String>>) ZkStateReader.fromJSON(bytes);
+
+ CoreState[] states = new CoreState[stateMaps.size()];
+ int i = 0;
+ for (Map<String,String> state : stateMaps) {
+ states[i++] = new CoreState(state);
+ }
+
+ return states;
+ }
+
+ @Override
+ public int hashCode() {
+ return properties.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if(other instanceof CoreState) {
+ CoreState otherState = (CoreState) other;
+ return this.getProperties().equals(otherState.getProperties());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "coll:" + getCollectionName() + " core:" + getCoreName() + " props:" + properties;
+ }
+
+}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java Wed Jan 25 19:49:26 2012
@@ -18,11 +18,10 @@ package org.apache.solr.common.cloud;
*/
import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.SolrZooKeeper;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory;
public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
private static Logger log = LoggerFactory.getLogger(DefaultConnectionStrategy.class);
- private ScheduledExecutorService executor;
@Override
public void connect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
@@ -43,32 +41,17 @@ public class DefaultConnectionStrategy e
@Override
public void reconnect(final String serverAddress, final int zkClientTimeout,
final Watcher watcher, final ZkUpdate updater) throws IOException {
- log.info("Starting reconnect to ZooKeeper attempts ...");
- executor = Executors.newScheduledThreadPool(1);
- executor.schedule(new Runnable() {
- private int delay = 1000;
- public void run() {
- log.info("Attempting the connect...");
- boolean connected = false;
- try {
- updater.update(new SolrZooKeeper(serverAddress, zkClientTimeout, watcher));
- log.info("Reconnected to ZooKeeper");
- connected = true;
- } catch (Exception e) {
- log.error("", e);
- log.info("Reconnect to ZooKeeper failed");
- }
- if(connected) {
- executor.shutdownNow();
- } else {
- if(delay < 240000) {
- delay = delay * 2;
- }
- executor.schedule(this, delay, TimeUnit.MILLISECONDS);
- }
-
- }
- }, 1000, TimeUnit.MILLISECONDS);
+ log.info("Connection expired - starting a new one...");
+
+ try {
+ updater
+ .update(new SolrZooKeeper(serverAddress, zkClientTimeout, watcher));
+ log.info("Reconnected to ZooKeeper");
+ } catch (Exception e) {
+ SolrException.log(log, "Reconnect to ZooKeeper failed", e);
+ log.info("Reconnect to ZooKeeper failed");
+ }
+
}
}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,63 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class to partition int range into n ranges.
+ *
+ */
+public class HashPartitioner {
+
+ public static class Range {
+ public long min;
+ public long max;
+
+ public Range(long min, long max) {
+ this.min = min;
+ this.max = max;
+ }
+ }
+
+ /**
+ * works up to 65537 before requested num of ranges is one short
+ *
+ * @param partitions
+ * @return
+ */
+ public List<Range> partitionRange(int partitions) {
+ // some hokey code to partition the int space
+ long range = Integer.MAX_VALUE + (Math.abs((long) Integer.MIN_VALUE));
+ long srange = range / partitions;
+
+ List<Range> ranges = new ArrayList<Range>(partitions);
+
+ long end = 0;
+ long start = Integer.MIN_VALUE;
+
+ while (end < Integer.MAX_VALUE) {
+ end = start + srange;
+ ranges.add(new Range(start, end));
+ start = end + 1L;
+ }
+
+ return ranges;
+ }
+}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java Wed Jan 25 19:49:26 2012
@@ -17,12 +17,14 @@ package org.apache.solr.common.cloud;
* limitations under the License.
*/
+import org.apache.noggit.JSONWriter;
+
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
-
// immutable
-public class Slice {
+public class Slice implements JSONWriter.Writable {
private final Map<String,ZkNodeProps> shards;
private final String name;
@@ -34,8 +36,27 @@ public class Slice {
public Map<String,ZkNodeProps> getShards() {
return Collections.unmodifiableMap(shards);
}
+
+ public Map<String,ZkNodeProps> getShardsCopy() {
+ Map<String,ZkNodeProps> shards = new HashMap<String,ZkNodeProps>();
+ for (Map.Entry<String,ZkNodeProps> entry : this.shards.entrySet()) {
+ ZkNodeProps zkProps = new ZkNodeProps(entry.getValue());
+ shards.put(entry.getKey(), zkProps);
+ }
+ return shards;
+ }
public String getName() {
return name;
}
+
+ @Override
+ public String toString() {
+ return "Slice [shards=" + shards + ", name=" + name + "]";
+ }
+
+ @Override
+ public void write(JSONWriter jsonWriter) {
+ jsonWriter.write(shards);
+ }
}