Posted to commits@lucene.apache.org by si...@apache.org on 2012/09/21 19:22:27 UTC
svn commit: r1388574 [34/45] - in /lucene/dev/branches/LUCENE-2878: ./
dev-tools/ dev-tools/eclipse/ dev-tools/eclipse/dot.settings/
dev-tools/idea/ dev-tools/idea/.idea/ dev-tools/idea/.idea/libraries/
dev-tools/idea/lucene/ dev-tools/idea/lucene/anal...
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/extraction/src/test/org/apache/solr/handler/extraction/ExtractingRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/extraction/src/test/org/apache/solr/handler/extraction/ExtractingRequestHandlerTest.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/extraction/src/test/org/apache/solr/handler/extraction/ExtractingRequestHandlerTest.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/extraction/src/test/org/apache/solr/handler/extraction/ExtractingRequestHandlerTest.java Fri Sep 21 17:21:34 2012
@@ -64,8 +64,7 @@ public class ExtractingRequestHandlerTes
"fmap.producer", "extractedProducer",
"fmap.creator", "extractedCreator", "fmap.Keywords", "extractedKeywords",
"fmap.Creation-Date", "extractedDate",
- "fmap.AAPL:Keywords", "ignored_a",
- "fmap.xmpTPg:NPages", "ignored_a",
+ "uprefix", "ignored_",
"fmap.Author", "extractedAuthor",
"fmap.content", "extractedContent",
"literal.id", "one",
@@ -81,6 +80,7 @@ public class ExtractingRequestHandlerTes
"fmap.Author", "extractedAuthor",
"fmap.language", "extractedLanguage",
"literal.id", "two",
+ "uprefix", "ignored_",
"fmap.content", "extractedContent",
"fmap.Last-Modified", "extractedDate"
);
@@ -136,6 +136,7 @@ public class ExtractingRequestHandlerTes
"fmap.creator", "extractedCreator", "fmap.Keywords", "extractedKeywords",
"fmap.Author", "extractedAuthor",
"literal.id", "three",
+ "uprefix", "ignored_",
"fmap.content", "extractedContent",
"fmap.language", "extractedLanguage",
"fmap.Last-Modified", "extractedDate"
@@ -144,7 +145,22 @@ public class ExtractingRequestHandlerTes
assertU(commit());
assertQ(req("stream_name:version_control.xml"), "//*[@numFound='1']");
-
+ loadLocal("extraction/word2003.doc", "fmap.created", "extractedDate", "fmap.producer", "extractedProducer",
+ "fmap.creator", "extractedCreator", "fmap.Keywords", "extractedKeywords",
+ "fmap.Author", "extractedAuthor",
+ "literal.id", "four",
+ "uprefix", "ignored_",
+ "fmap.content", "extractedContent",
+ "fmap.language", "extractedLanguage",
+ "fmap.Last-Modified", "extractedDate"
+ );
+ assertQ(req("title:\"Word 2003 Title\""), "//*[@numFound='0']");
+ // There is already a PDF file with this content:
+ assertQ(req("extractedContent:\"This is a test of PDF and Word extraction in Solr, it is only a test\""), "//*[@numFound='1']");
+ assertU(commit());
+ assertQ(req("title:\"Word 2003 Title\""), "//*[@numFound='1']");
+ // now 2 of them:
+ assertQ(req("extractedContent:\"This is a test of PDF and Word extraction in Solr, it is only a test\""), "//*[@numFound='2']");
}
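A note on the "uprefix" parameter these hunks introduce: rather than mapping each stray Tika metadata field one at a time (fmap.AAPL:Keywords, fmap.xmpTPg:NPages, ...), uprefix=ignored_ prefixes every extracted field that has no explicit fmap.* mapping, so a single parameter covers them all (presumably caught by an ignored_* dynamic field in the test schema). A minimal SolrJ sketch of the same request pattern; the URL, file name, and content type are placeholders, and the addFile signature assumes the 4.x SolrJ API:

    import java.io.File;
    import org.apache.solr.client.solrj.SolrServer;
    import org.apache.solr.client.solrj.impl.HttpSolrServer;
    import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;

    public class UprefixSketch {
      public static void main(String[] args) throws Exception {
        SolrServer server = new HttpSolrServer("http://localhost:8983/solr"); // placeholder URL
        ContentStreamUpdateRequest req = new ContentStreamUpdateRequest("/update/extract");
        req.addFile(new File("word2003.doc"), "application/msword");          // placeholders
        req.setParam("literal.id", "four");
        req.setParam("fmap.content", "extractedContent");
        req.setParam("uprefix", "ignored_"); // catch-all for unmapped extracted fields
        req.setParam("commit", "true");
        server.request(req);
      }
    }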
@@ -162,8 +178,7 @@ public class ExtractingRequestHandlerTes
//"fmap.content_type", "abcxyz",
"commit", "true" // test immediate commit
);
- assertTrue(false);
-
+ fail("Should throw SolrException");
} catch (SolrException e) {
//do nothing
} finally {
@@ -206,6 +221,7 @@ public class ExtractingRequestHandlerTes
"fmap.Author", "extractedAuthor",
"fmap.content", "extractedContent",
"literal.id", "one",
+ "uprefix", "ignored_",
"fmap.language", "extractedLanguage",
"literal.extractionLiteralMV", "one",
"literal.extractionLiteralMV", "two",
@@ -374,9 +390,8 @@ public class ExtractingRequestHandlerTes
loadLocal("extraction/arabic.pdf", "fmap.created", "extractedDate", "fmap.producer", "extractedProducer",
"fmap.creator", "extractedCreator", "fmap.Keywords", "extractedKeywords",
"fmap.Creation-Date", "extractedDate",
- "fmap.AAPL:Keywords", "ignored_a",
- "fmap.xmpTPg:NPages", "ignored_a",
"fmap.Author", "extractedAuthor",
+ "uprefix", "ignored_",
"fmap.content", "wdf_nocase",
"literal.id", "one",
"fmap.Last-Modified", "extractedDate");
@@ -404,8 +419,7 @@ public class ExtractingRequestHandlerTes
loadLocal("extraction/password-is-solrcell.docx", "fmap.created", "extractedDate", "fmap.producer", "extractedProducer",
"fmap.creator", "extractedCreator", "fmap.Keywords", "extractedKeywords",
"fmap.Creation-Date", "extractedDate",
- "fmap.AAPL:Keywords", "ignored_a",
- "fmap.xmpTPg:NPages", "ignored_a",
+ "uprefix", "ignored_",
"fmap.Author", "extractedAuthor",
"fmap.content", "wdf_nocase",
"literal.id", "one",
@@ -462,8 +476,7 @@ public class ExtractingRequestHandlerTes
"fmap.content", "extractedContent",
"fmap.language", "extractedLanguage",
"fmap.Creation-Date", "extractedDate",
- "fmap.AAPL:Keywords", "ignored_a",
- "fmap.xmpTPg:NPages", "ignored_a",
+ "uprefix", "ignored_",
"fmap.Last-Modified", "extractedDate");
// Here the literal value should override the Tika-parsed title:
@@ -478,8 +491,7 @@ public class ExtractingRequestHandlerTes
"fmap.content", "extractedContent",
"fmap.language", "extractedLanguage",
"fmap.Creation-Date", "extractedDate",
- "fmap.AAPL:Keywords", "ignored_a",
- "fmap.xmpTPg:NPages", "ignored_a",
+ "uprefix", "ignored_",
"fmap.Last-Modified", "extractedDate");
// Here we mimic the old behaviour where literals are added, not overridden
@@ -498,8 +510,7 @@ public class ExtractingRequestHandlerTes
"fmap.content", "extractedContent",
"fmap.language", "extractedLanguage",
"fmap.Creation-Date", "extractedDate",
- "fmap.AAPL:Keywords", "ignored_a",
- "fmap.xmpTPg:NPages", "ignored_a",
+ "uprefix", "ignored_",
"fmap.Last-Modified", "extractedDate");
assertU(commit());
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/java/org/apache/solr/uima/processor/SolrUIMAConfiguration.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/java/org/apache/solr/uima/processor/SolrUIMAConfiguration.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/java/org/apache/solr/uima/processor/SolrUIMAConfiguration.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/java/org/apache/solr/uima/processor/SolrUIMAConfiguration.java Fri Sep 21 17:21:34 2012
@@ -40,7 +40,7 @@ public class SolrUIMAConfiguration {
private String logField;
- public SolrUIMAConfiguration(String aePath, String[] fieldsToAnalyze, boolean fieldsMerging,
+ SolrUIMAConfiguration(String aePath, String[] fieldsToAnalyze, boolean fieldsMerging,
Map<String, Map<String, MapField>> typesFeaturesFieldsMapping,
Map<String, Object> runtimeParameters, boolean ignoreErrors, String logField) {
this.aePath = aePath;
@@ -60,7 +60,7 @@ public class SolrUIMAConfiguration {
return fieldsMerging;
}
- public Map<String, Map<String, MapField>> getTypesFeaturesFieldsMapping() {
+ Map<String, Map<String, MapField>> getTypesFeaturesFieldsMapping() {
return typesFeaturesFieldsMapping;
}
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAToSolrMapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAToSolrMapper.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAToSolrMapper.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/java/org/apache/solr/uima/processor/UIMAToSolrMapper.java Fri Sep 21 17:21:34 2012
@@ -54,7 +54,7 @@ public class UIMAToSolrMapper {
* @param typeName name of UIMA type to map
* @param featureFieldsmapping
*/
- public void map(String typeName, Map<String, MapField> featureFieldsmapping) throws FieldMappingException {
+ void map(String typeName, Map<String, MapField> featureFieldsmapping) throws FieldMappingException {
try {
Type type = cas.getTypeSystem().getType(typeName);
for (FSIterator<FeatureStructure> iterator = cas.getFSIndexRepository().getAllIndexedFS(type); iterator
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test/org/apache/solr/uima/ts/EntityAnnotation_Type.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test/org/apache/solr/uima/ts/EntityAnnotation_Type.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test/org/apache/solr/uima/ts/EntityAnnotation_Type.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test/org/apache/solr/uima/ts/EntityAnnotation_Type.java Fri Sep 21 17:21:34 2012
@@ -1,4 +1,3 @@
-
/* First created by JCasGen Sat May 07 22:33:38 JST 2011 */
package org.apache.solr.uima.ts;
@@ -23,17 +22,17 @@ public class EntityAnnotation_Type exten
private final FSGenerator fsGenerator =
new FSGenerator() {
public FeatureStructure createFS(int addr, CASImpl cas) {
- if (EntityAnnotation_Type.this.useExistingInstance) {
- // Return eq fs instance if already created
- FeatureStructure fs = EntityAnnotation_Type.this.jcas.getJfsFromCaddr(addr);
- if (null == fs) {
- fs = new EntityAnnotation(addr, EntityAnnotation_Type.this);
- EntityAnnotation_Type.this.jcas.putJfsFromCaddr(addr, fs);
- return fs;
- }
- return fs;
+ if (EntityAnnotation_Type.this.useExistingInstance) {
+ // Return eq fs instance if already created
+ FeatureStructure fs = EntityAnnotation_Type.this.jcas.getJfsFromCaddr(addr);
+ if (null == fs) {
+ fs = new EntityAnnotation(addr, EntityAnnotation_Type.this);
+ EntityAnnotation_Type.this.jcas.putJfsFromCaddr(addr, fs);
+ return fs;
+ }
+ return fs;
} else return new EntityAnnotation(addr, EntityAnnotation_Type.this);
- }
+ }
};
/** @generated */
public final static int typeIndexID = EntityAnnotation.typeIndexID;
@@ -80,7 +79,7 @@ public class EntityAnnotation_Type exten
/** initialize variables to correspond with Cas Type and Features
- * @generated */
+ * @generated */
public EntityAnnotation_Type(JCas jcas, Type casType) {
super(jcas, casType);
casImpl.getFSClassRegistry().addGeneratorForType((TypeImpl)this.casType, getFSGenerator());
@@ -98,4 +97,4 @@ public class EntityAnnotation_Type exten
-
\ No newline at end of file
+
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test/org/apache/solr/uima/ts/SentimentAnnotation_Type.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test/org/apache/solr/uima/ts/SentimentAnnotation_Type.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test/org/apache/solr/uima/ts/SentimentAnnotation_Type.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test/org/apache/solr/uima/ts/SentimentAnnotation_Type.java Fri Sep 21 17:21:34 2012
@@ -1,4 +1,3 @@
-
/* First created by JCasGen Fri Mar 04 13:08:40 CET 2011 */
package org.apache.solr.uima.ts;
@@ -23,17 +22,17 @@ public class SentimentAnnotation_Type ex
private final FSGenerator fsGenerator =
new FSGenerator() {
public FeatureStructure createFS(int addr, CASImpl cas) {
- if (SentimentAnnotation_Type.this.useExistingInstance) {
- // Return eq fs instance if already created
- FeatureStructure fs = SentimentAnnotation_Type.this.jcas.getJfsFromCaddr(addr);
- if (null == fs) {
- fs = new SentimentAnnotation(addr, SentimentAnnotation_Type.this);
- SentimentAnnotation_Type.this.jcas.putJfsFromCaddr(addr, fs);
- return fs;
- }
- return fs;
+ if (SentimentAnnotation_Type.this.useExistingInstance) {
+ // Return eq fs instance if already created
+ FeatureStructure fs = SentimentAnnotation_Type.this.jcas.getJfsFromCaddr(addr);
+ if (null == fs) {
+ fs = new SentimentAnnotation(addr, SentimentAnnotation_Type.this);
+ SentimentAnnotation_Type.this.jcas.putJfsFromCaddr(addr, fs);
+ return fs;
+ }
+ return fs;
} else return new SentimentAnnotation(addr, SentimentAnnotation_Type.this);
- }
+ }
};
/** @generated */
public final static int typeIndexID = SentimentAnnotation.typeIndexID;
@@ -62,7 +61,7 @@ public class SentimentAnnotation_Type ex
/** initialize variables to correspond with Cas Type and Features
- * @generated */
+ * @generated */
public SentimentAnnotation_Type(JCas jcas, Type casType) {
super(jcas, casType);
casImpl.getFSClassRegistry().addGeneratorForType((TypeImpl)this.casType, getFSGenerator());
@@ -76,4 +75,4 @@ public class SentimentAnnotation_Type ex
-
\ No newline at end of file
+
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/PageTool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/PageTool.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/PageTool.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/PageTool.java Fri Sep 21 17:21:34 2012
@@ -58,7 +58,7 @@ public class PageTool {
results_found = doc_list.getNumFound();
start = doc_list.getStart();
} else {
- throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Unknown response type "+docs+". Expected one of DocSlice, ResultContext or SolrDocumentList");
+ throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Unknown response type "+docs+". Expected one of DocSlice, ResultContext or SolrDocumentList");
}
}
Modified: lucene/dev/branches/LUCENE-2878/solr/core/build.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/build.xml?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/build.xml (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/build.xml Fri Sep 21 17:21:34 2012
@@ -18,17 +18,28 @@
<project name="solr-core" default="default">
<description>Solr Core</description>
- <!-- hackidty-hack-hack -->
-
- <property name="ivy.retrieve.pattern" value="${common-solr.dir}/lib/[artifact]-[revision].[ext]"/>
- <!-- we cannot sync because solr/core and solr/solrj share the same lib/... clean this up! -->
- <property name="ivy.sync" value="false"/>
-
<!-- html file for testing -->
- <property name="rat.excludes" value="**/htmlStripReaderTest.html"/>
+ <property name="rat.excludes" value="**/htmlStripReaderTest.html,**/*.iml"/>
<import file="../common-build.xml"/>
<target name="compile-core" depends="compile-solrj,common-solr.compile-core"/>
+ <!-- specialized to ONLY depend on solrj -->
+ <target name="javadocs" depends="compile-core,define-lucene-javadoc-url,lucene-javadocs,javadocs-solrj">
+ <sequential>
+ <mkdir dir="${javadoc.dir}/${name}"/>
+ <solr-invoke-javadoc>
+ <solrsources>
+ <packageset dir="${src.dir}"/>
+ </solrsources>
+ <links>
+ <link href="../solr-solrj"/>
+ </links>
+ </solr-invoke-javadoc>
+ <solr-jarify basedir="${javadoc.dir}/${name}" destfile="${build.dir}/${final.name}-javadoc.jar"/>
+ </sequential>
+ </target>
+
+ <target name="dist-maven" depends="dist-maven-src-java"/>
</project>
Modified: lucene/dev/branches/LUCENE-2878/solr/core/ivy.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/ivy.xml?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/ivy.xml (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/ivy.xml Fri Sep 21 17:21:34 2012
@@ -20,21 +20,13 @@
<info organisation="org.apache.solr" module="core"/>
<dependencies>
- <dependency org="commons-codec" name="commons-codec" rev="1.6" transitive="false"/>
+ <dependency org="commons-codec" name="commons-codec" rev="1.7" transitive="false"/>
<dependency org="commons-fileupload" name="commons-fileupload" rev="1.2.1" transitive="false"/>
<dependency org="commons-cli" name="commons-cli" rev="1.2" transitive="false"/>
- <dependency org="org.apache.httpcomponents" name="httpcore" rev="4.1.4" transitive="false"/>
- <dependency org="org.apache.httpcomponents" name="httpclient" rev="4.1.3" transitive="false"/>
- <dependency org="org.apache.httpcomponents" name="httpmime" rev="4.1.3" transitive="false"/>
- <dependency org="org.slf4j" name="jcl-over-slf4j" rev="1.6.4" transitive="false"/>
- <dependency org="commons-io" name="commons-io" rev="2.1" transitive="false"/>
<dependency org="commons-lang" name="commons-lang" rev="2.6" transitive="false"/>
<dependency org="com.google.guava" name="guava" rev="r05" transitive="false"/>
- <dependency org="org.codehaus.woodstox" name="wstx-asl" rev="3.2.7" transitive="false"/>
<dependency org="org.easymock" name="easymock" rev="2.2" transitive="false"/>
- <dependency org="org.slf4j" name="slf4j-api" rev="1.6.4" transitive="false"/>
- <dependency org="org.slf4j" name="slf4j-jdk14" rev="1.6.4" transitive="false"/>
- <dependency org="com.spatial4j" name="spatial4j" rev="0.2" transitive="false"/>
+ <dependency org="com.spatial4j" name="spatial4j" rev="0.3" transitive="false"/>
<dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" transitive="false"/>
<exclude org="*" ext="*" matcher="regexp" type="${ivy.exclude.types}"/>
</dependencies>
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/SolrLogFormatter.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/SolrLogFormatter.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/SolrLogFormatter.java Fri Sep 21 17:21:34 2012
@@ -19,7 +19,7 @@ package org.apache.solr;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
@@ -99,7 +99,7 @@ public class SolrLogFormatter extends Fo
static int maxCoreNum;
String shortId;
String url;
- Map<String, String> coreProps;
+ Map<String, Object> coreProps;
}
Map<SolrCore, CoreInfo> coreInfoMap = new WeakHashMap<SolrCore, CoreInfo>(); // TODO: use something that survives across a core reload?
@@ -200,7 +200,7 @@ sb.append("(group_name=").append(tg.getN
info.coreProps = getCoreProps(zkController, core);
}
- Map<String, String> coreProps = getCoreProps(zkController, core);
+ Map<String, Object> coreProps = getCoreProps(zkController, core);
if(!coreProps.equals(info.coreProps)) {
info.coreProps = coreProps;
final String corePropsString = "coll:" + core.getCoreDescriptor().getCloudDescriptor().getCollectionName() + " core:" + core.getName() + " props:" + coreProps;
@@ -261,9 +261,9 @@ sb.append("(group_name=").append(tg.getN
return sb.toString();
}
- private Map<String,String> getCoreProps(ZkController zkController, SolrCore core) {
+ private Map<String,Object> getCoreProps(ZkController zkController, SolrCore core) {
final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- ZkNodeProps props = zkController.getClusterState().getShardProps(collection, ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
+ Replica props = zkController.getClusterState().getShardProps(collection, ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
if(props!=null) {
return props.getProperties();
}
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/LegacyHTMLStripCharFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/LegacyHTMLStripCharFilter.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/LegacyHTMLStripCharFilter.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/LegacyHTMLStripCharFilter.java Fri Sep 21 17:21:34 2012
@@ -103,7 +103,7 @@ public class LegacyHTMLStripCharFilter e
return ch;
}
numRead++;
- return in.read();
+ return input.read();
}
private int nextSkipWS() throws IOException {
@@ -118,7 +118,7 @@ public class LegacyHTMLStripCharFilter e
return pushed.charAt(len-1);
}
numRead++;
- int ch = in.read();
+ int ch = input.read();
push(ch);
return ch;
}
@@ -180,11 +180,11 @@ public class LegacyHTMLStripCharFilter e
private void saveState() throws IOException {
lastMark = numRead;
- in.mark(readAheadLimit);
+ input.mark(readAheadLimit);
}
private void restoreState() throws IOException {
- in.reset();
+ input.reset();
pushed.setLength(0);
}
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/TrieTokenizerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/TrieTokenizerFactory.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/TrieTokenizerFactory.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/TrieTokenizerFactory.java Fri Sep 21 17:21:34 2012
@@ -72,15 +72,11 @@ final class TrieTokenizer extends Tokeni
this.type = type;
this.precisionStep = precisionStep;
this.ts = ts;
-
- setReader(input);
}
@Override
- public void setReader(Reader input) {
+ public void reset() {
try {
- super.setReader(input);
- input = super.input;
char[] buf = new char[32];
int len = input.read(buf);
this.startOfs = correctOffset(0);
@@ -113,6 +109,7 @@ final class TrieTokenizer extends Tokeni
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to create TrieIndexTokenizer", e);
}
+ ts.reset();
}
@Override
@@ -120,12 +117,6 @@ final class TrieTokenizer extends Tokeni
super.close();
ts.close();
}
-
- @Override
- public void reset() throws IOException {
- super.reset();
- ts.reset();
- }
@Override
public boolean incrementToken() {
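The TrieTokenizer hunks above move reader consumption from setReader() into reset(), in line with the TokenStream contract that consumers call reset() before the first incrementToken(). For reference, the standard consume loop looks roughly like this (a generic sketch, not code from this commit; the field name and input are placeholders):

    import java.io.IOException;
    import java.io.StringReader;
    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.TokenStream;

    class ConsumeSketch {
      static void consume(Analyzer analyzer) throws IOException {
        TokenStream ts = analyzer.tokenStream("field", new StringReader("42"));
        ts.reset();                  // contract: reset() before incrementToken();
                                     // TrieTokenizer now reads its input here
        while (ts.incrementToken()) {
          // inspect term/offset attributes as needed
        }
        ts.end();
        ts.close();
      }
    }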
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java Fri Sep 21 17:21:34 2012
@@ -19,15 +19,20 @@ package org.apache.solr.client.solrj.emb
import java.io.IOException;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
import java.util.Random;
import javax.servlet.DispatcherType;
+import javax.servlet.Filter;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.solr.servlet.SolrDispatchFilter;
-import org.eclipse.jetty.server.*;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.bio.SocketConnector;
import org.eclipse.jetty.server.handler.GzipHandler;
import org.eclipse.jetty.server.session.HashSessionIdManager;
@@ -36,6 +41,7 @@ import org.eclipse.jetty.servlet.Servlet
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ThreadPool;
/**
* Run solr using jetty
@@ -218,9 +224,29 @@ public class JettySolrRunner {
}
public void stop() throws Exception {
- if (!server.isStopped() && !server.isStopping()) {
- server.stop();
+ // we try several extra stop steps because Jetty doesn't like to stop
+ // if it started and ended up in a failure state (for example, when it
+ // could not bind its port)
+ if (server.getState().equals(Server.FAILED)) {
+ Connector[] connectors = server.getConnectors();
+ for (Connector connector : connectors) {
+ connector.stop();
+ }
+ }
+ Filter filter = dispatchFilter.getFilter();
+ ThreadPool threadPool = server.getThreadPool();
+ server.getServer().stop();
+ server.stop();
+ if (threadPool instanceof QueuedThreadPool) {
+ ((QueuedThreadPool) threadPool).setMaxStopTimeMs(30000);
+ ((QueuedThreadPool) threadPool).stop();
+ ((QueuedThreadPool) threadPool).join();
+ }
+ //server.destroy();
+ if (server.getState().equals(Server.FAILED)) {
+ filter.destroy();
}
+
server.join();
}
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/AssignShard.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/AssignShard.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/AssignShard.java Fri Sep 21 17:21:34 2012
@@ -56,7 +56,7 @@ public class AssignShard {
// else figure out which shard needs more replicas
final Map<String, Integer> map = new HashMap<String, Integer>();
for (String shardId : shardIdNames) {
- int cnt = sliceMap.get(shardId).getShards().size();
+ int cnt = sliceMap.get(shardId).getReplicasMap().size();
map.put(shardId, cnt);
}
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Fri Sep 21 17:21:34 2012
@@ -26,6 +26,17 @@ public class CloudDescriptor {
private String roles = null;
private Integer numShards;
+ volatile boolean isLeader = false;
+ volatile String lastPublished;
+
+ public String getLastPublished() {
+ return lastPublished;
+ }
+
+ public boolean isLeader() {
+ return isLeader;
+ }
+
public void setShardId(String shardId) {
this.shardId = shardId;
}
@@ -43,11 +54,11 @@ public class CloudDescriptor {
}
public String getRoles(){
- return roles;
+ return roles;
}
public void setRoles(String roles){
- this.roles = roles;
+ this.roles = roles;
}
/** Optional parameters that can change how a core is created. */
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Fri Sep 21 17:21:34 2012
@@ -6,6 +6,7 @@ import java.util.Map;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -15,7 +16,6 @@ import org.apache.solr.core.CoreContaine
import org.apache.solr.core.SolrCore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,12 +54,12 @@ public abstract class ElectionContext {
this.zkClient = zkClient;
}
+ public void close() {}
+
public void cancelElection() throws InterruptedException, KeeperException {
zkClient.delete(leaderSeqPath, -1, true);
}
- // the given core may or may not be null - if you need access to the current core, you must pass
- // the core container and core name to your context impl - then use this core ref if it is not null
- // else access it from the core container
+
abstract void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException, IOException;
}
@@ -82,31 +82,21 @@ class ShardLeaderElectionContextBase ext
}
@Override
- void runLeaderProcess(boolean weAreReplacement)
- throws KeeperException, InterruptedException, IOException {
-
- try {
- zkClient.makePath(leaderPath,
- leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
- CreateMode.EPHEMERAL, true);
- } catch (NodeExistsException e) {
- // if a previous leader ephemeral still exists for some reason, try and
- // remove it
- zkClient.delete(leaderPath, -1, true);
- zkClient.makePath(leaderPath,
- leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
- CreateMode.EPHEMERAL, true);
- }
+ void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
+ InterruptedException, IOException {
+
+ zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
+ CreateMode.EPHEMERAL, true);
- // TODO: above we make it looks like leaderProps could be true, but here
- // you would get an NPE if it was.
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- "leader", ZkStateReader.SHARD_ID_PROP, shardId,
- ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP,
- leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP),
- ZkStateReader.CORE_NAME_PROP, leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP));
+ ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, "leader",
+ ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
+ collection, ZkStateReader.BASE_URL_PROP, leaderProps.getProperties()
+ .get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
+ leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
+ ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
- }
+
+ }
}
@@ -117,6 +107,8 @@ final class ShardLeaderElectionContext e
private ZkController zkController;
private CoreContainer cc;
private SyncStrategy syncStrategy = new SyncStrategy();
+
+ private volatile boolean isClosed = false;
public ShardLeaderElectionContext(LeaderElector leaderElector,
final String shardId, final String collection,
@@ -128,121 +120,227 @@ final class ShardLeaderElectionContext e
}
@Override
- void runLeaderProcess(boolean weAreReplacement)
- throws KeeperException, InterruptedException, IOException {
- if (cc != null) {
- String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
- SolrCore core = null;
+ public void close() {
+ this.isClosed = true;
+ }
+
+ @Override
+ void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
+ InterruptedException, IOException {
+ log.info("Running the leader process.");
+
+ String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
+
+ // clear the leader in clusterstate
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "leader",
+ ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
+ collection);
+ Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
+
+ String leaderVoteWait = cc.getZkController().getLeaderVoteWait();
+ if (!weAreReplacement && leaderVoteWait != null) {
+ waitForReplicasToComeUp(weAreReplacement, leaderVoteWait);
+ }
+
+ SolrCore core = null;
+ try {
+
+ core = cc.getCore(coreName);
+
+ if (core == null) {
+ cancelElection();
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Fatal Error, SolrCore not found:" + coreName + " in "
+ + cc.getCoreNames());
+ }
+
+ // should I be leader?
+ if (weAreReplacement && !shouldIBeLeader(leaderProps, core)) {
+ rejoinLeaderElection(leaderSeqPath, core);
+ return;
+ }
+
+ log.info("I may be the new leader - try and sync");
+ // we are going to attempt to be the leader
+ // first cancel any current recovery
+ core.getUpdateHandler().getSolrCoreState().cancelRecovery();
+ boolean success = false;
try {
-
- core = cc.getCore(coreName);
+ success = syncStrategy.sync(zkController, core, leaderProps);
+ } catch (Throwable t) {
+ SolrException.log(log, "Exception while trying to sync", t);
+ success = false;
+ }
+
+ // if !success but no one else is in active mode,
+ // we are the leader anyway
+ // TODO: should we also be leader if there is only one other active?
+ // if we couldn't sync with it, it shouldn't be able to sync with us
+ // TODO: this needs to be moved to the election context - the logic does
+ // not belong here.
+ if (!success
+ && !areAnyOtherReplicasActive(zkController, leaderProps, collection,
+ shardId)) {
+ log.info("Sync was not a success but no one else is active! I am the leader");
+ success = true;
+ }
+
+ // solrcloud_debug
+ // try {
+ // RefCounted<SolrIndexSearcher> searchHolder =
+ // core.getNewestSearcher(false);
+ // SolrIndexSearcher searcher = searchHolder.get();
+ // try {
+ // System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName()
+ // + " synched "
+ // + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+ // } finally {
+ // searchHolder.decref();
+ // }
+ // } catch (Exception e) {
+ //
+ // }
+ if (!success) {
+ rejoinLeaderElection(leaderSeqPath, core);
+ return;
+ }
- if (core == null) {
- cancelElection();
- throw new SolrException(ErrorCode.SERVER_ERROR, "Fatal Error, SolrCore not found:" + coreName + " in " + cc.getCoreNames());
- }
- // should I be leader?
- if (weAreReplacement && !shouldIBeLeader(leaderProps)) {
- // System.out.println("there is a better leader candidate it appears");
- rejoinLeaderElection(leaderSeqPath, core);
- return;
+ log.info("I am the new leader: "
+ + ZkCoreNodeProps.getCoreUrl(leaderProps));
+ core.getCoreDescriptor().getCloudDescriptor().isLeader = true;
+ } finally {
+ if (core != null) {
+ core.close();
+ }
+ }
+
+ try {
+ super.runLeaderProcess(weAreReplacement);
+ } catch (Throwable t) {
+ try {
+ core = cc.getCore(coreName);
+ core.getCoreDescriptor().getCloudDescriptor().isLeader = false;
+
+ // we could not publish ourselves as leader - rejoin election
+ rejoinLeaderElection(coreName, core);
+ } finally {
+ if (core != null) {
+ core.close();
}
+ }
+ }
+
+ }
+
+ private boolean areAnyOtherReplicasActive(ZkController zkController,
+ ZkNodeProps leaderProps, String collection, String shardId) {
+ ClusterState clusterState = zkController.getZkStateReader()
+ .getClusterState();
+ Map<String,Slice> slices = clusterState.getSlices(collection);
+ Slice slice = slices.get(shardId);
+ Map<String,Replica> replicasMap = slice.getReplicasMap();
+ for (Map.Entry<String,Replica> shard : replicasMap.entrySet()) {
+ String state = shard.getValue().getStr(ZkStateReader.STATE_PROP);
+ // System.out.println("state:"
+ // + state
+ // + shard.getValue().get(ZkStateReader.NODE_NAME_PROP)
+ // + " live: "
+ // + clusterState.liveNodesContain(shard.getValue().get(
+ // ZkStateReader.NODE_NAME_PROP)));
+ if (state.equals(ZkStateReader.ACTIVE)
+ && clusterState.liveNodesContain(shard.getValue().getStr(
+ ZkStateReader.NODE_NAME_PROP))
+ && !new ZkCoreNodeProps(shard.getValue()).getCoreUrl().equals(
+ new ZkCoreNodeProps(leaderProps).getCoreUrl())) {
+ return true;
+ }
+ }
+
+ return false;
+ }
- if (weAreReplacement) {
- if (zkClient.exists(leaderPath, true)) {
- zkClient.delete(leaderPath, -1, true);
- }
- log.info("I may be the new leader - try and sync");
- // we are going to attempt to be the leader
- // first cancel any current recovery
- core.getUpdateHandler().getSolrCoreState().cancelRecovery();
- boolean success = syncStrategy.sync(zkController, core, leaderProps);
- if (!success && anyoneElseActive()) {
- rejoinLeaderElection(leaderSeqPath, core);
- return;
- }
+ private void waitForReplicasToComeUp(boolean weAreReplacement,
+ String leaderVoteWait) throws InterruptedException {
+ int timeout = Integer.parseInt(leaderVoteWait);
+ long timeoutAt = System.currentTimeMillis() + timeout;
+ final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
+
+ Slice slices = zkController.getClusterState().getSlice(collection, shardId);
+
+ while (!isClosed) {
+ // wait for everyone to be up
+ if (slices != null) {
+ int found = 0;
+ try {
+ found = zkClient.getChildren(shardsElectZkPath, null, true).size();
+ } catch (KeeperException e) {
+ SolrException.log(log,
+ "Errir checking for the number of election participants", e);
}
- log.info("I am the new leader: " + ZkCoreNodeProps.getCoreUrl(leaderProps));
- // If I am going to be the leader I have to be active
- core.getUpdateHandler().getSolrCoreState().cancelRecovery();
- zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
+ // on startup and after connection timeout, wait for all known shards
+ if (found >= slices.getReplicasMap().size()) {
+ log.info("Enough replicas found to continue.");
+ return;
+ } else {
+ log.info("Waiting until we see more replicas up: total="
+ + slices.getReplicasMap().size() + " found=" + found
+ + " timeoutin=" + (timeoutAt - System.currentTimeMillis()));
+ }
- } finally {
- if (core != null ) {
- core.close();
+ if (System.currentTimeMillis() > timeoutAt) {
+ log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
+ return;
}
}
+ Thread.sleep(500);
+ slices = zkController.getClusterState().getSlice(collection, shardId);
}
-
- super.runLeaderProcess(weAreReplacement);
}
private void rejoinLeaderElection(String leaderSeqPath, SolrCore core)
throws InterruptedException, KeeperException, IOException {
// remove our ephemeral and re join the election
- // System.out.println("sync failed, delete our election node:"
- // + leaderSeqPath);
+ if (cc.isShutDown()) {
+ log.info("Not rejoining election because CoreContainer is shutdown");
+ return;
+ }
+
log.info("There is a better leader candidate than us - going back into recovery");
- zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
+ try {
+ zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
+ } catch (Throwable t) {
+ SolrException.log(log, "Error trying to publish down state", t);
+ }
cancelElection();
- core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getName());
+ try {
+ core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getName());
+ } catch (Throwable t) {
+ SolrException.log(log, "Error trying to start recovery", t);
+ }
leaderElector.joinElection(this);
}
-
- private boolean shouldIBeLeader(ZkNodeProps leaderProps) {
- ClusterState clusterState = zkController.getZkStateReader().getClusterState();
- Map<String,Slice> slices = clusterState.getSlices(this.collection);
- Slice slice = slices.get(shardId);
- Map<String,ZkNodeProps> shards = slice.getShards();
- boolean foundSomeoneElseActive = false;
- for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
- String state = shard.getValue().get(ZkStateReader.STATE_PROP);
- if (new ZkCoreNodeProps(shard.getValue()).getCoreUrl().equals(
- new ZkCoreNodeProps(leaderProps).getCoreUrl())) {
- if (state.equals(ZkStateReader.ACTIVE)
- && clusterState.liveNodesContain(shard.getValue().get(
- ZkStateReader.NODE_NAME_PROP))) {
- // we are alive
- return true;
- }
- }
-
- if ((state.equals(ZkStateReader.ACTIVE))
- && clusterState.liveNodesContain(shard.getValue().get(
- ZkStateReader.NODE_NAME_PROP))
- && !new ZkCoreNodeProps(shard.getValue()).getCoreUrl().equals(
- new ZkCoreNodeProps(leaderProps).getCoreUrl())) {
- foundSomeoneElseActive = true;
- }
+ private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core) {
+ log.info("Checking if I should try and be the leader.");
+
+ if (isClosed) {
+ log.info("Bailing on leader process because we have been closed");
+ return false;
}
- return !foundSomeoneElseActive;
- }
-
- private boolean anyoneElseActive() {
- ClusterState clusterState = zkController.getZkStateReader().getClusterState();
- Map<String,Slice> slices = clusterState.getSlices(this.collection);
- Slice slice = slices.get(shardId);
- Map<String,ZkNodeProps> shards = slice.getShards();
-
- for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
- String state = shard.getValue().get(ZkStateReader.STATE_PROP);
-
-
- if ((state.equals(ZkStateReader.ACTIVE))
- && clusterState.liveNodesContain(shard.getValue().get(
- ZkStateReader.NODE_NAME_PROP))) {
- return true;
- }
+ if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished().equals(ZkStateReader.ACTIVE)) {
+ log.info("My last published State was Active, it's okay to be the leader.");
+ return true;
}
+// TODO: and what if no one is a good candidate?
+
return false;
}
@@ -261,24 +359,16 @@ final class OverseerElectionContext exte
}
@Override
- void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException {
+ void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
+ InterruptedException {
- final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
+ final String id = leaderSeqPath
+ .substring(leaderSeqPath.lastIndexOf("/") + 1);
ZkNodeProps myProps = new ZkNodeProps("id", id);
-
- try {
- zkClient.makePath(leaderPath,
- ZkStateReader.toJSON(myProps),
- CreateMode.EPHEMERAL, true);
- } catch (NodeExistsException e) {
- // if a previous leader ephemeral still exists for some reason, try and
- // remove it
- zkClient.delete(leaderPath, -1, true);
- zkClient.makePath(leaderPath,
- ZkStateReader.toJSON(myProps),
- CreateMode.EPHEMERAL, true);
- }
-
+
+ zkClient.makePath(leaderPath, ZkStateReader.toJSON(myProps),
+ CreateMode.EPHEMERAL, true);
+
overseer.start(id);
}
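One pattern worth noting in this file: ZkNodeProps.fromKeyVals(...) builds the leader message from alternating key/value arguments before it is serialized onto the Overseer queue. In isolation (the shard and collection names are placeholders; zkClient is an existing SolrZkClient):

    ZkNodeProps m = ZkNodeProps.fromKeyVals(
        Overseer.QUEUE_OPERATION, "leader",
        ZkStateReader.SHARD_ID_PROP, "shard1",
        ZkStateReader.COLLECTION_PROP, "collection1",
        ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
    Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));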
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Fri Sep 21 17:21:34 2012
@@ -52,12 +52,11 @@ import org.slf4j.LoggerFactory;
* a watch on the next lowest node it finds, and if that node goes down,
* starts the whole process over by checking if it's the lowest sequential node, etc.
*
- * TODO: now we could just reuse the lock package code for leader election
*/
public class LeaderElector {
private static Logger log = LoggerFactory.getLogger(LeaderElector.class);
- private static final String ELECTION_NODE = "/election";
+ static final String ELECTION_NODE = "/election";
private final static Pattern LEADER_SEQ = Pattern.compile(".*?/?.*?-n_(\\d+)");
private final static Pattern SESSION_ID = Pattern.compile(".*?/?(.*?-.*?)-n_\\d+");
@@ -93,6 +92,13 @@ public class LeaderElector {
sortSeqs(seqs);
List<Integer> intSeqs = getSeqs(seqs);
if (seq <= intSeqs.get(0)) {
+ // first we delete the node advertising the old leader in case the ephem is still there
+ try {
+ zkClient.delete(context.leaderPath, -1, true);
+ } catch(Exception e) {
+ // fine
+ }
+
runIamLeaderProcess(context, replacement);
} else {
// I am not the leader - watch the node below me
@@ -138,6 +144,7 @@ public class LeaderElector {
} catch (KeeperException.SessionExpiredException e) {
throw e;
} catch (KeeperException e) {
+ SolrException.log(log, "Failed setting watch", e);
// we couldn't set our watch - the node before us may already be down?
// we need to check if we are the leader again
checkIfIamLeader(seq, context, true);
@@ -155,7 +162,7 @@ public class LeaderElector {
* Returns int given String of form n_0000000001 or n_0000000003, etc.
*
* @param nStringSequence
- * @return
+ * @return sequence number
*/
private int getSeq(String nStringSequence) {
int seq = 0;
@@ -184,8 +191,7 @@ public class LeaderElector {
/**
* Returns int list given list of form n_0000000001, n_0000000003, etc.
*
- * @param seqs
- * @return
+ * @return int seqs
*/
private List<Integer> getSeqs(List<String> seqs) {
List<Integer> intSeqs = new ArrayList<Integer>(seqs.size());
@@ -237,18 +243,31 @@ public class LeaderElector {
}
}
if (!foundId) {
- throw e;
+ cont = true;
+ if (tries++ > 20) {
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e2) {
+ Thread.currentThread().interrupt();
+ }
}
} catch (KeeperException.NoNodeException e) {
// we must have failed in creating the election node - someone else must
// be working on it, lets try again
- if (tries++ > 9) {
+ if (tries++ > 20) {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
cont = true;
- Thread.sleep(50);
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e2) {
+ Thread.currentThread().interrupt();
+ }
}
}
int seq = getSeq(leaderSeqPath);
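The joinElection hunks above converge on a single retry idiom: up to 20 attempts with a 50 ms pause, restoring the thread's interrupt flag if the pause itself is interrupted. Reduced to a sketch (attemptJoin() is a placeholder for the zkClient create/check logic, not a real method):

    private void joinWithRetries() throws KeeperException {
      int tries = 0;
      boolean cont = true;
      while (cont) {
        cont = false;
        try {
          attemptJoin(); // placeholder
        } catch (KeeperException.NoNodeException e) {
          if (tries++ > 20) {
            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
          }
          cont = true;
          try {
            Thread.sleep(50);
          } catch (InterruptedException e2) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
          }
        }
      }
    }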
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Sep 21 17:21:34 2012
@@ -20,11 +20,16 @@ package org.apache.solr.cloud;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.noggit.JSONUtil;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ClosableThread;
+import org.apache.solr.common.cloud.HashPartitioner;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -47,7 +52,7 @@ public class Overseer {
private static Logger log = LoggerFactory.getLogger(Overseer.class);
- private class ClusterStateUpdater implements Runnable {
+ private class ClusterStateUpdater implements Runnable, ClosableThread {
private static final String DELETECORE = "deletecore";
private final ZkStateReader reader;
@@ -58,6 +63,7 @@ public class Overseer {
//Internal queue where overseer stores events that have not yet been published into cloudstate
//If Overseer dies while extracting the main queue a new overseer will start from this queue
private final DistributedQueue workQueue;
+ private volatile boolean isClosed;
public ClusterStateUpdater(final ZkStateReader reader, final String myId) {
this.zkClient = reader.getZkClient();
@@ -70,7 +76,7 @@ public class Overseer {
@Override
public void run() {
- if(amILeader() && !Overseer.this.isClosed) {
+ if(!this.isClosed && amILeader()) {
// see if there's something left from the previous Overseer and re
// process all events that were not persisted into cloud state
synchronized (reader.getUpdateLock()) { //XXX this only protects against edits inside single node
@@ -85,7 +91,7 @@ public class Overseer {
while (head != null && amILeader()) {
final ZkNodeProps message = ZkNodeProps.load(head);
final String operation = message
- .get(QUEUE_OPERATION);
+ .getStr(QUEUE_OPERATION);
clusterState = processMessage(clusterState, message, operation);
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(clusterState), true);
@@ -110,7 +116,7 @@ public class Overseer {
}
log.info("Starting to work on the main queue");
- while (amILeader() && !isClosed) {
+ while (!this.isClosed && amILeader()) {
synchronized (reader.getUpdateLock()) {
try {
byte[] head = stateUpdateQueue.peek();
@@ -121,11 +127,11 @@ public class Overseer {
while (head != null) {
final ZkNodeProps message = ZkNodeProps.load(head);
- final String operation = message.get(QUEUE_OPERATION);
+ final String operation = message.getStr(QUEUE_OPERATION);
clusterState = processMessage(clusterState, message, operation);
- byte[] processed = stateUpdateQueue.remove();
- workQueue.offer(processed);
+ workQueue.offer(head);
+ stateUpdateQueue.remove();
head = stateUpdateQueue.peek();
}
zkClient.setData(ZkStateReader.CLUSTER_STATE,
@@ -164,17 +170,19 @@ public class Overseer {
} else if (DELETECORE.equals(operation)) {
clusterState = removeCore(clusterState, message);
} else if (ZkStateReader.LEADER_PROP.equals(operation)) {
+
StringBuilder sb = new StringBuilder();
- String baseUrl = message.get(ZkStateReader.BASE_URL_PROP);
- String coreName = message.get(ZkStateReader.CORE_NAME_PROP);
+ String baseUrl = message.getStr(ZkStateReader.BASE_URL_PROP);
+ String coreName = message.getStr(ZkStateReader.CORE_NAME_PROP);
sb.append(baseUrl);
- if (!baseUrl.endsWith("/")) sb.append("/");
+ if (baseUrl != null && !baseUrl.endsWith("/")) sb.append("/");
sb.append(coreName == null ? "" : coreName);
- if (!(sb.substring(sb.length() - 1).equals("/"))) sb
- .append("/");
+ if (!(sb.substring(sb.length() - 1).equals("/"))) sb.append("/");
clusterState = setShardLeader(clusterState,
- message.get(ZkStateReader.COLLECTION_PROP),
- message.get(ZkStateReader.SHARD_ID_PROP), sb.toString());
+ message.getStr(ZkStateReader.COLLECTION_PROP),
+ message.getStr(ZkStateReader.SHARD_ID_PROP),
+ sb.length() > 0 ? sb.toString() : null);
+
} else {
throw new RuntimeException("unknown operation:" + operation
+ " contents:" + message.getProperties());
@@ -185,7 +193,7 @@ public class Overseer {
private boolean amILeader() {
try {
ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, true));
- if(myId.equals(props.get("id"))) {
+ if(myId.equals(props.getStr("id"))) {
return true;
}
} catch (KeeperException e) {
@@ -200,9 +208,9 @@ public class Overseer {
* Try to assign core to the cluster.
*/
private ClusterState updateState(ClusterState state, final ZkNodeProps message) {
- final String collection = message.get(ZkStateReader.COLLECTION_PROP);
- final String zkCoreNodeName = message.get(ZkStateReader.NODE_NAME_PROP) + "_" + message.get(ZkStateReader.CORE_NAME_PROP);
- final Integer numShards = message.get(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.get(ZkStateReader.NUM_SHARDS_PROP)):null;
+ final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+ final String zkCoreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
+ final Integer numShards = message.getStr(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.getStr(ZkStateReader.NUM_SHARDS_PROP)):null;
//collection does not yet exist, create placeholders if num shards is specified
if (!state.getCollections().contains(collection)
@@ -211,48 +219,70 @@ public class Overseer {
}
// use the provided non null shardId
- String shardId = message.get(ZkStateReader.SHARD_ID_PROP);
- if (shardId == null) {
- String nodeName = message.get(ZkStateReader.NODE_NAME_PROP);
+ String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
+ if (sliceName == null) {
+ String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
//get shardId from ClusterState
- shardId = getAssignedId(state, nodeName, message);
+ sliceName = getAssignedId(state, nodeName, message);
}
- if(shardId == null) {
+ if(sliceName == null) {
//request new shardId
- shardId = AssignShard.assignShard(collection, state, numShards);
+ sliceName = AssignShard.assignShard(collection, state, numShards);
}
-
- Map<String,String> props = new HashMap<String,String>();
- Map<String,String> coreProps = new HashMap<String,String>(message.getProperties().size());
- coreProps.putAll(message.getProperties());
- // we don't put num_shards in the clusterstate
- coreProps.remove(ZkStateReader.NUM_SHARDS_PROP);
- coreProps.remove(QUEUE_OPERATION);
- for (Entry<String,String> entry : coreProps.entrySet()) {
- props.put(entry.getKey(), entry.getValue());
+
+ Slice slice = state.getSlice(collection, sliceName);
+ Map<String,Object> replicaProps = new LinkedHashMap<String,Object>();
+
+ replicaProps.putAll(message.getProperties());
+ // System.out.println("########## UPDATE MESSAGE: " + JSONUtil.toJSON(message));
+ if (slice != null) {
+ Replica oldReplica = slice.getReplicasMap().get(zkCoreNodeName);
+ if (oldReplica != null && oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
+ replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
}
- ZkNodeProps zkProps = new ZkNodeProps(props);
- Slice slice = state.getSlice(collection, shardId);
- Map<String,ZkNodeProps> shardProps;
- if (slice == null) {
- shardProps = new HashMap<String,ZkNodeProps>();
+ }
+
+ // we don't put num_shards in the clusterstate
+ replicaProps.remove(ZkStateReader.NUM_SHARDS_PROP);
+ replicaProps.remove(QUEUE_OPERATION);
+
+
+ Replica replica = new Replica(zkCoreNodeName, replicaProps);
+
+ // TODO: where do we get slice properties in this message? or should there be a separate create-slice message if we want that?
+
+ Map<String,Object> sliceProps = null;
+ Map<String,Replica> replicas;
+
+ if (slice != null) {
+ sliceProps = slice.getProperties();
+ replicas = slice.getReplicasCopy();
} else {
- shardProps = state.getSlice(collection, shardId).getShardsCopy();
+ replicas = new HashMap<String, Replica>(1);
}
- shardProps.put(zkCoreNodeName, zkProps);
- slice = new Slice(shardId, shardProps);
+ replicas.put(replica.getName(), replica);
+ slice = new Slice(sliceName, replicas, sliceProps);
+
ClusterState newClusterState = updateSlice(state, collection, slice);
return newClusterState;
}
private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {
+ HashPartitioner hp = new HashPartitioner();
+ List<HashPartitioner.Range> ranges = hp.partitionRange(numShards, hp.fullRange());
+
+
Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String, Slice>>();
Map<String, Slice> newSlices = new LinkedHashMap<String,Slice>();
newStates.putAll(state.getCollectionStates());
for (int i = 0; i < numShards; i++) {
final String sliceName = "shard" + (i+1);
- newSlices.put(sliceName, new Slice(sliceName, Collections.EMPTY_MAP));
+
+ Map<String,Object> sliceProps = new LinkedHashMap<String,Object>(1);
+ sliceProps.put(Slice.RANGE, ranges.get(i));
+
+ newSlices.put(sliceName, new Slice(sliceName, null, sliceProps));
}
newStates.put(collectionName, newSlices);
ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newStates);
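The createCollection change is the substantive one here: each new slice now carries a hash range under Slice.RANGE, produced by splitting the partitioner's full range into numShards contiguous pieces. The shape of it, as a sketch (three shards chosen arbitrarily; assumes the usual java.util.List/Map imports):

    HashPartitioner hp = new HashPartitioner();
    List<HashPartitioner.Range> ranges = hp.partitionRange(3, hp.fullRange());
    for (int i = 0; i < ranges.size(); i++) {
      Map<String,Object> sliceProps = new LinkedHashMap<String,Object>(1);
      sliceProps.put(Slice.RANGE, ranges.get(i)); // "shard" + (i + 1) owns one contiguous range
    }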
@@ -264,11 +294,11 @@ public class Overseer {
*/
private String getAssignedId(final ClusterState state, final String nodeName,
final ZkNodeProps coreState) {
- final String key = coreState.get(ZkStateReader.NODE_NAME_PROP) + "_" + coreState.get(ZkStateReader.CORE_NAME_PROP);
- Map<String, Slice> slices = state.getSlices(coreState.get(ZkStateReader.COLLECTION_PROP));
+ final String key = coreState.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + coreState.getStr(ZkStateReader.CORE_NAME_PROP);
+ Map<String, Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
if (slices != null) {
for (Slice slice : slices.values()) {
- if (slice.getShards().get(key) != null) {
+ if (slice.getReplicasMap().get(key) != null) {
return slice.getName();
}
}
@@ -277,80 +307,84 @@ public class Overseer {
}
private ClusterState updateSlice(ClusterState state, String collection, Slice slice) {
-
- final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
- newStates.putAll(state.getCollectionStates());
-
- if (!newStates.containsKey(collection)) {
- newStates.put(collection, new LinkedHashMap<String,Slice>());
- }
-
- final Map<String, Slice> slices = newStates.get(collection);
- if (!slices.containsKey(slice.getName())) {
- slices.put(slice.getName(), slice);
+ // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
+ // System.out.println("Updating slice:" + slice);
+
+ Map<String, Map<String, Slice>> newCollections = new LinkedHashMap<String,Map<String,Slice>>(state.getCollectionStates()); // make a shallow copy
+ Map<String, Slice> slices = newCollections.get(collection);
+ if (slices == null) {
+ slices = new HashMap<String, Slice>(1);
} else {
- final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
- final Slice existingSlice = slices.get(slice.getName());
- shards.putAll(existingSlice.getShards());
- //XXX preserve existing leader
- for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
- if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
- HashMap<String, String> newProps = new HashMap<String,String>();
- newProps.putAll(edit.getValue().getProperties());
- newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
- shards.put(edit.getKey(), new ZkNodeProps(newProps));
- } else {
- shards.put(edit.getKey(), edit.getValue());
- }
- }
- final Slice updatedSlice = new Slice(slice.getName(), shards);
- slices.put(slice.getName(), updatedSlice);
+ slices = new LinkedHashMap<String, Slice>(slices); // make a shallow copy
}
- return new ClusterState(state.getLiveNodes(), newStates);
+ slices.put(slice.getName(), slice);
+ newCollections.put(collection, slices);
+
+ return new ClusterState(state.getLiveNodes(), newCollections);
}
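
The rewritten updateSlice drops the mutate-and-merge logic in favor of a copy-on-write update: shallow-copy the outer collection map, shallow-copy only the slice map being touched, put the new Slice in, and wrap everything in a fresh ClusterState. A generic sketch of that two-level copy-on-write put, assuming plain maps in place of the Solr types:

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class CopyOnWritePut {
      /** Return a new two-level map with outer[coll][key] = value,
       *  sharing every untouched inner map with the original. */
      static <V> Map<String,Map<String,V>> put(Map<String,Map<String,V>> outer,
                                               String coll, String key, V value) {
        Map<String,Map<String,V>> newOuter =
            new LinkedHashMap<String,Map<String,V>>(outer); // shallow copy
        Map<String,V> inner = newOuter.get(coll);
        inner = (inner == null)
            ? new HashMap<String,V>(1)
            : new LinkedHashMap<String,V>(inner); // copy only the touched map
        inner.put(key, value);
        newOuter.put(coll, inner);
        return newOuter;
      }

      public static void main(String[] args) {
        Map<String,Map<String,String>> state =
            new LinkedHashMap<String,Map<String,String>>();
        Map<String,Map<String,String>> next =
            put(state, "collection1", "shard1", "slice-data");
        System.out.println(state); // unchanged: {}
        System.out.println(next);  // {collection1={shard1=slice-data}}
      }
    }

Because only the touched inner map is copied, concurrent readers of the old ClusterState keep a consistent view without locking.
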
private ClusterState setShardLeader(ClusterState state, String collection, String sliceName, String leaderUrl) {
-
- final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
- newStates.putAll(state.getCollectionStates());
-
- final Map<String, Slice> slices = newStates.get(collection);
+
+ final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>(state.getCollectionStates());
+
+ Map<String, Slice> slices = newStates.get(collection);
if(slices==null) {
log.error("Could not mark shard leader for non existing collection:" + collection);
return state;
}
-
- if (!slices.containsKey(sliceName)) {
+
+ // make a shallow copy and add it to the new collection
+ slices = new LinkedHashMap<String,Slice>(slices);
+ newStates.put(collection, slices);
+
+ Slice slice = slices.get(sliceName);
+ if (slice == null) {
log.error("Could not mark leader for non existing slice:" + sliceName);
return state;
} else {
- final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
- for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
- Map<String, String> newShardProps = new LinkedHashMap<String,String>();
- newShardProps.putAll(shard.getValue().getProperties());
-
- newShardProps.remove(ZkStateReader.LEADER_PROP); //clean any previously existed flag
-
- ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
- if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
- newShardProps.put(ZkStateReader.LEADER_PROP,"true");
+ // TODO: consider just putting the leader property on the shard, not on individual replicas
+
+ Replica oldLeader = slice.getLeader();
+
+ final Map<String,Replica> newReplicas = new LinkedHashMap<String,Replica>();
+
+ for (Replica replica : slice.getReplicas()) {
+
+ // TODO: this should only be calculated once and cached somewhere?
+ String coreURL = ZkCoreNodeProps.getCoreUrl(replica.getStr(ZkStateReader.BASE_URL_PROP), replica.getStr(ZkStateReader.CORE_NAME_PROP));
+
+ if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
+ Map<String,Object> replicaProps = new LinkedHashMap<String,Object>(replica.getProperties());
+ replicaProps.remove(Slice.LEADER);
+ replica = new Replica(replica.getName(), replicaProps);
+ } else if (coreURL.equals(leaderUrl)) {
+ Map<String,Object> replicaProps = new LinkedHashMap<String,Object>(replica.getProperties());
+ replicaProps.put(Slice.LEADER, "true"); // TODO: allow booleans instead of strings
+ replica = new Replica(replica.getName(), replicaProps);
}
- newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
+
+ newReplicas.put(replica.getName(), replica);
}
- Slice slice = new Slice(sliceName, newShards);
- slices.put(sliceName, slice);
+
+      // newReplicas already carries the updated leader flags; reuse the slice props unchanged
+      Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties());
+ slices.put(newSlice.getName(), newSlice);
}
return new ClusterState(state.getLiveNodes(), newStates);
}
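
setShardLeader now rebuilds the replica set immutably: the old leader loses the flag, the replica whose core URL matches leaderUrl gains it, and every other replica object is reused unchanged. A standalone sketch over plain property maps; the "leader" and "url" keys and the URL matching are simplified stand-ins for the real properties:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ReassignLeader {
      static final String LEADER = "leader";
      static final String URL = "url";

      /** Return new replica maps with the leader flag moved to leaderUrl. */
      static Map<String,Map<String,Object>> reassign(
          Map<String,Map<String,Object>> replicas, String leaderUrl) {
        Map<String,Map<String,Object>> out =
            new LinkedHashMap<String,Map<String,Object>>();
        for (Map.Entry<String,Map<String,Object>> e : replicas.entrySet()) {
          Map<String,Object> props = e.getValue();
          boolean isNewLeader = leaderUrl != null && leaderUrl.equals(props.get(URL));
          boolean wasLeader = props.containsKey(LEADER);
          if (wasLeader != isNewLeader) { // only copy when the flag changes
            props = new LinkedHashMap<String,Object>(props);
            if (isNewLeader) props.put(LEADER, "true");
            else props.remove(LEADER);
          }
          out.put(e.getKey(), props);
        }
        return out;
      }

      public static void main(String[] args) {
        Map<String,Map<String,Object>> reps =
            new LinkedHashMap<String,Map<String,Object>>();
        Map<String,Object> r1 = new LinkedHashMap<String,Object>();
        r1.put(URL, "http://node1/solr/core1"); r1.put(LEADER, "true");
        Map<String,Object> r2 = new LinkedHashMap<String,Object>();
        r2.put(URL, "http://node2/solr/core1");
        reps.put("node1_core1", r1); reps.put("node2_core1", r2);
        // the flag moves from node1_core1 to node2_core1
        System.out.println(reassign(reps, "http://node2/solr/core1"));
      }
    }
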
-
+
/*
* Remove core from cloudstate
*/
private ClusterState removeCore(final ClusterState clusterState, ZkNodeProps message) {
- final String coreNodeName = message.get(ZkStateReader.NODE_NAME_PROP) + "_" + message.get(ZkStateReader.CORE_NAME_PROP);
- final String collection = message.get(ZkStateReader.COLLECTION_PROP);
+ final String coreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
+ final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
final LinkedHashMap<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
for(String collectionName: clusterState.getCollections()) {
@@ -358,21 +392,18 @@ public class Overseer {
Map<String, Slice> slices = clusterState.getSlices(collection);
LinkedHashMap<String, Slice> newSlices = new LinkedHashMap<String, Slice>();
for(Slice slice: slices.values()) {
- if(slice.getShards().containsKey(coreNodeName)) {
- LinkedHashMap<String, ZkNodeProps> newShards = new LinkedHashMap<String, ZkNodeProps>();
- newShards.putAll(slice.getShards());
- newShards.remove(coreNodeName);
-
- Slice newSlice = new Slice(slice.getName(), newShards);
+ if(slice.getReplicasMap().containsKey(coreNodeName)) {
+ Map<String, Replica> newReplicas = slice.getReplicasCopy();
+ newReplicas.remove(coreNodeName);
+ Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties());
newSlices.put(slice.getName(), newSlice);
-
} else {
newSlices.put(slice.getName(), slice);
}
}
int cnt = 0;
for (Slice slice : newSlices.values()) {
- cnt+=slice.getShards().size();
+ cnt+=slice.getReplicasMap().size();
}
// TODO: if no nodes are left after this unload
// remove from zk - do we have a race where Overseer
@@ -399,12 +430,48 @@ public class Overseer {
ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newStates);
return newState;
}
+
+ @Override
+ public void close() {
+ this.isClosed = true;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return this.isClosed;
+ }
}
- private Thread ccThread;
+ class OverseerThread extends Thread implements ClosableThread {
+
+ private volatile boolean isClosed;
- private Thread updaterThread;
+ public OverseerThread(ThreadGroup tg,
+ ClusterStateUpdater clusterStateUpdater) {
+ super(tg, clusterStateUpdater);
+ }
+
+ public OverseerThread(ThreadGroup ccTg,
+ OverseerCollectionProcessor overseerCollectionProcessor, String string) {
+ super(ccTg, overseerCollectionProcessor, string);
+ }
+
+ @Override
+ public void close() {
+ this.isClosed = true;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return this.isClosed;
+ }
+
+ }
+
+ private OverseerThread ccThread;
+
+ private OverseerThread updaterThread;
private volatile boolean isClosed;
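
OverseerThread exists so the Overseer can ask its runnables to stop: close() flips a volatile flag that the run loop polls, and interrupt() breaks out of any blocking call. A self-contained sketch of the same pattern; the ClosableThread interface here is a local stand-in for the Solr one:

    public class ClosableThreadDemo {
      interface ClosableThread { // local stand-in
        void close();
        boolean isClosed();
      }

      static class Worker implements Runnable, ClosableThread {
        private volatile boolean isClosed;

        public void run() {
          while (!isClosed) {
            try {
              Thread.sleep(100); // stands in for blocking queue/ZK waits
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt(); // close() interrupts to unblock us
            }
          }
        }
        public void close() { this.isClosed = true; }
        public boolean isClosed() { return isClosed; }
      }

      public static void main(String[] args) throws Exception {
        Worker w = new Worker();
        Thread t = new Thread(w, "demo-worker");
        t.setDaemon(true);
        t.start();
        w.close();     // flag first...
        t.interrupt(); // ...then interrupt, the same order Overseer.close() uses below
        t.join(1000);
        System.out.println("closed=" + w.isClosed() + " alive=" + t.isAlive());
      }
    }
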
@@ -425,11 +492,11 @@ public class Overseer {
createOverseerNode(reader.getZkClient());
//launch cluster state updater thread
ThreadGroup tg = new ThreadGroup("Overseer state updater.");
- updaterThread = new Thread(tg, new ClusterStateUpdater(reader, id));
+ updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id));
updaterThread.setDaemon(true);
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
- ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath),
+ ccThread = new OverseerThread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath),
"Overseer-" + id);
ccThread.setDaemon(true);
@@ -439,6 +506,14 @@ public class Overseer {
public void close() {
isClosed = true;
+ if (updaterThread != null) {
+ updaterThread.close();
+ updaterThread.interrupt();
+ }
+ if (ccThread != null) {
+ ccThread.close();
+ ccThread.interrupt();
+ }
}
/**
@@ -467,11 +542,11 @@ public class Overseer {
} catch (KeeperException.NodeExistsException e) {
//ok
} catch (InterruptedException e) {
- log.error("Could not create Overseer node: " + e.getClass() + ":" + e.getMessage());
+ log.error("Could not create Overseer node", e);
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (KeeperException e) {
- log.error("Could not create Overseer node: " + e.getClass() + ":" + e.getMessage());
+ log.error("Could not create Overseer node", e);
throw new RuntimeException(e);
}
}
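
The two logging fixes at the end of this file swap string-concatenated exception details for the two-argument slf4j form, which preserves the full stack trace. Side by side:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LogThrowable {
      private static final Logger log = LoggerFactory.getLogger(LogThrowable.class);

      public static void main(String[] args) {
        Exception e = new IllegalStateException("zk session expired");
        // Old form: only the class name and message survive; the trace is lost.
        log.error("Could not create Overseer node: " + e.getClass() + ":" + e.getMessage());
        // New form: slf4j appends the full stack trace after the message.
        log.error("Could not create Overseer node", e);
      }
    }
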
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1388574&r1=1388573&r2=1388574&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Fri Sep 21 17:21:34 2012
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -41,6 +42,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OverseerCollectionProcessor implements Runnable {
+ public static final String REPLICATION_FACTOR = "replicationFactor";
+
public static final String DELETECOLLECTION = "deletecollection";
public static final String CREATECOLLECTION = "createcollection";
@@ -84,13 +87,13 @@ public class OverseerCollectionProcessor
//if (head != null) { // should not happen since we block above
final ZkNodeProps message = ZkNodeProps.load(head);
- final String operation = message.get(QUEUE_OPERATION);
+ final String operation = message.getStr(QUEUE_OPERATION);
boolean success = processMessage(message, operation);
if (!success) {
// TODO: what to do on failure / partial failure
// if we fail, do we clean up then?
- SolrException.log(log, "Collection creation of " + message.get("name") + " failed");
+ SolrException.log(log, "Collection creation of " + message.getStr("name") + " failed");
}
//}
workQueue.remove();
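
The processor loop peeks at the queue head, dispatches on QUEUE_OPERATION, and only removes the head afterwards, so a crash before remove() means the message is seen again on restart (at-least-once processing). A stripped-down sketch with an in-memory deque standing in for the ZooKeeper-backed queue:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Map;

    public class WorkQueueLoop {
      /** Process items head-first; remove only after handling. */
      static void drain(Deque<Map<String,String>> workQueue) {
        while (!workQueue.isEmpty()) {
          Map<String,String> message = workQueue.peekFirst(); // look, don't take
          String operation = message.get("operation");
          boolean success = process(message, operation);
          if (!success) {
            // mirrors the real TODO: failure handling is still an open question
            System.err.println("operation " + operation + " failed for "
                + message.get("name"));
          }
          workQueue.removeFirst(); // the item is gone only once handled
        }
      }

      static boolean process(Map<String,String> message, String operation) {
        return "createcollection".equals(operation); // toy dispatch
      }

      public static void main(String[] args) {
        Deque<Map<String,String>> q = new ArrayDeque<Map<String,String>>();
        q.add(java.util.Collections.singletonMap("operation", "createcollection"));
        drain(q);
      }
    }
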
@@ -118,7 +121,7 @@ public class OverseerCollectionProcessor
try {
ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(
"/overseer_elect/leader", null, null, true));
- if (myId.equals(props.get("id"))) {
+ if (myId.equals(props.getStr("id"))) {
return true;
}
} catch (KeeperException e) {
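
amILeader re-reads /overseer_elect/leader and compares the stored id against this node's election id; if they differ, the loop quietly stands down. The comparison itself reduces to a null-safe lookup, sketched here with a plain map in place of ZkNodeProps:

    import java.util.Map;

    public class LeaderCheck {
      /** leaderProps: data read from /overseer_elect/leader; myId: our election id. */
      static boolean amILeader(Map<String,Object> leaderProps, String myId) {
        Object id = (leaderProps == null) ? null : leaderProps.get("id");
        return myId != null && myId.equals(id);
      }

      public static void main(String[] args) {
        Map<String,Object> props =
            java.util.Collections.<String,Object>singletonMap("id", "node-1");
        System.out.println(amILeader(props, "node-1")); // true
        System.out.println(amILeader(null, "node-1"));  // false: no leader znode yet
      }
    }
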
@@ -152,15 +155,15 @@ public class OverseerCollectionProcessor
// look at the replication factor and see if it matches reality
// if it does not, find best nodes to create more cores
- String numReplicasString = message.get("numReplicas");
+ String numReplicasString = message.getStr(REPLICATION_FACTOR);
int numReplicas;
try {
numReplicas = numReplicasString == null ? 0 : Integer.parseInt(numReplicasString);
} catch (Exception ex) {
- SolrException.log(log, "Could not parse numReplicas", ex);
+ SolrException.log(log, "Could not parse " + REPLICATION_FACTOR, ex);
return false;
}
- String numShardsString = message.get("numShards");
+ String numShardsString = message.getStr("numShards");
int numShards;
try {
numShards = numShardsString == null ? 0 : Integer.parseInt(numShardsString);
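
replicationFactor and numShards are parsed with the same shape: a missing parameter falls back to a default, an unparsable one is a hard failure. A small helper capturing that pattern; the helper name is invented for illustration, and it throws where the real code logs the exception and returns false:

    public class IntParam {
      /** Parse an optional integer parameter; absent means def,
       *  malformed means a hard error. */
      static int parseIntParam(String name, String raw, int def) {
        if (raw == null) return def;
        try {
          return Integer.parseInt(raw);
        } catch (NumberFormatException ex) {
          throw new IllegalArgumentException("Could not parse " + name + ": " + raw, ex);
        }
      }

      public static void main(String[] args) {
        System.out.println(parseIntParam("replicationFactor", null, 0)); // 0
        System.out.println(parseIntParam("numShards", "4", 0));          // 4
      }
    }
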
@@ -169,8 +172,8 @@ public class OverseerCollectionProcessor
return false;
}
- String name = message.get("name");
- String configName = message.get("collection.configName");
+ String name = message.getStr("name");
+ String configName = message.getStr("collection.configName");
// we need to look at every node and see how many cores it serves
// add our new cores to existing nodes serving the least number of cores
@@ -237,7 +240,7 @@ public class OverseerCollectionProcessor
private boolean collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params) {
log.info("Executing Collection Cmd : " + params);
- String name = message.get("name");
+ String name = message.getStr("name");
Map<String,Slice> slices = clusterState.getCollectionStates().get(name);
@@ -247,14 +250,14 @@ public class OverseerCollectionProcessor
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
Slice slice = entry.getValue();
- Map<String,ZkNodeProps> shards = slice.getShards();
- Set<Map.Entry<String,ZkNodeProps>> shardEntries = shards.entrySet();
- for (Map.Entry<String,ZkNodeProps> shardEntry : shardEntries) {
+ Map<String,Replica> shards = slice.getReplicasMap();
+ Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
+ for (Map.Entry<String,Replica> shardEntry : shardEntries) {
final ZkNodeProps node = shardEntry.getValue();
- if (clusterState.liveNodesContain(node.get(ZkStateReader.NODE_NAME_PROP))) {
- params.set(CoreAdminParams.CORE, node.get(ZkStateReader.CORE_NAME_PROP));
+ if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP))) {
+ params.set(CoreAdminParams.CORE, node.getStr(ZkStateReader.CORE_NAME_PROP));
- String replica = node.get(ZkStateReader.BASE_URL_PROP);
+ String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
ShardRequest sreq = new ShardRequest();
// yes, they must use the same admin handler path everywhere...
params.set("qt", adminPath);
@@ -265,7 +268,7 @@ public class OverseerCollectionProcessor
sreq.shards = new String[] {replica};
sreq.actualShards = sreq.shards;
sreq.params = params;
-
+ log.info("Collection Admin sending CoreAdmin cmd to " + replica);
shardHandler.submit(sreq, replica, sreq.params);
}
}
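
collectionCmd fans a single CoreAdmin command out across the whole collection: for each slice, for each replica, skip replicas on nodes that are not live, otherwise aim a request at the replica's base URL and hand it to the shard handler. A sketch of that traversal with local stand-in types (Replica here is a plain class, and submit is reduced to a print):

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class CollectionCmdFanOut {
      /** One replica: hosting node, core name, base URL. */
      static class Replica {
        final String nodeName, coreName, baseUrl;
        Replica(String nodeName, String coreName, String baseUrl) {
          this.nodeName = nodeName; this.coreName = coreName; this.baseUrl = baseUrl;
        }
      }

      /** Send a per-core admin command to every replica on a live node. */
      static void collectionCmd(Map<String,List<Replica>> slices, Set<String> liveNodes) {
        for (Map.Entry<String,List<Replica>> slice : slices.entrySet()) {
          for (Replica r : slice.getValue()) {
            if (!liveNodes.contains(r.nodeName)) continue; // dead node: skip, don't fail
            // the real code builds a ShardRequest and calls shardHandler.submit
            System.out.println("submit core=" + r.coreName + " to " + r.baseUrl);
          }
        }
      }

      public static void main(String[] args) {
        Map<String,List<Replica>> slices = new LinkedHashMap<String,List<Replica>>();
        slices.put("shard1", Arrays.asList(
            new Replica("node1:8983_solr", "c1_shard1_replica1", "http://node1:8983/solr"),
            new Replica("node2:8983_solr", "c1_shard1_replica2", "http://node2:8983/solr")));
        // only node1 is live, so only its replica receives the command
        collectionCmd(slices, java.util.Collections.singleton("node1:8983_solr"));
      }
    }
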