Posted to commits@lucene.apache.org by rm...@apache.org on 2013/10/21 20:58:44 UTC
svn commit: r1534320 [29/39] - in /lucene/dev/branches/lucene4956: ./
dev-tools/ dev-tools/idea/.idea/ dev-tools/idea/lucene/expressions/
dev-tools/idea/solr/contrib/velocity/ dev-tools/maven/
dev-tools/maven/lucene/ dev-tools/maven/lucene/expressions/...
Modified: lucene/dev/branches/lucene4956/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAUpdateRequestProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAUpdateRequestProcessor.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAUpdateRequestProcessor.java (original)
+++ lucene/dev/branches/lucene4956/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAUpdateRequestProcessor.java Mon Oct 21 18:58:24 2013
@@ -17,53 +17,49 @@ package org.apache.solr.uima.processor;
* limitations under the License.
*/
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.uima.processor.SolrUIMAConfiguration.MapField;
-import org.apache.lucene.analysis.uima.ae.AEProvider;
-import org.apache.lucene.analysis.uima.ae.AEProviderFactory;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.jcas.JCas;
import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.util.JCasPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Map;
-
/**
 * Update document(s) to be indexed with UIMA-extracted information
- *
+ *
*/
public class UIMAUpdateRequestProcessor extends UpdateRequestProcessor {
-
- private final Logger log = LoggerFactory.getLogger(UIMAUpdateRequestProcessor.class);
-
+
+ private final Logger log = LoggerFactory
+ .getLogger(UIMAUpdateRequestProcessor.class);
+
SolrUIMAConfiguration solrUIMAConfiguration;
-
- private AEProvider aeProvider;
- private SolrCore solrCore;
-
- public UIMAUpdateRequestProcessor(UpdateRequestProcessor next, SolrCore solrCore,
- SolrUIMAConfiguration config) {
+ private AnalysisEngine ae;
+
+ private JCasPool pool;
+
+ public UIMAUpdateRequestProcessor(UpdateRequestProcessor next,
+ String coreName, SolrUIMAConfiguration config, AnalysisEngine ae,
+ JCasPool pool) {
super(next);
- initialize(solrCore, config);
- }
-
- private void initialize(SolrCore solrCore, SolrUIMAConfiguration config) {
- this.solrCore = solrCore;
+ this.ae = ae;
+ this.pool = pool;
solrUIMAConfiguration = config;
- aeProvider = AEProviderFactory.getInstance().getAEProvider(solrCore.getName(),
- solrUIMAConfiguration.getAePath(), solrUIMAConfiguration.getRuntimeParameters());
}
-
+
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
String text = null;
@@ -76,54 +72,66 @@ public class UIMAUpdateRequestProcessor
for (String currentText : texts) {
text = currentText;
if (text != null && text.length() > 0) {
- /* process the text value */
- JCas jcas = processText(text);
-
- UIMAToSolrMapper uimaToSolrMapper = new UIMAToSolrMapper(solrInputDocument, jcas);
- /* get field mapping from config */
- Map<String, Map<String, MapField>> typesAndFeaturesFieldsMap = solrUIMAConfiguration
- .getTypesFeaturesFieldsMapping();
- /* map type features on fields */
- for (String typeFQN : typesAndFeaturesFieldsMap.keySet()) {
- uimaToSolrMapper.map(typeFQN, typesAndFeaturesFieldsMap.get(typeFQN));
+            /* create a JCas which contains the text to analyze */
+ JCas jcas = pool.getJCas(0);
+ try {
+ /* process the text value */
+ processText(text, jcas);
+
+ UIMAToSolrMapper uimaToSolrMapper = new UIMAToSolrMapper(
+ solrInputDocument, jcas);
+ /* get field mapping from config */
+ Map<String,Map<String,MapField>> typesAndFeaturesFieldsMap = solrUIMAConfiguration
+ .getTypesFeaturesFieldsMapping();
+ /* map type features on fields */
+ for (Entry<String,Map<String,MapField>> entry : typesAndFeaturesFieldsMap
+ .entrySet()) {
+ uimaToSolrMapper.map(entry.getKey(), entry.getValue());
+ }
+ } finally {
+ pool.releaseJCas(jcas);
}
}
}
} catch (Exception e) {
String logField = solrUIMAConfiguration.getLogField();
- if(logField == null){
- SchemaField uniqueKeyField = cmd.getReq().getSchema().getUniqueKeyField();
- if(uniqueKeyField != null){
+ if (logField == null) {
+ SchemaField uniqueKeyField = cmd.getReq().getSchema()
+ .getUniqueKeyField();
+ if (uniqueKeyField != null) {
logField = uniqueKeyField.getName();
}
}
- String optionalFieldInfo = logField == null ? "." :
- new StringBuilder(". ").append(logField).append("=")
- .append((String)cmd.getSolrInputDocument().getField(logField).getValue())
- .append(", ").toString();
+ String optionalFieldInfo = logField == null ? "."
+ : new StringBuilder(". ")
+ .append(logField)
+ .append("=")
+ .append(
+ (String) cmd.getSolrInputDocument().getField(logField)
+ .getValue()).append(", ").toString();
int len;
String debugString;
if (text != null && text.length() > 0) {
len = Math.min(text.length(), 100);
- debugString = new StringBuilder(" text=\"").append(text.substring(0, len)).append("...\"").toString();
- }
- else {
+ debugString = new StringBuilder(" text=\"")
+ .append(text.substring(0, len)).append("...\"").toString();
+ } else {
debugString = " null text";
}
if (solrUIMAConfiguration.isIgnoreErrors()) {
- log.warn("skip the text processing due to {}",new StringBuilder()
- .append(e.getLocalizedMessage()).append(optionalFieldInfo)
- .append(debugString));
+ log.warn(
+ "skip the text processing due to {}",
+ new StringBuilder().append(e.getLocalizedMessage())
+ .append(optionalFieldInfo).append(debugString));
} else {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- new StringBuilder("processing error ")
- .append(e.getLocalizedMessage()).append(optionalFieldInfo)
- .append(debugString).toString(), e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, new StringBuilder(
+ "processing error ").append(e.getLocalizedMessage())
+ .append(optionalFieldInfo).append(debugString).toString(), e);
}
}
super.processAdd(cmd);
}
-
+
/*
* get the texts to analyze from the corresponding fields
*/
@@ -134,30 +142,31 @@ public class UIMAUpdateRequestProcessor
if (merge) {
StringBuilder unifiedText = new StringBuilder("");
for (String aFieldsToAnalyze : fieldsToAnalyze) {
- unifiedText.append(String.valueOf(solrInputDocument.getFieldValue(aFieldsToAnalyze)));
+ unifiedText.append(String.valueOf(solrInputDocument
+ .getFieldValue(aFieldsToAnalyze)));
}
textVals = new String[1];
textVals[0] = unifiedText.toString();
} else {
textVals = new String[fieldsToAnalyze.length];
for (int i = 0; i < fieldsToAnalyze.length; i++) {
- textVals[i] = String.valueOf(solrInputDocument.getFieldValue(fieldsToAnalyze[i]));
+ textVals[i] = String.valueOf(solrInputDocument
+ .getFieldValue(fieldsToAnalyze[i]));
}
}
return textVals;
}
-
- /* process a field value executing UIMA the CAS containing it as document text */
- private JCas processText(String textFieldValue) throws ResourceInitializationException,
- AnalysisEngineProcessException {
+
+ /*
+ * process a field value executing UIMA on the JCas containing it as document
+ * text
+ */
+ private void processText(String textFieldValue, JCas jcas)
+ throws ResourceInitializationException, AnalysisEngineProcessException {
if (log.isDebugEnabled()) {
log.debug("Analyzing text");
}
- /* get the UIMA analysis engine */
- AnalysisEngine ae = aeProvider.getAE();
- /* create a JCas which contain the text to analyze */
- JCas jcas = ae.newJCas();
jcas.setDocumentText(textFieldValue);
/* perform analysis on text field */
@@ -165,7 +174,6 @@ public class UIMAUpdateRequestProcessor
if (log.isDebugEnabled()) {
log.debug("Text processing completed");
}
- return jcas;
}
-
+
}
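
The change above replaces per-value JCas creation (and the AEProvider lookup) with a shared AnalysisEngine plus a JCasPool: each field value borrows a JCas, runs analysis, and returns it in a finally block so a processing failure cannot leak a pool slot. A minimal sketch of that borrow/process/release pattern (the class name is illustrative, and treating a timeout of 0 as "wait until a JCas is free" is my reading, not something this diff states):

    import org.apache.uima.analysis_engine.AnalysisEngine;
    import org.apache.uima.jcas.JCas;
    import org.apache.uima.util.JCasPool;

    class PooledAnalysisSketch {
      private final AnalysisEngine ae; // shared, thread-safe entry point
      private final JCasPool pool;     // recycles JCas instances across requests

      PooledAnalysisSketch(AnalysisEngine ae) throws Exception {
        this.ae = ae;
        this.pool = new JCasPool(10, ae); // same pool size the factory below uses
      }

      void analyze(String text) throws Exception {
        JCas jcas = pool.getJCas(0); // borrow a pooled JCas
        try {
          jcas.setDocumentText(text);
          ae.process(jcas);          // annotations now live in the JCas
        } finally {
          pool.releaseJCas(jcas);    // release in finally, even on failure
        }
      }
    }

Reusing pooled CAS structures instead of calling newJCas() per value is the usual motivation for JCasPool, and appears to be the main point of this commit.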
Modified: lucene/dev/branches/lucene4956/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAUpdateRequestProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAUpdateRequestProcessorFactory.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAUpdateRequestProcessorFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAUpdateRequestProcessorFactory.java Mon Oct 21 18:58:24 2013
@@ -17,20 +17,29 @@ package org.apache.solr.uima.processor;
* limitations under the License.
*/
+import org.apache.lucene.analysis.uima.ae.AEProvider;
+import org.apache.lucene.analysis.uima.ae.AEProviderFactory;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
+import org.apache.uima.analysis_engine.AnalysisEngine;
+import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.util.JCasPool;
/**
* Factory for {@link UIMAUpdateRequestProcessor}
*
- *
+ *
*/
-public class UIMAUpdateRequestProcessorFactory extends UpdateRequestProcessorFactory {
+public class UIMAUpdateRequestProcessorFactory extends
+ UpdateRequestProcessorFactory {
private NamedList<Object> args;
+ private AnalysisEngine ae;
+ private JCasPool pool;
@SuppressWarnings("unchecked")
@Override
@@ -39,10 +48,26 @@ public class UIMAUpdateRequestProcessorF
}
@Override
- public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp,
- UpdateRequestProcessor next) {
- return new UIMAUpdateRequestProcessor(next, req.getCore(),
- new SolrUIMAConfigurationReader(args).readSolrUIMAConfiguration());
+ public UpdateRequestProcessor getInstance(SolrQueryRequest req,
+ SolrQueryResponse rsp, UpdateRequestProcessor next) {
+ SolrUIMAConfiguration configuration = new SolrUIMAConfigurationReader(args)
+ .readSolrUIMAConfiguration();
+ synchronized (this) {
+ if (ae == null && pool == null) {
+ AEProvider aeProvider = AEProviderFactory.getInstance().getAEProvider(
+ req.getCore().getName(), configuration.getAePath(),
+ configuration.getRuntimeParameters());
+ try {
+ ae = aeProvider.getAE();
+ pool = new JCasPool(10, ae);
+ } catch (ResourceInitializationException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+ }
+
+ return new UIMAUpdateRequestProcessor(next, req.getCore().getName(),
+ configuration, ae, pool);
}
-
+
}
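
In the factory, getInstance now initializes the AnalysisEngine and JCasPool lazily inside a synchronized block, so all processors produced by this factory share one engine/pool pair. A stripped-down sketch of that one-time, thread-safe initialization (generic names are mine; the real code above also wraps ResourceInitializationException in a SolrException):

    import java.util.concurrent.Callable;

    class LazySharedSketch<T> {
      private T value; // guarded by "this"

      synchronized T getOrBuild(Callable<T> build) throws Exception {
        if (value == null) {
          value = build.call(); // expensive construction runs at most once
        }
        return value;
      }
    }

The ae == null && pool == null test in the patch plays the role of the single null check here: later getInstance calls skip construction and just hand the shared pair to a new processor.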
Modified: lucene/dev/branches/lucene4956/solr/contrib/uima/src/test/org/apache/solr/uima/processor/UIMAUpdateRequestProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/contrib/uima/src/test/org/apache/solr/uima/processor/UIMAUpdateRequestProcessorTest.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/contrib/uima/src/test/org/apache/solr/uima/processor/UIMAUpdateRequestProcessorTest.java (original)
+++ lucene/dev/branches/lucene4956/solr/contrib/uima/src/test/org/apache/solr/uima/processor/UIMAUpdateRequestProcessorTest.java Mon Oct 21 18:58:24 2013
@@ -93,7 +93,6 @@ public class UIMAUpdateRequestProcessorT
@Test
public void testProcessing() throws Exception {
-
addDoc("uima", adoc(
"id",
"2312312321312",
@@ -185,6 +184,13 @@ public class UIMAUpdateRequestProcessorT
}
}
+ @Test
+ public void testMultiplierProcessing() throws Exception {
+ for (int i = 0; i < RANDOM_MULTIPLIER; i++) {
+ testProcessing();
+ }
+ }
+
private void addDoc(String chain, String doc) throws Exception {
Map<String, String[]> params = new HashMap<String, String[]>();
params.put(UpdateParams.UPDATE_CHAIN, new String[] { chain });
Modified: lucene/dev/branches/lucene4956/solr/contrib/velocity/ivy.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/contrib/velocity/ivy.xml?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/contrib/velocity/ivy.xml (original)
+++ lucene/dev/branches/lucene4956/solr/contrib/velocity/ivy.xml Mon Oct 21 18:58:24 2013
@@ -19,10 +19,10 @@
<ivy-module version="2.0">
<info organisation="org.apache.solr" module="velocity"/>
<dependencies>
- <dependency org="commons-beanutils" name="commons-beanutils" rev="1.7.0" transitive="false"/>
- <dependency org="commons-collections" name="commons-collections" rev="3.2.1" transitive="false"/>
- <dependency org="org.apache.velocity" name="velocity" rev="1.7" transitive="false"/>
- <dependency org="org.apache.velocity" name="velocity-tools" rev="2.0" transitive="false"/>
+ <dependency org="commons-beanutils" name="commons-beanutils" rev="${/commons-beanutils/commons-beanutils}" transitive="false"/>
+ <dependency org="commons-collections" name="commons-collections" rev="${/commons-collections/commons-collections}" transitive="false"/>
+ <dependency org="org.apache.velocity" name="velocity" rev="${/org.apache.velocity/velocity}" transitive="false"/>
+ <dependency org="org.apache.velocity" name="velocity-tools" rev="${/org.apache.velocity/velocity-tools}" transitive="false"/>
<exclude org="*" ext="*" matcher="regexp" type="${ivy.exclude.types}"/>
</dependencies>
</ivy-module>
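
This file (and solr/core/ivy.xml below) replaces hard-coded revisions with ${/organisation/module} properties, so every dependency version is pinned once in a shared table instead of being repeated per ivy.xml. In the Lucene/Solr build these properties resolve from a central ivy-versions.properties file, with entries of the form /org.apache.velocity/velocity = 1.7; the file name is from memory of the build layout, not from this diff.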
Modified: lucene/dev/branches/lucene4956/solr/contrib/velocity/src/java/org/apache/solr/response/VelocityResponseWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/contrib/velocity/src/java/org/apache/solr/response/VelocityResponseWriter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/contrib/velocity/src/java/org/apache/solr/response/VelocityResponseWriter.java (original)
+++ lucene/dev/branches/lucene4956/solr/contrib/velocity/src/java/org/apache/solr/response/VelocityResponseWriter.java Mon Oct 21 18:58:24 2013
@@ -67,7 +67,6 @@ public class VelocityResponseWriter impl
} catch (ClassCastException e) {
// known edge case where QueryResponse's extraction assumes "response" is a SolrDocumentList
// (AnalysisRequestHandler emits a "response")
- e.printStackTrace();
rsp = new SolrResponseBase();
rsp.setResponse(parsedResponse);
}
@@ -121,25 +120,7 @@ public class VelocityResponseWriter impl
SolrVelocityResourceLoader resourceLoader =
new SolrVelocityResourceLoader(request.getCore().getSolrConfig().getResourceLoader());
engine.setProperty("solr.resource.loader.instance", resourceLoader);
-
- File fileResourceLoaderBaseDir = null;
- try {
- String template_root = request.getParams().get("v.base_dir");
- fileResourceLoaderBaseDir = new File(request.getCore().getResourceLoader().getConfigDir(), "velocity");
- if (template_root != null) {
- fileResourceLoaderBaseDir = new File(template_root);
- }
- } catch (SolrException e) {
- // no worries... probably in ZooKeeper mode and getConfigDir() isn't available, so we'll just ignore omit
- // the file system resource loader
- }
-
- if (fileResourceLoaderBaseDir != null) {
- engine.setProperty(RuntimeConstants.FILE_RESOURCE_LOADER_PATH, fileResourceLoaderBaseDir.getAbsolutePath());
- engine.setProperty(RuntimeConstants.RESOURCE_LOADER, "params,file,solr");
- } else {
- engine.setProperty(RuntimeConstants.RESOURCE_LOADER, "params,solr");
- }
+ engine.setProperty(RuntimeConstants.RESOURCE_LOADER, "params,solr");
// TODO: Externalize Velocity properties
String propFile = request.getParams().get("v.properties");
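
The VelocityResponseWriter change drops the "file" resource loader (and the v.base_dir parameter driving it), leaving only the request-params and Solr resource loaders. For context, a self-contained sketch of how Velocity's named loaders are wired, using the stock classpath loader rather than Solr's (standard Velocity 1.7 API, not code from this commit):

    import org.apache.velocity.app.VelocityEngine;
    import org.apache.velocity.runtime.RuntimeConstants;

    class VelocityLoaderSketch {
      static VelocityEngine newEngine() throws Exception {
        VelocityEngine engine = new VelocityEngine();
        // each name listed in RESOURCE_LOADER gets its own
        // "<name>.resource.loader.*" properties; the patch above simply stops
        // listing "file", so FILE_RESOURCE_LOADER_PATH is never set
        engine.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
        engine.setProperty("classpath.resource.loader.class",
            "org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader");
        engine.init();
        return engine;
      }
    }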
Modified: lucene/dev/branches/lucene4956/solr/core/ivy.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/ivy.xml?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/ivy.xml (original)
+++ lucene/dev/branches/lucene4956/solr/core/ivy.xml Mon Oct 21 18:58:24 2013
@@ -16,57 +16,59 @@
specific language governing permissions and limitations
under the License.
-->
-<!DOCTYPE ivy-module [
- <!ENTITY hadoop.version "2.0.5-alpha">
-]>
<ivy-module version="2.0" xmlns:maven="http://ant.apache.org/ivy/maven">
<info organisation="org.apache.solr" module="core"/>
<configurations>
<!-- artifacts in the "compile" and "compile.hadoop" configurations will go into solr/core/lib/ -->
<conf name="compile" transitive="false"/>
- <conf name="test" transitive="false"/>
<conf name="compile.hadoop" transitive="false"/>
    <!-- artifacts in the "test" and "test.DfsMiniCluster" configurations will go into solr/core/test-lib/ -->
+ <conf name="test" transitive="false"/>
<conf name="test.DfsMiniCluster" transitive="false"/>
</configurations>
<dependencies>
- <dependency org="commons-codec" name="commons-codec" rev="1.7" conf="compile->*"/>
- <dependency org="commons-fileupload" name="commons-fileupload" rev="1.2.1" conf="compile->*"/>
- <dependency org="commons-cli" name="commons-cli" rev="1.2" conf="compile->*"/>
- <dependency org="commons-lang" name="commons-lang" rev="2.6" conf="compile->*"/>
- <dependency org="com.google.guava" name="guava" rev="14.0.1" conf="compile->*"/>
- <dependency org="com.spatial4j" name="spatial4j" rev="0.3" conf="compile->*"/>
- <dependency org="org.restlet.jee" name="org.restlet" rev="2.1.1" conf="compile->*"/>
- <dependency org="org.restlet.jee" name="org.restlet.ext.servlet" rev="2.1.1" conf="compile->*"/>
- <dependency org="joda-time" name="joda-time" rev="2.2" conf="compile->*"/>
- <dependency org="dom4j" name="dom4j" rev="1.6.1" transitive="false"/>
+ <dependency org="commons-codec" name="commons-codec" rev="${/commons-codec/commons-codec}" conf="compile->*"/>
+ <dependency org="commons-fileupload" name="commons-fileupload" rev="${/commons-fileupload/commons-fileupload}" conf="compile->*"/>
+ <dependency org="commons-cli" name="commons-cli" rev="${/commons-cli/commons-cli}" conf="compile->*"/>
+ <dependency org="commons-lang" name="commons-lang" rev="${/commons-lang/commons-lang}" conf="compile->*"/>
+ <dependency org="com.google.guava" name="guava" rev="${/com.google.guava/guava}" conf="compile->*"/>
+ <dependency org="com.spatial4j" name="spatial4j" rev="${/com.spatial4j/spatial4j}" conf="compile->*"/>
+ <dependency org="org.restlet.jee" name="org.restlet" rev="${/org.restlet.jee/org.restlet}" conf="compile->*"/>
+ <dependency org="org.restlet.jee" name="org.restlet.ext.servlet" rev="${/org.restlet.jee/org.restlet.ext.servlet}" conf="compile->*"/>
+ <dependency org="joda-time" name="joda-time" rev="${/joda-time/joda-time}" conf="compile->*"/>
+ <dependency org="dom4j" name="dom4j" rev="${/dom4j/dom4j}" transitive="false"/>
- <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="test->*"/>
- <dependency org="org.easymock" name="easymock" rev="3.0" conf="test->*"/>
- <dependency org="cglib" name="cglib-nodep" rev="2.2" conf="test->*"/>
- <dependency org="org.objenesis" name="objenesis" rev="1.2" conf="test->*"/>
+ <dependency org="javax.servlet" name="javax.servlet-api" rev="${/javax.servlet/javax.servlet-api}" conf="test->*"/>
+ <dependency org="org.easymock" name="easymock" rev="${/org.easymock/easymock}" conf="test->*"/>
+ <dependency org="cglib" name="cglib-nodep" rev="${/cglib/cglib-nodep}" conf="test->*"/>
+ <dependency org="org.objenesis" name="objenesis" rev="${/org.objenesis/objenesis}" conf="test->*"/>
- <dependency org="org.apache.hadoop" name="hadoop-common" rev="&hadoop.version;" conf="compile.hadoop->*"/>
- <dependency org="org.apache.hadoop" name="hadoop-hdfs" rev="&hadoop.version;" conf="compile.hadoop->*"/>
- <dependency org="org.apache.hadoop" name="hadoop-annotations" rev="&hadoop.version;" conf="compile.hadoop->*"/>
- <dependency org="org.apache.hadoop" name="hadoop-auth" rev="&hadoop.version;" conf="compile.hadoop->*"/>
- <dependency org="commons-configuration" name="commons-configuration" rev="1.6" conf="compile.hadoop->*"/>
- <dependency org="com.google.protobuf" name="protobuf-java" rev="2.4.0a" conf="compile.hadoop->*"/>
- <dependency org="com.googlecode.concurrentlinkedhashmap" name="concurrentlinkedhashmap-lru" rev="1.2" conf="compile.hadoop->*"/>
+ <dependency org="org.apache.hadoop" name="hadoop-common" rev="${/org.apache.hadoop/hadoop-common}" conf="compile.hadoop->*"/>
+    <!--
+      hadoop-hdfs, hadoop-annotations and hadoop-auth are runtime dependencies,
+      so even though they are not needed at compile time, they are declared
+      here as compile dependencies so that they end up in the runtime distribution.
+    -->
+ <dependency org="org.apache.hadoop" name="hadoop-hdfs" rev="${/org.apache.hadoop/hadoop-hdfs}" conf="compile.hadoop->*"/>
+ <dependency org="org.apache.hadoop" name="hadoop-annotations" rev="${/org.apache.hadoop/hadoop-annotations}" conf="compile.hadoop->*"/>
+ <dependency org="org.apache.hadoop" name="hadoop-auth" rev="${/org.apache.hadoop/hadoop-auth}" conf="compile.hadoop->*"/>
+ <dependency org="commons-configuration" name="commons-configuration" rev="${/commons-configuration/commons-configuration}" conf="compile.hadoop->*"/>
+ <dependency org="com.google.protobuf" name="protobuf-java" rev="${/com.google.protobuf/protobuf-java}" conf="compile.hadoop->*"/>
+ <dependency org="com.googlecode.concurrentlinkedhashmap" name="concurrentlinkedhashmap-lru" rev="${/com.googlecode.concurrentlinkedhashmap/concurrentlinkedhashmap-lru}" conf="compile.hadoop->*"/>
<!-- Hadoop DfsMiniCluster Dependencies-->
- <dependency org="org.apache.hadoop" name="hadoop-common" rev="&hadoop.version;" conf="test.DfsMiniCluster->*">
+ <dependency org="org.apache.hadoop" name="hadoop-common" rev="${/org.apache.hadoop/hadoop-common}" conf="test.DfsMiniCluster->*">
<artifact name="hadoop-common" type="test" ext="jar" maven:classifier="tests" />
</dependency>
- <dependency org="org.apache.hadoop" name="hadoop-hdfs" rev="&hadoop.version;" conf="test.DfsMiniCluster->*">
+ <dependency org="org.apache.hadoop" name="hadoop-hdfs" rev="${/org.apache.hadoop/hadoop-hdfs}" conf="test.DfsMiniCluster->*">
<artifact name="hadoop-hdfs" type="test" ext="jar" maven:classifier="tests" />
</dependency>
- <dependency org="org.mortbay.jetty" name="jetty" rev="6.1.26" conf="test.DfsMiniCluster->*"/>
- <dependency org="org.mortbay.jetty" name="jetty-util" rev="6.1.26" conf="test.DfsMiniCluster->*"/>
- <dependency org="com.sun.jersey" name="jersey-core" rev="1.16" conf="test.DfsMiniCluster->*"/>
- <dependency org="commons-collections" name="commons-collections" rev="3.2.1" conf="test.DfsMiniCluster->*"/>
+ <dependency org="org.mortbay.jetty" name="jetty" rev="${/org.mortbay.jetty/jetty}" conf="test.DfsMiniCluster->*"/>
+ <dependency org="org.mortbay.jetty" name="jetty-util" rev="${/org.mortbay.jetty/jetty-util}" conf="test.DfsMiniCluster->*"/>
+ <dependency org="com.sun.jersey" name="jersey-core" rev="${/com.sun.jersey/jersey-core}" conf="test.DfsMiniCluster->*"/>
+ <dependency org="commons-collections" name="commons-collections" rev="${/commons-collections/commons-collections}" conf="test.DfsMiniCluster->*"/>
<exclude org="*" ext="*" matcher="regexp" type="${ivy.exclude.types}"/>
</dependencies>
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java Mon Oct 21 18:58:24 2013
@@ -26,7 +26,6 @@ import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
-
import java.net.URL;
import java.net.MalformedURLException;
@@ -43,24 +42,21 @@ import javax.servlet.http.HttpServletRes
import org.apache.solr.servlet.SolrDispatchFilter;
import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.bio.SocketConnector;
-import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.server.handler.GzipHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.server.session.HashSessionIdManager;
import org.eclipse.jetty.server.ssl.SslConnector;
-import org.eclipse.jetty.server.ssl.SslSocketConnector;
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector;
-import org.eclipse.jetty.server.handler.GzipHandler;
-import org.eclipse.jetty.server.session.HashSessionIdManager;
+import org.eclipse.jetty.server.ssl.SslSocketConnector;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
/**
* Run solr using jetty
@@ -98,6 +94,8 @@ public class JettySolrRunner {
/** Maps servlet holders (i.e. factories: class + init params) to path specs */
private SortedMap<ServletHolder,String> extraServlets = new TreeMap<ServletHolder,String>();
+ private SSLConfig sslConfig;
+
public static class DebugFilter implements Filter {
public int requestsToKeep = 10;
private AtomicLong nRequests = new AtomicLong();
@@ -174,6 +172,25 @@ public class JettySolrRunner {
this.solrConfigFilename = solrConfigFilename;
this.schemaFilename = schemaFileName;
}
+
+ public JettySolrRunner(String solrHome, String context, int port,
+ String solrConfigFilename, String schemaFileName, boolean stopAtShutdown,
+ SortedMap<ServletHolder,String> extraServlets, SSLConfig sslConfig) {
+ if (null != extraServlets) { this.extraServlets.putAll(extraServlets); }
+ this.init(solrHome, context, port, stopAtShutdown);
+ this.solrConfigFilename = solrConfigFilename;
+ this.schemaFilename = schemaFileName;
+ this.sslConfig = sslConfig;
+ }
+
+ public static class SSLConfig {
+ public boolean useSsl;
+ public boolean clientAuth;
+ public String keyStore;
+ public String keyStorePassword;
+ public String trustStore;
+ public String trustStorePassword;
+ }
private void init(String solrHome, String context, int port, boolean stopAtShutdown) {
this.context = context;
@@ -191,34 +208,16 @@ public class JettySolrRunner {
// if this property is true, then jetty will be configured to use SSL
// leveraging the same system properties as java to specify
- // the keystore/truststore if they are set
+ // the keystore/truststore if they are set unless specific config
+ // is passed via the constructor.
//
// This means we will use the same truststore, keystore (and keys) for
// the server as well as any client actions taken by this JVM in
// talking to that server, but for the purposes of testing that should
// be good enough
- final boolean useSsl = Boolean.getBoolean("tests.jettySsl");
+ final boolean useSsl = sslConfig == null ? false : sslConfig.useSsl;
final SslContextFactory sslcontext = new SslContextFactory(false);
-
- if (useSsl) {
- if (null != System.getProperty("javax.net.ssl.keyStore")) {
- sslcontext.setKeyStorePath
- (System.getProperty("javax.net.ssl.keyStore"));
- }
- if (null != System.getProperty("javax.net.ssl.keyStorePassword")) {
- sslcontext.setKeyStorePassword
- (System.getProperty("javax.net.ssl.keyStorePassword"));
- }
- if (null != System.getProperty("javax.net.ssl.trustStore")) {
- sslcontext.setTrustStore
- (System.getProperty("javax.net.ssl.trustStore"));
- }
- if (null != System.getProperty("javax.net.ssl.trustStorePassword")) {
- sslcontext.setTrustStorePassword
- (System.getProperty("javax.net.ssl.trustStorePassword"));
- }
- sslcontext.setNeedClientAuth(Boolean.getBoolean("tests.jettySsl.clientAuth"));
- }
+ sslInit(useSsl, sslcontext);
final Connector connector;
final QueuedThreadPool threadPool;
@@ -329,6 +328,48 @@ public class JettySolrRunner {
}
+ private void sslInit(final boolean useSsl, final SslContextFactory sslcontext) {
+ if (useSsl && sslConfig != null) {
+ if (null != sslConfig.keyStore) {
+ sslcontext.setKeyStorePath(sslConfig.keyStore);
+ }
+      if (null != sslConfig.keyStorePassword) {
+        sslcontext.setKeyStorePassword(sslConfig.keyStorePassword);
+      }
+      if (null != sslConfig.trustStore) {
+        sslcontext.setTrustStore(sslConfig.trustStore);
+      }
+ if (null != sslConfig.trustStorePassword) {
+ sslcontext.setTrustStorePassword(sslConfig.trustStorePassword);
+ }
+ sslcontext.setNeedClientAuth(sslConfig.clientAuth);
+ } else {
+ boolean jettySsl = Boolean.getBoolean("tests.jettySsl");
+
+ if (jettySsl) {
+ if (null != System.getProperty("javax.net.ssl.keyStore")) {
+ sslcontext.setKeyStorePath
+ (System.getProperty("javax.net.ssl.keyStore"));
+ }
+ if (null != System.getProperty("javax.net.ssl.keyStorePassword")) {
+ sslcontext.setKeyStorePassword
+ (System.getProperty("javax.net.ssl.keyStorePassword"));
+ }
+ if (null != System.getProperty("javax.net.ssl.trustStore")) {
+ sslcontext.setTrustStore
+ (System.getProperty("javax.net.ssl.trustStore"));
+ }
+ if (null != System.getProperty("javax.net.ssl.trustStorePassword")) {
+ sslcontext.setTrustStorePassword
+ (System.getProperty("javax.net.ssl.trustStorePassword"));
+ }
+ sslcontext.setNeedClientAuth(Boolean.getBoolean("tests.jettySsl.clientAuth"));
+ }
+ }
+ }
+
public FilterHolder getDispatchFilter() {
return dispatchFilter;
}
@@ -357,37 +398,40 @@ public class JettySolrRunner {
startedBefore = true;
}
- if( dataDir != null) {
+ if (dataDir != null) {
System.setProperty("solr.data.dir", dataDir);
}
- if( solrUlogDir != null) {
+ if (solrUlogDir != null) {
System.setProperty("solr.ulog.dir", solrUlogDir);
}
- if(shards != null) {
+ if (shards != null) {
System.setProperty("shard", shards);
}
if (coreNodeName != null) {
System.setProperty("coreNodeName", coreNodeName);
}
-
- if (!server.isRunning()) {
- server.start();
- }
- synchronized (JettySolrRunner.this) {
- int cnt = 0;
- while (!waitOnSolr) {
- this.wait(100);
- if (cnt++ == 5) {
- throw new RuntimeException("Jetty/Solr unresponsive");
+ try {
+
+ if (!server.isRunning()) {
+ server.start();
+ }
+ synchronized (JettySolrRunner.this) {
+ int cnt = 0;
+ while (!waitOnSolr) {
+ this.wait(100);
+ if (cnt++ == 5) {
+ throw new RuntimeException("Jetty/Solr unresponsive");
+ }
}
}
+ } finally {
+
+ System.clearProperty("shard");
+ System.clearProperty("solr.data.dir");
+ System.clearProperty("coreNodeName");
+ System.clearProperty("solr.ulog.dir");
}
- System.clearProperty("shard");
- System.clearProperty("solr.data.dir");
- System.clearProperty("coreNodeName");
- System.clearProperty("solr.ulog.dir");
-
}
public void stop() throws Exception {
@@ -499,6 +543,10 @@ public class JettySolrRunner {
public void setCoreNodeName(String coreNodeName) {
this.coreNodeName = coreNodeName;
}
+
+ public String getSolrHome() {
+ return solrHome;
+ }
}
class NoLog implements Logger {
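
JettySolrRunner gains a public SSLConfig holder and a constructor that accepts it, falling back to the old tests.jettySsl / javax.net.ssl.* system properties when no config is passed; start() also now clears the shard/data-dir system properties in a finally block, so a failed startup cannot leave them set for the next run. A sketch built directly against the new constructor (paths, password, and port are placeholders):

    import org.apache.solr.client.solrj.embedded.JettySolrRunner;

    class JettySslSketch {
      public static void main(String[] args) throws Exception {
        JettySolrRunner.SSLConfig ssl = new JettySolrRunner.SSLConfig();
        ssl.useSsl = true;
        ssl.clientAuth = false;                 // no client certificates required
        ssl.keyStore = "/path/to/keystore.jks"; // placeholder keystore
        ssl.keyStorePassword = "secret";
        JettySolrRunner jetty = new JettySolrRunner("/path/to/solr/home", "/solr",
            8983, "solrconfig.xml", "schema.xml", true, null, ssl);
        jetty.start(); // Jetty serves HTTPS using the supplied keystore
      }
    }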
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Mon Oct 21 18:58:24 2013
@@ -38,6 +38,7 @@ public class CloudDescriptor {
* Use the values from {@link Slice} instead */
volatile String shardRange = null;
volatile String shardState = Slice.ACTIVE;
+ volatile String shardParent = null;
volatile boolean isLeader = false;
volatile String lastPublished = ZkStateReader.ACTIVE;
@@ -45,6 +46,7 @@ public class CloudDescriptor {
public static final String SHARD_STATE = "shardState";
public static final String NUM_SHARDS = "numShards";
public static final String SHARD_RANGE = "shardRange";
+ public static final String SHARD_PARENT = "shardParent";
public CloudDescriptor(String coreName, Properties props) {
this.shardId = props.getProperty(CoreDescriptor.CORE_SHARD, null);
@@ -55,6 +57,7 @@ public class CloudDescriptor {
this.shardState = props.getProperty(CloudDescriptor.SHARD_STATE, Slice.ACTIVE);
this.numShards = PropertiesUtil.toInteger(props.getProperty(CloudDescriptor.NUM_SHARDS), null);
this.shardRange = props.getProperty(CloudDescriptor.SHARD_RANGE, null);
+ this.shardParent = props.getProperty(CloudDescriptor.SHARD_PARENT, null);
}
public String getLastPublished() {
@@ -134,4 +137,12 @@ public class CloudDescriptor {
public void setShardState(String shardState) {
this.shardState = shardState;
}
+
+ public String getShardParent() {
+ return shardParent;
+ }
+
+ public void setShardParent(String shardParent) {
+ this.shardParent = shardParent;
+ }
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Oct 21 18:58:24 2013
@@ -17,8 +17,17 @@ package org.apache.solr.cloud;
* the License.
*/
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -36,16 +45,6 @@ import org.apache.zookeeper.KeeperExcept
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
/**
 * Cluster leader. Responsible for node assignments and cluster state file updates.
*/
@@ -57,9 +56,10 @@ public class Overseer {
private static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates
-
private static Logger log = LoggerFactory.getLogger(Overseer.class);
+ static enum LeaderStatus { DONT_KNOW, NO, YES };
+
private class ClusterStateUpdater implements Runnable, ClosableThread {
private final ZkStateReader reader;
@@ -83,7 +83,12 @@ public class Overseer {
@Override
public void run() {
- if (!this.isClosed && amILeader()) {
+ LeaderStatus isLeader = amILeader();
+ while (isLeader == LeaderStatus.DONT_KNOW) {
+ log.debug("am_i_leader unclear {}", isLeader);
+        isLeader = amILeader(); // not a no, not a yes, try asking again
+ }
+ if (!this.isClosed && LeaderStatus.YES == isLeader) {
// see if there's something left from the previous Overseer and re
// process all events that were not persisted into cloud state
synchronized (reader.getUpdateLock()) { // XXX this only protects
@@ -97,14 +102,24 @@ public class Overseer {
ClusterState clusterState = reader.getClusterState();
log.info("Replaying operations from work queue.");
- while (head != null && amILeader()) {
- final ZkNodeProps message = ZkNodeProps.load(head);
- final String operation = message.getStr(QUEUE_OPERATION);
- clusterState = processMessage(clusterState, message, operation);
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(clusterState), true);
-
- workQueue.poll();
+ while (head != null) {
+ isLeader = amILeader();
+ if (LeaderStatus.NO == isLeader) {
+ break;
+ }
+ else if (LeaderStatus.YES == isLeader) {
+ final ZkNodeProps message = ZkNodeProps.load(head);
+ final String operation = message.getStr(QUEUE_OPERATION);
+ clusterState = processMessage(clusterState, message, operation);
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(clusterState), true);
+
+              workQueue.poll(); // polling removes the element we got by peeking
+ }
+ else {
+ log.info("am_i_leader unclear {}", isLeader);
+ // re-peek below in case our 'head' value is out-of-date by now
+ }
head = workQueue.peek();
}
@@ -127,7 +142,15 @@ public class Overseer {
}
log.info("Starting to work on the main queue");
- while (!this.isClosed && amILeader()) {
+ while (!this.isClosed) {
+ isLeader = amILeader();
+ if (LeaderStatus.NO == isLeader) {
+ break;
+ }
+ else if (LeaderStatus.YES != isLeader) {
+ log.debug("am_i_leader unclear {}", isLeader);
+          continue; // not a no, not a yes, try asking again
+ }
synchronized (reader.getUpdateLock()) {
try {
byte[] head = stateUpdateQueue.peek();
@@ -222,7 +245,7 @@ public class Overseer {
ArrayList<String> shardNames = new ArrayList<String>();
- if(ImplicitDocRouter.NAME.equals( message.getStr("router",DocRouter.DEFAULT_NAME))){
+ if(ImplicitDocRouter.NAME.equals( message.getStr("router.name",DocRouter.DEFAULT_NAME))){
getShardNames(shardNames,message.getStr("shards",DocRouter.DEFAULT_NAME));
} else {
int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, -1);
@@ -235,7 +258,7 @@ public class Overseer {
private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- log.info("Update shard state invoked for collection: " + collection);
+ log.info("Update shard state invoked for collection: " + collection + " with message: " + message);
for (String key : message.keySet()) {
if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
if (QUEUE_OPERATION.equals(key)) continue;
@@ -246,6 +269,9 @@ public class Overseer {
}
log.info("Update shard state " + key + " to " + message.getStr(key));
Map<String, Object> props = slice.shallowCopy();
+ if (Slice.RECOVERY.equals(props.get(Slice.STATE)) && Slice.ACTIVE.equals(message.getStr(key))) {
+ props.remove(Slice.PARENT);
+ }
props.put(Slice.STATE, message.getStr(key));
Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
clusterState = updateSlice(clusterState, collection, newSlice);
@@ -273,20 +299,28 @@ public class Overseer {
return clusterState;
}
- private boolean amILeader() {
- try {
- ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, true));
- if(myId.equals(props.getStr("id"))) {
- return true;
- }
- } catch (KeeperException e) {
+ private LeaderStatus amILeader() {
+ try {
+ ZkNodeProps props = ZkNodeProps.load(zkClient.getData(
+ "/overseer_elect/leader", null, null, true));
+ if (myId.equals(props.getStr("id"))) {
+ return LeaderStatus.YES;
+ }
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.error("", e);
+ return LeaderStatus.DONT_KNOW;
+ } else if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
+ log.info("", e);
+ } else {
log.warn("", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
}
- log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
- return false;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
+ log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
+ return LeaderStatus.NO;
+ }
/**
* Try to assign core to the cluster.
@@ -295,17 +329,6 @@ public class Overseer {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
assert collection.length() > 0 : message;
- try {
- if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
- log.warn("Could not find collection node for " + collection + ", skipping publish state");
- }
- } catch (KeeperException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
-
String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
if (coreNodeName == null) {
coreNodeName = getAssignedCoreNodeName(state, message);
@@ -320,18 +343,12 @@ public class Overseer {
Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
log.info("Update state numShards={} message={}", numShards, message);
- String router = message.getStr(OverseerCollectionProcessor.ROUTER,DocRouter.DEFAULT_NAME);
List<String> shardNames = new ArrayList<String>();
//collection does not yet exist, create placeholders if num shards is specified
boolean collectionExists = state.getCollections().contains(collection);
if (!collectionExists && numShards!=null) {
- if(ImplicitDocRouter.NAME.equals(router)){
- getShardNames(shardNames, message.getStr("shards",null));
- numShards = shardNames.size();
- }else {
- getShardNames(numShards, shardNames);
- }
+ getShardNames(numShards, shardNames);
state = createCollection(state, collection, shardNames, message);
}
@@ -362,14 +379,6 @@ public class Overseer {
replicaProps.putAll(message.getProperties());
// System.out.println("########## UPDATE MESSAGE: " + JSONUtil.toJSON(message));
if (slice != null) {
- String sliceState = slice.getState();
-
- // throw an exception if the slice is not yet active.
-
- //if(!sliceState.equals(Slice.ACTIVE)) {
- // throw new SolrException(ErrorCode.BAD_REQUEST, "Can not assign core to a non-active slice [" + slice.getName() + "]");
- //}
-
Replica oldReplica = slice.getReplicasMap().get(coreNodeName);
if (oldReplica != null && oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
@@ -398,6 +407,7 @@ public class Overseer {
// remove shard specific properties
String shardRange = (String) replicaProps.remove(ZkStateReader.SHARD_RANGE_PROP);
String shardState = (String) replicaProps.remove(ZkStateReader.SHARD_STATE_PROP);
+ String shardParent = (String) replicaProps.remove(ZkStateReader.SHARD_PARENT_PROP);
Replica replica = new Replica(coreNodeName, replicaProps);
@@ -408,6 +418,9 @@ public class Overseer {
Map<String,Replica> replicas;
if (slice != null) {
+ state = checkAndCompleteShardSplit(state, collection, coreNodeName, sliceName, replicaProps);
+ // get the current slice again because it may have been updated due to checkAndCompleteShardSplit method
+ slice = state.getSlice(collection, sliceName);
sliceProps = slice.getProperties();
replicas = slice.getReplicasCopy();
} else {
@@ -415,6 +428,7 @@ public class Overseer {
sliceProps = new HashMap<String, Object>();
sliceProps.put(Slice.RANGE, shardRange);
sliceProps.put(Slice.STATE, shardState);
+ sliceProps.put(Slice.PARENT, shardParent);
}
replicas.put(replica.getName(), replica);
@@ -424,10 +438,76 @@ public class Overseer {
return newClusterState;
}
- private ClusterState createCollection(ClusterState state, String collectionName, List<String> shards , ZkNodeProps message) {
- log.info("Create collection {} with shards {}", collectionName, shards);;
+ private ClusterState checkAndCompleteShardSplit(ClusterState state, String collection, String coreNodeName, String sliceName, Map<String,Object> replicaProps) {
+ Slice slice = state.getSlice(collection, sliceName);
+ Map<String, Object> sliceProps = slice.getProperties();
+ String sliceState = slice.getState();
+ if (Slice.RECOVERY.equals(sliceState)) {
+ log.info("Shard: {} is in recovery state", sliceName);
+ // is this replica active?
+ if (ZkStateReader.ACTIVE.equals(replicaProps.get(ZkStateReader.STATE_PROP))) {
+ log.info("Shard: {} is in recovery state and coreNodeName: {} is active", sliceName, coreNodeName);
+ // are all other replicas also active?
+ boolean allActive = true;
+ for (Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
+ if (coreNodeName.equals(entry.getKey())) continue;
+ if (!Slice.ACTIVE.equals(entry.getValue().getStr(Slice.STATE))) {
+ allActive = false;
+ break;
+ }
+ }
+ if (allActive) {
+ log.info("Shard: {} - all replicas are active. Finding status of fellow sub-shards", sliceName);
+ // find out about other sub shards
+ Map<String, Slice> allSlicesCopy = new HashMap<String, Slice>(state.getSlicesMap(collection));
+ List<Slice> subShardSlices = new ArrayList<Slice>();
+ outer:
+ for (Entry<String, Slice> entry : allSlicesCopy.entrySet()) {
+ if (sliceName.equals(entry.getKey()))
+ continue;
+ Slice otherSlice = entry.getValue();
+ if (Slice.RECOVERY.equals(otherSlice.getState())) {
+ if (slice.getParent() != null && slice.getParent().equals(otherSlice.getParent())) {
+ log.info("Shard: {} - Fellow sub-shard: {} found", sliceName, otherSlice.getName());
+ // this is a fellow sub shard so check if all replicas are active
+ for (Entry<String, Replica> sliceEntry : otherSlice.getReplicasMap().entrySet()) {
+ if (!ZkStateReader.ACTIVE.equals(sliceEntry.getValue().getStr(ZkStateReader.STATE_PROP))) {
+ allActive = false;
+ break outer;
+ }
+ }
+ log.info("Shard: {} - Fellow sub-shard: {} has all replicas active", sliceName, otherSlice.getName());
+ subShardSlices.add(otherSlice);
+ }
+ }
+ }
+ if (allActive) {
+ // hurray, all sub shard replicas are active
+ log.info("Shard: {} - All replicas across all fellow sub-shards are now ACTIVE. Preparing to switch shard states.", sliceName);
+ String parentSliceName = (String) sliceProps.remove(Slice.PARENT);
+
+ Map<String, Object> propMap = new HashMap<String, Object>();
+ propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
+ propMap.put(parentSliceName, Slice.INACTIVE);
+ propMap.put(sliceName, Slice.ACTIVE);
+ for (Slice subShardSlice : subShardSlices) {
+ propMap.put(subShardSlice.getName(), Slice.ACTIVE);
+ }
+ propMap.put(ZkStateReader.COLLECTION_PROP, collection);
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ state = updateShardState(state, m);
+ }
+ }
+ }
+ }
+ return state;
+ }
- String routerName = message.getStr(OverseerCollectionProcessor.ROUTER,DocRouter.DEFAULT_NAME);
+ private ClusterState createCollection(ClusterState state, String collectionName, List<String> shards , ZkNodeProps message) {
+ log.info("Create collection {} with shards {}", collectionName, shards);
+
+ Map<String, Object> routerSpec = DocRouter.getRouterSpec(message);
+ String routerName = routerSpec.get("name") == null ? DocRouter.DEFAULT_NAME : (String) routerSpec.get("name");
DocRouter router = DocRouter.getDocRouter(routerName);
List<DocRouter.Range> ranges = router.partitionRange(shards.size(), router.fullRange());
@@ -459,7 +539,7 @@ public class Overseer {
}
if(val != null) collectionProps.put(e.getKey(),val);
}
- collectionProps.put(DocCollection.DOC_ROUTER, routerName);
+ collectionProps.put(DocCollection.DOC_ROUTER, routerSpec);
DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
@@ -518,7 +598,7 @@ public class Overseer {
// without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router.
slices = new HashMap<String, Slice>(1);
props = new HashMap<String,Object>(1);
- props.put(DocCollection.DOC_ROUTER, ImplicitDocRouter.NAME);
+ props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME));
router = new ImplicitDocRouter();
} else {
props = coll.getProperties();
@@ -610,10 +690,11 @@ public class Overseer {
* Remove collection slice from cloudstate
*/
private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) {
-
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
+ log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
+
final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
DocCollection coll = newCollections.get(collection);
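
The Overseer's leader checks move from boolean to a three-valued LeaderStatus so that a ZooKeeper CONNECTIONLOSS is treated as "unknown, ask again" instead of as loss of leadership; the work loops break only on NO, retry on DONT_KNOW, and proceed on YES. A condensed sketch of that mapping (the helper and id are stand-ins for the /overseer_elect/leader read in the patch):

    import org.apache.zookeeper.KeeperException;

    class LeaderCheckSketch {
      enum LeaderStatus { DONT_KNOW, NO, YES }

      private final String myId = "node1";

      LeaderStatus amILeader() {
        try {
          return myId.equals(readLeaderId()) ? LeaderStatus.YES : LeaderStatus.NO;
        } catch (KeeperException e) {
          if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
            return LeaderStatus.DONT_KNOW; // transient: caller retries
          }
          return LeaderStatus.NO; // e.g. SESSIONEXPIRED: leadership really gone
        }
      }

      private String readLeaderId() throws KeeperException {
        return "node1"; // placeholder for the ZkNodeProps "id" lookup
      }
    }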
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon Oct 21 18:58:24 2013
@@ -25,11 +25,13 @@ import org.apache.solr.client.solrj.requ
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
@@ -56,6 +58,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -64,7 +67,6 @@ import java.util.Set;
import static org.apache.solr.cloud.Assign.Node;
import static org.apache.solr.cloud.Assign.getNodesForNewShard;
-import static org.apache.solr.common.cloud.DocRouter.ROUTE_FIELD;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
@@ -102,11 +104,10 @@ public class OverseerCollectionProcessor
public static final String COLL_CONF = "collection.configName";
- public static final Map<String,Object> COLL_PROPS = asMap(
- ROUTER,DocRouter.DEFAULT_NAME,
+ public static final Map<String,Object> COLL_PROPS = ZkNodeProps.makeMap(
+ ROUTER, DocRouter.DEFAULT_NAME,
REPLICATION_FACTOR, "1",
- MAX_SHARDS_PER_NODE,"1",
- ROUTE_FIELD,null);
+ MAX_SHARDS_PER_NODE, "1");
// TODO: use from Overseer?
@@ -142,8 +143,22 @@ public class OverseerCollectionProcessor
@Override
public void run() {
log.info("Process current queue of collection creations");
- while (amILeader() && !isClosed) {
+ LeaderStatus isLeader = amILeader();
+ while (isLeader == LeaderStatus.DONT_KNOW) {
+ log.debug("am_i_leader unclear {}", isLeader);
+      isLeader = amILeader(); // not a no, not a yes, try asking again
+ }
+ while (!this.isClosed) {
try {
+ isLeader = amILeader();
+ if (LeaderStatus.NO == isLeader) {
+ break;
+ }
+ else if (LeaderStatus.YES != isLeader) {
+ log.debug("am_i_leader unclear {}", isLeader);
+ continue; // not a no, not a yes, try asking again
+ }
+
QueueEvent head = workQueue.peek(true);
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
@@ -174,20 +189,27 @@ public class OverseerCollectionProcessor
isClosed = true;
}
- protected boolean amILeader() {
+ protected LeaderStatus amILeader() {
try {
ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(
"/overseer_elect/leader", null, null, true));
if (myId.equals(props.getStr("id"))) {
- return true;
+ return LeaderStatus.YES;
}
} catch (KeeperException e) {
- log.warn("", e);
+ if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.error("", e);
+ return LeaderStatus.DONT_KNOW;
+ } else if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
+ log.info("", e);
+ } else {
+ log.warn("", e);
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
- return false;
+ return LeaderStatus.NO;
}
@@ -232,31 +254,57 @@ public class OverseerCollectionProcessor
return new OverseerSolrResponse(results);
}
- private void deleteCollection(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
- params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
- params.set(CoreAdminParams.DELETE_DATA_DIR, true);
- collectionCmd(zkStateReader.getClusterState(), message, params, results, null);
-
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- Overseer.REMOVECOLLECTION, "name", message.getStr("name"));
- Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
-
- // wait for a while until we don't see the collection
- long now = System.currentTimeMillis();
- long timeout = now + 30000;
- boolean removed = false;
- while (System.currentTimeMillis() < timeout) {
- Thread.sleep(100);
- removed = !zkStateReader.getClusterState().getCollections().contains(message.getStr("name"));
- if (removed) {
- Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
- break;
+ private void deleteCollection(ZkNodeProps message, NamedList results)
+ throws KeeperException, InterruptedException {
+ String collection = message.getStr("name");
+ try {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+ params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
+ params.set(CoreAdminParams.DELETE_DATA_DIR, true);
+ collectionCmd(zkStateReader.getClusterState(), message, params, results,
+ null);
+
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+ Overseer.REMOVECOLLECTION, "name", collection);
+ Overseer.getInQueue(zkStateReader.getZkClient()).offer(
+ ZkStateReader.toJSON(m));
+
+ // wait for a while until we don't see the collection
+ long now = System.currentTimeMillis();
+ long timeout = now + 30000;
+ boolean removed = false;
+ while (System.currentTimeMillis() < timeout) {
+ Thread.sleep(100);
+ removed = !zkStateReader.getClusterState().getCollections()
+ .contains(message.getStr("name"));
+ if (removed) {
+          Thread.sleep(100); // a bit of extra time so other readers are
+          // more likely to see the removal on return
+ break;
+ }
+ }
+ if (!removed) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Could not fully remove collection: " + message.getStr("name"));
+ }
+
+ } finally {
+
+ try {
+ if (zkStateReader.getZkClient().exists(
+ ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
+ zkStateReader.getZkClient().clean(
+ ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
+ }
+ } catch (InterruptedException e) {
+ SolrException.log(log, "Cleaning up collection in zk was interrupted:"
+ + collection, e);
+ Thread.currentThread().interrupt();
+ } catch (KeeperException e) {
+ SolrException.log(log, "Problem cleaning up collection in zk:"
+ + collection, e);
}
- }
- if (!removed) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + message.getStr("name"));
}
}
@@ -362,7 +410,7 @@ public class OverseerCollectionProcessor
}
private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
- log.info("create shard invoked");
+ log.info("Create shard invoked: {}", message);
String collectionName = message.getStr(COLLECTION_PROP);
String shard = message.getStr(SHARD_ID_PROP);
if (collectionName == null || shard == null)
@@ -371,19 +419,18 @@ public class OverseerCollectionProcessor
DocCollection collection = clusterState.getCollection(collectionName);
int maxShardsPerNode = collection.getInt(MAX_SHARDS_PER_NODE, 1);
- int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(MAX_SHARDS_PER_NODE, 1));
-// int minReplicas = message.getInt("minReplicas",repFactor);
- String createNodeSetStr =message.getStr(CREATE_NODE_SET);
+ int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
+ String createNodeSetStr = message.getStr(CREATE_NODE_SET);
ArrayList<Node> sortedNodeList = getNodesForNewShard(clusterState, collectionName, numSlices, maxShardsPerNode, repFactor, createNodeSetStr);
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
- // wait for a while until we don't see the collection
+ // wait for a while until we see the shard
long waitUntil = System.currentTimeMillis() + 30000;
boolean created = false;
while (System.currentTimeMillis() < waitUntil) {
Thread.sleep(100);
- created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(shard) !=null;
+ created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(shard) != null;
if (created) break;
}
if (!created)
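The repFactor fix a few lines up matters: the old code defaulted the replication factor to the collection's maxShardsPerNode, an unrelated setting. The corrected lookup is a two-level default, request message first, then collection properties, then a hard default of 1. A hedged sketch of that chain (plain Maps stand in for ZkNodeProps and DocCollection):

    import java.util.Map;

    final class Defaults {
      /** Request value overrides collection props, which override the hard default. */
      static int intProp(Map<String, Object> request, Map<String, Object> collectionProps,
          String key, int hardDefault) {
        Object v = request.get(key);
        if (v == null) v = collectionProps.get(key);
        return v == null ? hardDefault : Integer.parseInt(v.toString());
      }
    }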
@@ -442,8 +489,34 @@ public class OverseerCollectionProcessor
log.info("Split shard invoked");
String collectionName = message.getStr("collection");
String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
- Slice parentSlice = clusterState.getSlice(collectionName, slice);
-
+ String splitKey = message.getStr("split.key");
+
+ DocCollection collection = clusterState.getCollection(collectionName);
+ DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
+
+ Slice parentSlice = null;
+
+ if (slice == null) {
+ if (router instanceof CompositeIdRouter) {
+ Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
+ if (searchSlices.isEmpty()) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
+ }
+ if (searchSlices.size() > 1) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
+ }
+ parentSlice = searchSlices.iterator().next();
+ slice = parentSlice.getName();
+ log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Split by route key can only be used with CompositeIdRouter or subclass. Found router: " + router.getClass().getName());
+ }
+ } else {
+ parentSlice = clusterState.getSlice(collectionName, slice);
+ }
+
if (parentSlice == null) {
if(clusterState.getCollections().contains(collectionName)) {
throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
@@ -454,16 +527,59 @@ public class OverseerCollectionProcessor
// find the leader for the shard
Replica parentShardLeader = clusterState.getLeader(collectionName, slice);
- DocCollection collection = clusterState.getCollection(collectionName);
- DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
DocRouter.Range range = parentSlice.getRange();
if (range == null) {
range = new PlainIdRouter().fullRange();
}
- // todo: fixed to two partitions?
- // todo: accept the range as a param to api?
- List<DocRouter.Range> subRanges = router.partitionRange(2, range);
+ List<DocRouter.Range> subRanges = null;
+ String rangesStr = message.getStr(CoreAdminParams.RANGES);
+ if (rangesStr != null) {
+ String[] ranges = rangesStr.split(",");
+ if (ranges.length == 0 || ranges.length == 1) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
+ } else {
+ subRanges = new ArrayList<DocRouter.Range>(ranges.length);
+ for (int i = 0; i < ranges.length; i++) {
+ String r = ranges[i];
+ try {
+ subRanges.add(DocRouter.DEFAULT.fromString(r));
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
+ }
+ if (!subRanges.get(i).isSubsetOf(range)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
+ }
+ }
+ }
+ } else if (splitKey != null) {
+ if (router instanceof CompositeIdRouter) {
+ CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
+ subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
+ if (subRanges.size() == 1) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "The split.key: " + splitKey + " has a hash range that is exactly equal to hash range of shard: " + slice);
+ }
+ for (DocRouter.Range subRange : subRanges) {
+ if (subRange.min == subRange.max) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
+ }
+ }
+ log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
+ rangesStr = "";
+ for (int i = 0; i < subRanges.size(); i++) {
+ DocRouter.Range subRange = subRanges.get(i);
+ rangesStr += subRange.toString();
+ if (i < subRanges.size() - 1)
+ rangesStr += ',';
+ }
+ }
+ } else {
+ // todo: fixed to two partitions?
+ subRanges = router.partitionRange(2, range);
+ }
+
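The new ranges parameter above takes a comma-separated list of hexadecimal hash ranges, each of which must be a subset of the parent shard's range, and at least two are required. A self-contained sketch of that parse-and-validate step; the min-max hex format mirrors what DocRouter.Range.toString() produces, but this parser itself is an assumption of the example:

    import java.util.ArrayList;
    import java.util.List;

    final class RangeParser {
      static final class Range {
        final int min, max;
        Range(int min, int max) { this.min = min; this.max = max; }
        boolean isSubsetOf(Range o) { return min >= o.min && max <= o.max; }
      }
      /** Parses e.g. "80000000-bfffffff,c0000000-ffffffff", validating against the parent range. */
      static List<Range> parse(String rangesStr, Range parent) {
        String[] parts = rangesStr.split(",");
        if (parts.length < 2) {
          throw new IllegalArgumentException("At least two ranges are required to split a shard");
        }
        List<Range> result = new ArrayList<Range>(parts.length);
        for (String part : parts) {
          String[] ends = part.split("-");
          if (ends.length != 2) {
            throw new IllegalArgumentException("Malformed hash range: " + part);
          }
          // parse as unsigned 32-bit hex, then narrow to Solr's signed int hash space
          int min = (int) Long.parseLong(ends[0], 16);
          int max = (int) Long.parseLong(ends[1], 16);
          Range r = new Range(min, max);
          if (!r.isSubsetOf(parent)) {
            throw new IllegalArgumentException(part + " is not a subset of the parent range");
          }
          result.add(r);
        }
        return result;
      }
    }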
try {
List<String> subSlices = new ArrayList<String>(subRanges.size());
List<String> subShardNames = new ArrayList<String>(subRanges.size());
@@ -478,18 +594,19 @@ public class OverseerCollectionProcessor
if (oSlice != null) {
if (Slice.ACTIVE.equals(oSlice.getState())) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
- } else if (Slice.CONSTRUCTION.equals(oSlice.getState())) {
- for (Replica replica : oSlice.getReplicas()) {
- if (clusterState.liveNodesContain(replica.getNodeName())) {
- String core = replica.getStr("core");
- log.info("Unloading core: " + core + " from node: " + replica.getNodeName());
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
- params.set(CoreAdminParams.CORE, core);
- params.set(CoreAdminParams.DELETE_INDEX, "true");
- sendShardRequest(replica.getNodeName(), params);
- } else {
- log.warn("Replica {} exists in shard {} but is not live and cannot be unloaded", replica, oSlice);
+ } else if (Slice.CONSTRUCTION.equals(oSlice.getState()) || Slice.RECOVERY.equals(oSlice.getState())) {
+ // delete the shards
+ for (String sub : subSlices) {
+ log.info("Sub-shard: {} already exists therefore requesting its deletion", sub);
+ Map<String, Object> propMap = new HashMap<String, Object>();
+ propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
+ propMap.put(COLLECTION_PROP, collectionName);
+ propMap.put(SHARD_ID_PROP, sub);
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ try {
+ deleteShard(clusterState, m, new NamedList());
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub, e);
}
}
}
@@ -518,7 +635,7 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.SHARD, subSlice);
params.set(CoreAdminParams.SHARD_RANGE, subRange.toString());
params.set(CoreAdminParams.SHARD_STATE, Slice.CONSTRUCTION);
- //params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); todo: is it necessary, we're not creating collections?
+ params.set(CoreAdminParams.SHARD_PARENT, parentSlice.getName());
sendShardRequest(nodeName, params);
}
@@ -557,6 +674,7 @@ public class OverseerCollectionProcessor
String subShardName = subShardNames.get(i);
params.add(CoreAdminParams.TARGET_CORE, subShardName);
}
+ params.set(CoreAdminParams.RANGES, rangesStr);
sendShardRequest(parentShardLeader.getNodeName(), params);
collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command");
@@ -643,7 +761,7 @@ public class OverseerCollectionProcessor
cmd.setCoreName(subShardNames.get(i-1));
cmd.setNodeName(subShardNodeName);
cmd.setCoreNodeName(coreNodeName);
- cmd.setState(ZkStateReader.ACTIVE);
+ cmd.setState(ZkStateReader.RECOVERING);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
@@ -653,32 +771,46 @@ public class OverseerCollectionProcessor
collectShardResponses(results, true,
"SPLTSHARD failed to create subshard replicas or timed out waiting for them to come up");
+ log.info("Successfully created all replica shards for all sub-slices " + subSlices);
+
+ log.info("Calling soft commit to make sub shard updates visible");
String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
// HttpShardHandler is hard coded to send a QueryRequest hence we go direct
// and we force open a searcher so that we have documents to show upon switching states
UpdateResponse updateResponse = null;
try {
- updateResponse = commit(coreUrl, true);
+ updateResponse = softCommit(coreUrl);
processResponse(results, null, coreUrl, updateResponse, slice);
} catch (Exception e) {
processResponse(results, e, coreUrl, updateResponse, slice);
- throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib commit on: " + coreUrl, e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib softCommit on: " + coreUrl, e);
}
- log.info("Successfully created all replica shards for all sub-slices "
- + subSlices);
-
- log.info("Requesting update shard state");
- DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
- Map<String, Object> propMap = new HashMap<String, Object>();
- propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
- propMap.put(slice, Slice.INACTIVE);
- for (String subSlice : subSlices) {
- propMap.put(subSlice, Slice.ACTIVE);
- }
- propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
- ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(ZkStateReader.toJSON(m));
+ if (repFactor == 1) {
+ // switch sub shard states to 'active'
+ log.info("Replication factor is 1 so switching shard states");
+ DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
+ Map<String, Object> propMap = new HashMap<String, Object>();
+ propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
+ propMap.put(slice, Slice.INACTIVE);
+ for (String subSlice : subSlices) {
+ propMap.put(subSlice, Slice.ACTIVE);
+ }
+ propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ inQueue.offer(ZkStateReader.toJSON(m));
+ } else {
+ log.info("Requesting shard state be set to 'recovery'");
+ DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
+ Map<String, Object> propMap = new HashMap<String, Object>();
+ propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
+ for (String subSlice : subSlices) {
+ propMap.put(subSlice, Slice.RECOVERY);
+ }
+ propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ inQueue.offer(ZkStateReader.toJSON(m));
+ }
return true;
} catch (SolrException e) {
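The two branches above encode the split-shard handoff: with repFactor == 1 there are no replicas to wait for, so the parent goes INACTIVE and the children ACTIVE in one message; with replicas, the children are parked in RECOVERY until they catch up. A hedged sketch of the message payload being built (plain string keys stand in for Overseer.QUEUE_OPERATION, ZkStateReader.COLLECTION_PROP, and the Slice state constants):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    final class ShardStateMsg {
      /** Builds the "updateshardstate" payload offered to the Overseer queue. */
      static Map<String, Object> build(String collection, String parentSlice,
          List<String> subSlices, int repFactor) {
        Map<String, Object> propMap = new LinkedHashMap<String, Object>();
        propMap.put("operation", "updateshardstate");
        if (repFactor == 1) {
          // no replicas to wait for: retire the parent and activate the children at once
          propMap.put(parentSlice, "inactive");
          for (String sub : subSlices) propMap.put(sub, "active");
        } else {
          // replicas must first replicate: children recover, parent stays active for now
          for (String sub : subSlices) propMap.put(sub, "recovery");
        }
        propMap.put("collection", collection);
        return propMap;
      }
    }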
@@ -689,16 +821,15 @@ public class OverseerCollectionProcessor
}
}
- static UpdateResponse commit(String url, boolean openSearcher) throws SolrServerException, IOException {
+ static UpdateResponse softCommit(String url) throws SolrServerException, IOException {
HttpSolrServer server = null;
try {
server = new HttpSolrServer(url);
server.setConnectionTimeout(30000);
- server.setSoTimeout(60000);
+ server.setSoTimeout(120000);
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams());
- ureq.getParams().set(UpdateParams.OPEN_SEARCHER, openSearcher);
- ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true);
+ ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true, true);
return ureq.process(server);
} finally {
if (server != null) {
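For reference, the four-argument setAction overload used above is SolrJ's ACTION.COMMIT with (waitFlush, waitSearcher, softCommit); the trailing true is what turns this helper into a soft commit, opening a searcher on the sub-shard leader without an fsync. A hedged usage sketch against a hypothetical core URL:

    import java.io.IOException;

    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.HttpSolrServer;
    import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
    import org.apache.solr.client.solrj.request.UpdateRequest;
    import org.apache.solr.client.solrj.response.UpdateResponse;
    import org.apache.solr.common.params.ModifiableSolrParams;

    final class SoftCommitExample {
      public static void main(String[] args) throws SolrServerException, IOException {
        // hypothetical sub-shard leader core URL
        HttpSolrServer server = new HttpSolrServer(
            "http://localhost:8983/solr/collection1_shard1_0_replica1");
        try {
          UpdateRequest ureq = new UpdateRequest();
          ureq.setParams(new ModifiableSolrParams());
          // waitFlush=false, waitSearcher=true, softCommit=true
          ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true, true);
          UpdateResponse rsp = ureq.process(server);
          System.out.println("soft commit status: " + rsp.getStatus());
        } finally {
          server.shutdown();
        }
      }
    }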
@@ -773,7 +904,7 @@ public class OverseerCollectionProcessor
}
// For now, only allow for deletions of Inactive slices or custom hashes (range==null).
// TODO: Add check for range gaps on Slice deletion
- if (!(slice.getRange() == null || slice.getState().equals(Slice.INACTIVE))) {
+ if (!(slice.getRange() == null || slice.getState().equals(Slice.INACTIVE) || slice.getState().equals(Slice.RECOVERY))) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"The slice: " + slice.getName() + " is currently "
+ slice.getState() + ". Only INACTIVE (or custom-hashed) slices can be deleted.");
@@ -794,7 +925,8 @@ public class OverseerCollectionProcessor
} while (srsp != null);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- Overseer.REMOVESHARD, ZkStateReader.COLLECTION_PROP, collection);
+ Overseer.REMOVESHARD, ZkStateReader.COLLECTION_PROP, collection,
+ ZkStateReader.SHARD_ID_PROP, sliceId);
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
// wait for a while until we don't see the shard
@@ -803,7 +935,7 @@ public class OverseerCollectionProcessor
boolean removed = false;
while (System.currentTimeMillis() < timeout) {
Thread.sleep(100);
- removed = zkStateReader.getClusterState().getSlice(collection, message.getStr("name")) == null;
+ removed = zkStateReader.getClusterState().getSlice(collection, sliceId) == null;
if (removed) {
Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
break;
@@ -811,15 +943,15 @@ public class OverseerCollectionProcessor
}
if (!removed) {
throw new SolrException(ErrorCode.SERVER_ERROR,
- "Could not fully remove collection: " + collection + " shard: " + message.getStr("name"));
+ "Could not fully remove collection: " + collection + " shard: " + sliceId);
}
- log.info("Successfully deleted collection " + collection + ", shard: " + message.getStr("name"));
+ log.info("Successfully deleted collection: " + collection + ", shard: " + sliceId);
} catch (SolrException e) {
throw e;
} catch (Exception e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Error executing delete operation for collection: " + collection + " shard: " + message.getStr("name"), e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error executing delete operation for collection: " + collection + " shard: " + sliceId, e);
}
}
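The removeshard message above now carries the shard id alongside the collection, built through ZkNodeProps' alternating key/value varargs constructor, the same idiom the now-deleted asMap helper at the bottom of this file implemented. A minimal sketch of that idiom:

    import java.util.LinkedHashMap;
    import java.util.Map;

    final class KvMap {
      /** Builds a map from alternating key/value varargs, e.g. asMap("operation", "removeshard"). */
      static Map<String, Object> asMap(Object... vals) {
        if (vals.length % 2 != 0) {
          throw new IllegalArgumentException("expected an even number of arguments");
        }
        Map<String, Object> m = new LinkedHashMap<String, Object>();
        for (int i = 0; i < vals.length; i += 2) {
          m.put(String.valueOf(vals[i]), vals[i + 1]);
        }
        return m;
      }
    }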
@@ -848,7 +980,7 @@ public class OverseerCollectionProcessor
int repFactor = message.getInt(REPLICATION_FACTOR, 1);
Integer numSlices = message.getInt(NUM_SLICES, null);
- String router = message.getStr(ROUTER, DocRouter.DEFAULT_NAME);
+ String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
List<String> shardNames = new ArrayList<String>();
if (ImplicitDocRouter.NAME.equals(router)) {
Overseer.getShardNames(shardNames, message.getStr("shards", null));
@@ -916,8 +1048,6 @@ public class OverseerCollectionProcessor
+ " shards to be created (higher than the allowed number)");
}
-// ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-// Overseer.CREATECOLLECTION, "name", message.getStr("name"));
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
// wait for a while until we don't see the collection
@@ -992,11 +1122,6 @@ public class OverseerCollectionProcessor
DocCollection coll = clusterState.getCollection(collectionName);
- if (coll == null) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- "Could not find collection:" + collectionName);
- }
-
for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
Slice slice = entry.getValue();
sliceCmd(clusterState, params, stateMatcher, slice);
@@ -1080,11 +1205,4 @@ public class OverseerCollectionProcessor
return isClosed;
}
- public static Map<String, Object> asMap(Object... vals) {
- HashMap<String, Object> m = new HashMap<String, Object>();
- for(int i=0; i<vals.length; i+=2) {
- m.put(String.valueOf(vals[i]), vals[i+1]);
- }
- return m;
- }
}
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Mon Oct 21 18:58:24 2013
@@ -168,35 +168,41 @@ public class RecoveryStrategy extends Th
}
- private void commitOnLeader(String leaderUrl) throws SolrServerException, IOException {
+ private void commitOnLeader(String leaderUrl) throws SolrServerException,
+ IOException {
HttpSolrServer server = new HttpSolrServer(leaderUrl);
- server.setConnectionTimeout(30000);
- server.setSoTimeout(60000);
- UpdateRequest ureq = new UpdateRequest();
- ureq.setParams(new ModifiableSolrParams());
- ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
- ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
- ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
- server);
- server.shutdown();
+ try {
+ server.setConnectionTimeout(30000);
+ server.setSoTimeout(60000);
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParams(new ModifiableSolrParams());
+ ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+ ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
+ ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
+ server);
+ } finally {
+ server.shutdown();
+ }
}
- private void sendPrepRecoveryCmd(String leaderBaseUrl,
- String leaderCoreName) throws SolrServerException,
- IOException {
+ private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName)
+ throws SolrServerException, IOException {
HttpSolrServer server = new HttpSolrServer(leaderBaseUrl);
- server.setConnectionTimeout(45000);
- server.setSoTimeout(120000);
- WaitForState prepCmd = new WaitForState();
- prepCmd.setCoreName(leaderCoreName);
- prepCmd.setNodeName(zkController.getNodeName());
- prepCmd.setCoreNodeName(coreZkNodeName);
- prepCmd.setState(ZkStateReader.RECOVERING);
- prepCmd.setCheckLive(true);
- prepCmd.setOnlyIfLeader(true);
-
- server.request(prepCmd);
- server.shutdown();
+ try {
+ server.setConnectionTimeout(45000);
+ server.setSoTimeout(120000);
+ WaitForState prepCmd = new WaitForState();
+ prepCmd.setCoreName(leaderCoreName);
+ prepCmd.setNodeName(zkController.getNodeName());
+ prepCmd.setCoreNodeName(coreZkNodeName);
+ prepCmd.setState(ZkStateReader.RECOVERING);
+ prepCmd.setCheckLive(true);
+ prepCmd.setOnlyIfLeader(true);
+
+ server.request(prepCmd);
+ } finally {
+ server.shutdown();
+ }
}
@Override
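Both RecoveryStrategy helpers above now guard the HttpSolrServer with try/finally so the underlying connection pool is released even when the request throws. The same guard as a reusable sketch (the Request callback interface is an assumption of this example, not SolrJ API):

    import org.apache.solr.client.solrj.impl.HttpSolrServer;

    final class WithServer {
      interface Request {
        void run(HttpSolrServer server) throws Exception;
      }
      /** Runs the request against a throwaway server, guaranteeing shutdown() on every path. */
      static void withServer(String baseUrl, int connectTimeoutMs, int soTimeoutMs, Request req)
          throws Exception {
        HttpSolrServer server = new HttpSolrServer(baseUrl);
        try {
          server.setConnectionTimeout(connectTimeoutMs);
          server.setSoTimeout(soTimeoutMs);
          req.run(server);
        } finally {
          server.shutdown(); // releases pooled connections even if req.run threw
        }
      }
    }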
Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Mon Oct 21 18:58:24 2013
@@ -68,7 +68,7 @@ public class SyncStrategy {
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 15000);
- params.set(HttpClientUtil.PROP_SO_TIMEOUT, 30000);
+ params.set(HttpClientUtil.PROP_SO_TIMEOUT, 60000);
params.set(HttpClientUtil.PROP_USE_RETRY, false);
client = HttpClientUtil.createClient(params);
}
@@ -285,12 +285,14 @@ public class SyncStrategy {
recoverRequestCmd.setCoreName(coreName);
HttpSolrServer server = new HttpSolrServer(baseUrl, client);
- server.setConnectionTimeout(15000);
- server.setSoTimeout(30000);
try {
+ server.setConnectionTimeout(15000);
+ server.setSoTimeout(60000);
server.request(recoverRequestCmd);
} catch (Throwable t) {
SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", t);
+ } finally {
+ server.shutdown();
}
}
};