You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2013/12/19 18:48:57 UTC
svn commit: r1552377 [8/15] - in /lucene/dev/branches/lucene5339: ./
dev-tools/ dev-tools/idea/.idea/ dev-tools/idea/.idea/libraries/
dev-tools/idea/lucene/benchmark/src/ dev-tools/idea/lucene/demo/
dev-tools/idea/lucene/facet/ dev-tools/idea/solr/cont...
Modified: lucene/dev/branches/lucene5339/solr/common-build.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/common-build.xml?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/common-build.xml (original)
+++ lucene/dev/branches/lucene5339/solr/common-build.xml Thu Dec 19 17:48:47 2013
@@ -92,6 +92,7 @@
<pathelement location="${memory.jar}"/>
<pathelement location="${misc.jar}"/>
<pathelement location="${spatial.jar}"/>
+ <pathelement location="${expressions.jar}"/>
<pathelement location="${suggest.jar}"/>
<pathelement location="${grouping.jar}"/>
<pathelement location="${queries.jar}"/>
@@ -155,7 +156,7 @@
</target>
<target name="prep-lucene-jars"
- depends="jar-lucene-core, jar-analyzers-phonetic, jar-analyzers-kuromoji, jar-codecs, jar-suggest, jar-highlighter, jar-memory,
+ depends="jar-lucene-core, jar-analyzers-phonetic, jar-analyzers-kuromoji, jar-codecs,jar-expressions, jar-suggest, jar-highlighter, jar-memory,
jar-misc, jar-spatial, jar-grouping, jar-queries, jar-queryparser, jar-join">
<property name="solr.deps.compiled" value="true"/>
</target>
@@ -228,7 +229,7 @@
<property name="lucenedocs" location="${common.dir}/build/docs"/>
<!-- dependency to ensure all lucene javadocs are present -->
- <target name="lucene-javadocs" depends="javadocs-lucene-core,javadocs-analyzers-common,javadocs-analyzers-icu,javadocs-analyzers-kuromoji,javadocs-analyzers-phonetic,javadocs-analyzers-smartcn,javadocs-analyzers-morfologik,javadocs-analyzers-stempel,javadocs-analyzers-uima,javadocs-codecs,javadocs-suggest,javadocs-grouping,javadocs-queries,javadocs-queryparser,javadocs-highlighter,javadocs-memory,javadocs-misc,javadocs-spatial,javadocs-test-framework"/>
+ <target name="lucene-javadocs" depends="javadocs-lucene-core,javadocs-analyzers-common,javadocs-analyzers-icu,javadocs-analyzers-kuromoji,javadocs-analyzers-phonetic,javadocs-analyzers-smartcn,javadocs-analyzers-morfologik,javadocs-analyzers-stempel,javadocs-analyzers-uima,javadocs-codecs,javadocs-expressions,javadocs-suggest,javadocs-grouping,javadocs-queries,javadocs-queryparser,javadocs-highlighter,javadocs-memory,javadocs-misc,javadocs-spatial,javadocs-test-framework"/>
<!-- create javadocs for the current module -->
<target name="javadocs" depends="compile-core,define-lucene-javadoc-url,lucene-javadocs,javadocs-solr-core">
@@ -295,6 +296,7 @@
<link offline="true" href="${lucene.javadoc.url}analyzers-stempel" packagelistloc="${lucenedocs}/analyzers-stempel"/>
<link offline="true" href="${lucene.javadoc.url}analyzers-uima" packagelistloc="${lucenedocs}/analyzers-uima"/>
<link offline="true" href="${lucene.javadoc.url}codecs" packagelistloc="${lucenedocs}/codecs"/>
+ <link offline="true" href="${lucene.javadoc.url}expressions" packagelistloc="${lucenedocs}/expressions"/>
<link offline="true" href="${lucene.javadoc.url}suggest" packagelistloc="${lucenedocs}/suggest"/>
<link offline="true" href="${lucene.javadoc.url}grouping" packagelistloc="${lucenedocs}/grouping"/>
<link offline="true" href="${lucene.javadoc.url}queries" packagelistloc="${lucenedocs}/queries"/>
Modified: lucene/dev/branches/lucene5339/solr/contrib/analysis-extras/src/java/org/apache/solr/schema/ICUCollationField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/contrib/analysis-extras/src/java/org/apache/solr/schema/ICUCollationField.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/contrib/analysis-extras/src/java/org/apache/solr/schema/ICUCollationField.java (original)
+++ lucene/dev/branches/lucene5339/solr/contrib/analysis-extras/src/java/org/apache/solr/schema/ICUCollationField.java Thu Dec 19 17:48:47 2013
@@ -19,6 +19,9 @@ package org.apache.solr.schema;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
@@ -26,7 +29,12 @@ import org.apache.lucene.analysis.Analyz
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.TermToBytesRefAttribute;
import org.apache.lucene.collation.ICUCollationKeyAnalyzer;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.StorableField;
+import org.apache.lucene.search.ConstantScoreQuery;
+import org.apache.lucene.search.DocTermOrdsRangeFilter;
+import org.apache.lucene.search.FieldCacheRangeFilter;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermRangeQuery;
@@ -35,6 +43,7 @@ import org.apache.lucene.util.Version;
import org.apache.lucene.analysis.util.ResourceLoader;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.util.Base64;
import org.apache.solr.response.TextResponseWriter;
import org.apache.solr.search.QParser;
@@ -178,8 +187,7 @@ public class ICUCollationField extends F
rbc.setVariableTop(variableTop);
}
- // we use 4.0 because it ensures we just encode the pure byte[] keys.
- analyzer = new ICUCollationKeyAnalyzer(Version.LUCENE_40, collator);
+ analyzer = new ICUCollationKeyAnalyzer(Version.LUCENE_CURRENT, collator);
}
/**
@@ -229,12 +237,12 @@ public class ICUCollationField extends F
}
/**
- * analyze the range with the analyzer, instead of the collator.
+ * analyze the text with the analyzer, instead of the collator.
* because icu collators are not thread safe, this keeps things
* simple (we already have a threadlocal clone in the reused TS)
*/
- private BytesRef analyzeRangePart(String field, String part) {
- try (TokenStream source = analyzer.tokenStream(field, part)) {
+ private BytesRef getCollationKey(String field, String text) {
+ try (TokenStream source = analyzer.tokenStream(field, text)) {
source.reset();
TermToBytesRefAttribute termAtt = source.getAttribute(TermToBytesRefAttribute.class);
@@ -242,22 +250,73 @@ public class ICUCollationField extends F
// we control the analyzer here: most errors are impossible
if (!source.incrementToken())
- throw new IllegalArgumentException("analyzer returned no terms for range part: " + part);
+ throw new IllegalArgumentException("analyzer returned no terms for text: " + text);
termAtt.fillBytesRef();
assert !source.incrementToken();
source.end();
return BytesRef.deepCopyOf(bytes);
} catch (IOException e) {
- throw new RuntimeException("Unable analyze range part: " + part, e);
+ throw new RuntimeException("Unable to analyze text: " + text, e);
}
}
@Override
public Query getRangeQuery(QParser parser, SchemaField field, String part1, String part2, boolean minInclusive, boolean maxInclusive) {
String f = field.getName();
- BytesRef low = part1 == null ? null : analyzeRangePart(f, part1);
- BytesRef high = part2 == null ? null : analyzeRangePart(f, part2);
- return new TermRangeQuery(field.getName(), low, high, minInclusive, maxInclusive);
+ BytesRef low = part1 == null ? null : getCollationKey(f, part1);
+ BytesRef high = part2 == null ? null : getCollationKey(f, part2);
+ if (!field.indexed() && field.hasDocValues()) {
+ if (field.multiValued()) {
+ return new ConstantScoreQuery(DocTermOrdsRangeFilter.newBytesRefRange(
+ field.getName(), low, high, minInclusive, maxInclusive));
+ } else {
+ return new ConstantScoreQuery(FieldCacheRangeFilter.newBytesRefRange(
+ field.getName(), low, high, minInclusive, maxInclusive));
+ }
+ } else {
+ return new TermRangeQuery(field.getName(), low, high, minInclusive, maxInclusive);
+ }
+ }
+
+ @Override
+ public void checkSchemaField(SchemaField field) {
+ // no-op
+ }
+
+ @Override
+ public List<StorableField> createFields(SchemaField field, Object value, float boost) {
+ if (field.hasDocValues()) {
+ List<StorableField> fields = new ArrayList<StorableField>();
+ fields.add(createField(field, value, boost));
+ final BytesRef bytes = getCollationKey(field.getName(), value.toString());
+ if (field.multiValued()) {
+ fields.add(new SortedSetDocValuesField(field.getName(), bytes));
+ } else {
+ fields.add(new SortedDocValuesField(field.getName(), bytes));
+ }
+ return fields;
+ } else {
+ return Collections.singletonList(createField(field, value, boost));
+ }
+ }
+
+ @Override
+ public Object marshalSortValue(Object value) {
+ if (null == value) {
+ return null;
+ }
+ final BytesRef val = (BytesRef)value;
+ return Base64.byteArrayToBase64(val.bytes, val.offset, val.length);
+ }
+
+ @Override
+ public Object unmarshalSortValue(Object value) {
+ if (null == value) {
+ return null;
+ }
+ final String val = (String)value;
+ final byte[] bytes = Base64.base64ToByteArray(val);
+ return new BytesRef(bytes);
}
}
Modified: lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHProperties.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHProperties.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHProperties.java (original)
+++ lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHProperties.java Thu Dec 19 17:48:47 2013
@@ -37,6 +37,8 @@ public abstract class DIHProperties {
public abstract Map<String, Object> readIndexerProperties();
+ public abstract String convertDateToString(Date d);
+
public Date getCurrentTimestamp() {
return new Date();
}
Modified: lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java (original)
+++ lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java Thu Dec 19 17:48:47 2013
@@ -51,6 +51,11 @@ public class DocBuilder {
private static final Logger LOG = LoggerFactory.getLogger(DocBuilder.class);
private static final Date EPOCH = new Date(0);
+ public static final String DELETE_DOC_BY_ID = "$deleteDocById";
+ public static final String DELETE_DOC_BY_QUERY = "$deleteDocByQuery";
+ public static final String DOC_BOOST = "$docBoost";
+ public static final String SKIP_DOC = "$skipDoc";
+ public static final String SKIP_ROW = "$skipRow";
DataImporter dataImporter;
@@ -117,6 +122,7 @@ public class DocBuilder {
private VariableResolver getVariableResolver() {
try {
VariableResolver resolver = null;
+ String epoch = propWriter.convertDateToString(EPOCH);
if(dataImporter != null && dataImporter.getCore() != null
&& dataImporter.getCore().getResourceLoader().getCoreProperties() != null){
resolver = new VariableResolver(dataImporter.getCore().getResourceLoader().getCoreProperties());
@@ -129,7 +135,7 @@ public class DocBuilder {
indexerNamespace.put(LAST_INDEX_TIME, persistedProperties.get(LAST_INDEX_TIME));
} else {
// set epoch
- indexerNamespace.put(LAST_INDEX_TIME, EPOCH);
+ indexerNamespace.put(LAST_INDEX_TIME, epoch);
}
indexerNamespace.put(INDEX_START_TIME, dataImporter.getIndexStartTime());
indexerNamespace.put("request", new HashMap<String,Object>(reqParams.getRawParams()));
@@ -140,7 +146,7 @@ public class DocBuilder {
if (lastIndex != null) {
entityNamespace.put(SolrWriter.LAST_INDEX_KEY, lastIndex);
} else {
- entityNamespace.put(SolrWriter.LAST_INDEX_KEY, EPOCH);
+ entityNamespace.put(SolrWriter.LAST_INDEX_KEY, epoch);
}
indexerNamespace.put(entity.getName(), entityNamespace);
}
@@ -567,7 +573,7 @@ public class DocBuilder {
}
private void handleSpecialCommands(Map<String, Object> arow, DocWrapper doc) {
- Object value = arow.get("$deleteDocById");
+ Object value = arow.get(DELETE_DOC_BY_ID);
if (value != null) {
if (value instanceof Collection) {
Collection collection = (Collection) value;
@@ -580,7 +586,7 @@ public class DocBuilder {
importStatistics.deletedDocCount.incrementAndGet();
}
}
- value = arow.get("$deleteDocByQuery");
+ value = arow.get(DELETE_DOC_BY_QUERY);
if (value != null) {
if (value instanceof Collection) {
Collection collection = (Collection) value;
@@ -593,7 +599,7 @@ public class DocBuilder {
importStatistics.deletedDocCount.incrementAndGet();
}
}
- value = arow.get("$docBoost");
+ value = arow.get(DOC_BOOST);
if (value != null) {
float value1 = 1.0f;
if (value instanceof Number) {
@@ -604,7 +610,7 @@ public class DocBuilder {
doc.setDocumentBoost(value1);
}
- value = arow.get("$skipDoc");
+ value = arow.get(SKIP_DOC);
if (value != null) {
if (Boolean.parseBoolean(value.toString())) {
throw new DataImportHandlerException(DataImportHandlerException.SKIP,
@@ -612,7 +618,7 @@ public class DocBuilder {
}
}
- value = arow.get("$skipRow");
+ value = arow.get(SKIP_ROW);
if (value != null) {
if (Boolean.parseBoolean(value.toString())) {
throw new DataImportHandlerException(DataImportHandlerException.SKIP_ROW);
Modified: lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java (original)
+++ lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java Thu Dec 19 17:48:47 2013
@@ -153,7 +153,4 @@ public class EntityProcessorBase extends
public static final String CONTINUE = "continue";
public static final String SKIP = "skip";
-
- public static final String SKIP_DOC = "$skipDoc";
-
}
Modified: lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java (original)
+++ lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java Thu Dec 19 17:48:47 2013
@@ -130,7 +130,8 @@ public class SimplePropertiesWriter exte
}
- protected String convertDateToString(Date d) {
+ @Override
+ public String convertDateToString(Date d) {
return dateFormat.format(d);
}
protected Date convertStringToDate(String s) {
Modified: lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java (original)
+++ lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java Thu Dec 19 17:48:47 2013
@@ -330,7 +330,7 @@ public class XPathEntityProcessor extend
} else if (SKIP.equals(onError)) {
LOG.warn(msg, e);
Map<String, Object> map = new HashMap<String, Object>();
- map.put(SKIP_DOC, Boolean.TRUE);
+ map.put(DocBuilder.SKIP_DOC, Boolean.TRUE);
rows.add(map);
} else if (CONTINUE.equals(onError)) {
LOG.warn(msg, e);
Modified: lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java (original)
+++ lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java Thu Dec 19 17:48:47 2013
@@ -8,6 +8,7 @@ import java.util.Locale;
import java.util.Map;
import org.apache.solr.handler.dataimport.DataImporter;
+import org.apache.solr.handler.dataimport.DocBuilder;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.slf4j.Logger;
@@ -111,7 +112,7 @@ public class DIHConfiguration {
for (Map.Entry<String,EntityField> entry : fields.entrySet()) {
EntityField fld = entry.getValue();
SchemaField field = getSchemaField(fld.getName());
- if (field == null) {
+ if (field == null && !isSpecialCommand(fld.getName())) {
LOG.info("The field :" + fld.getName() + " present in DataConfig does not have a counterpart in Solr Schema");
}
}
@@ -178,4 +179,13 @@ public class DIHConfiguration {
public IndexSchema getSchema() {
return schema;
}
+
+ public static boolean isSpecialCommand(String fld) {
+ return DocBuilder.DELETE_DOC_BY_ID.equals(fld) ||
+ DocBuilder.DELETE_DOC_BY_QUERY.equals(fld) ||
+ DocBuilder.DOC_BOOST.equals(fld) ||
+ DocBuilder.SKIP_DOC.equals(fld) ||
+ DocBuilder.SKIP_ROW.equals(fld);
+
+ }
}
\ No newline at end of file
Modified: lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilder2.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilder2.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilder2.java (original)
+++ lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilder2.java Thu Dec 19 17:48:47 2013
@@ -132,7 +132,7 @@ public class TestDocBuilder2 extends Abs
public void testSkipDoc() throws Exception {
List rows = new ArrayList();
rows.add(createMap("id", "1", "desc", "one"));
- rows.add(createMap("id", "2", "desc", "two", "$skipDoc", "true"));
+ rows.add(createMap("id", "2", "desc", "two", DocBuilder.SKIP_DOC, "true"));
MockDataSource.setIterator("select * from x", rows.iterator());
runFullImport(dataConfigWithDynamicTransformer);
@@ -146,7 +146,7 @@ public class TestDocBuilder2 extends Abs
public void testSkipRow() throws Exception {
List rows = new ArrayList();
rows.add(createMap("id", "1", "desc", "one"));
- rows.add(createMap("id", "2", "desc", "two", "$skipRow", "true"));
+ rows.add(createMap("id", "2", "desc", "two", DocBuilder.SKIP_ROW, "true"));
MockDataSource.setIterator("select * from x", rows.iterator());
runFullImport(dataConfigWithDynamicTransformer);
@@ -166,7 +166,7 @@ public class TestDocBuilder2 extends Abs
MockDataSource.setIterator("3", rows.iterator());
rows = new ArrayList();
- rows.add(createMap("name_s", "xyz", "$skipRow", "true"));
+ rows.add(createMap("name_s", "xyz", DocBuilder.SKIP_ROW, "true"));
MockDataSource.setIterator("4", rows.iterator());
runFullImport(dataConfigWithTwoEntities);
@@ -197,7 +197,7 @@ public class TestDocBuilder2 extends Abs
List rows = new ArrayList();
rows.add(createMap("id", "1", "desc", "one"));
rows.add(createMap("id", "2", "desc", "two"));
- rows.add(createMap("id", "3", "desc", "two", "$deleteDocById", "2"));
+ rows.add(createMap("id", "3", "desc", "two", DocBuilder.DELETE_DOC_BY_ID, "2"));
MockDataSource.setIterator("select * from x", rows.iterator());
runFullImport(dataConfigForSkipTransform);
@@ -213,7 +213,7 @@ public class TestDocBuilder2 extends Abs
rows = new ArrayList();
rows.add(createMap("id", "1", "desc", "one"));
rows.add(createMap("id", "2", "desc", "one"));
- rows.add(createMap("id", "3", "desc", "two", "$deleteDocByQuery", "desc:one"));
+ rows.add(createMap("id", "3", "desc", "two", DocBuilder.DELETE_DOC_BY_QUERY, "desc:one"));
MockDataSource.setIterator("select * from x", rows.iterator());
runFullImport(dataConfigForSkipTransform);
@@ -227,7 +227,7 @@ public class TestDocBuilder2 extends Abs
MockDataSource.clearCache();
rows = new ArrayList();
- rows.add(createMap("$deleteDocById", "3"));
+ rows.add(createMap(DocBuilder.DELETE_DOC_BY_ID, "3"));
MockDataSource.setIterator("select * from x", rows.iterator());
runFullImport(dataConfigForSkipTransform, createMap("clean","false"));
assertQ(req("id:3"), "//*[@numFound='0']");
Modified: lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java (original)
+++ lucene/dev/branches/lucene5339/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java Thu Dec 19 17:48:47 2013
@@ -23,7 +23,7 @@ import org.junit.Test;
*/
/**
- * Test with various combinations of parameters, child entites, transformers.
+ * Test with various combinations of parameters, child entities, transformers.
*/
public class TestSqlEntityProcessorDelta extends AbstractSqlEntityProcessorTestCase {
private boolean delta = false;
@@ -48,6 +48,21 @@ public class TestSqlEntityProcessorDelta
singleEntity(c);
validateChanges();
}
+
+ @Test
+ public void testDeltaImportWithoutInitialFullImport() throws Exception {
+ log.debug("testDeltaImportWithoutInitialFullImport delta-import...");
+ countryEntity = false;
+ delta = true;
+ /*
+ * We need to add 2 in total:
+ * +1 for deltaQuery i.e identifying id of items to update,
+ * +1 for deletedPkQuery i.e delete query
+ */
+ singleEntity(totalPeople() + 2);
+ validateChanges();
+ }
+
@Test
public void testWithSimpleTransformer() throws Exception {
log.debug("testWithSimpleTransformer full-import...");
Modified: lucene/dev/branches/lucene5339/solr/contrib/extraction/ivy.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/contrib/extraction/ivy.xml?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/contrib/extraction/ivy.xml (original)
+++ lucene/dev/branches/lucene5339/solr/contrib/extraction/ivy.xml Thu Dec 19 17:48:47 2013
@@ -22,6 +22,7 @@
<!-- Tika JARs -->
<dependency org="org.apache.tika" name="tika-core" rev="${/org.apache.tika/tika-core}" transitive="false"/>
<dependency org="org.apache.tika" name="tika-parsers" rev="${/org.apache.tika/tika-parsers}" transitive="false"/>
+ <dependency org="org.apache.tika" name="tika-xmp" rev="${/org.apache.tika/tika-xmp}" transitive="false"/>
<!-- Tika dependencies - see http://tika.apache.org/1.3/gettingstarted.html#Using_Tika_as_a_Maven_dependency -->
<!-- When upgrading Tika, upgrade dependencies versions and add any new ones
(except slf4j-api, commons-codec, commons-logging, geronimo-stax-api_1.0_spec) -->
@@ -44,12 +45,19 @@
<dependency org="dom4j" name="dom4j" rev="${/dom4j/dom4j}" transitive="false"/>
<dependency org="org.ccil.cowan.tagsoup" name="tagsoup" rev="${/org.ccil.cowan.tagsoup/tagsoup}" transitive="false"/>
<dependency org="com.googlecode.mp4parser" name="isoparser" rev="${/com.googlecode.mp4parser/isoparser}" transitive="false"/>
+ <dependency org="org.aspectj" name="aspectjrt" rev="${/org.aspectj/aspectjrt}" transitive="false"/>
<dependency org="com.drewnoakes" name="metadata-extractor" rev="${/com.drewnoakes/metadata-extractor}" transitive="false"/>
<dependency org="de.l3s.boilerpipe" name="boilerpipe" rev="${/de.l3s.boilerpipe/boilerpipe}" transitive="false"/>
<dependency org="rome" name="rome" rev="${/rome/rome}" transitive="false"/>
<dependency org="jdom" name="jdom" rev="${/jdom/jdom}" transitive="false"/>
<dependency org="com.googlecode.juniversalchardet" name="juniversalchardet" rev="${/com.googlecode.juniversalchardet/juniversalchardet}" transitive="false"/>
<dependency org="org.tukaani" name="xz" rev="${/org.tukaani/xz}" transitive="false"/>
+ <dependency org="com.adobe.xmp" name="xmpcore" rev="${/com.adobe.xmp/xmpcore}" transitive="false"/>
+ <dependency org="org.apache.james" name="apache-mime4j-core" rev="${/org.apache.james/apache-mime4j-core}" transitive="false"/>
+ <dependency org="org.apache.james" name="apache-mime4j-dom" rev="${/org.apache.james/apache-mime4j-dom}" transitive="false"/>
+ <dependency org="asm" name="asm" rev="${/asm/asm}" transitive="false"/>
+ <dependency org="com.googlecode.juniversalchardet" name="juniversalchardet" rev="${/com.googlecode.juniversalchardet/juniversalchardet}" transitive="false"/>
+
<!-- Other ExtracingRequestHandler dependencies -->
<dependency org="com.ibm.icu" name="icu4j" rev="${/com.ibm.icu/icu4j}" transitive="false"/>
<dependency org="xerces" name="xercesImpl" rev="${/xerces/xercesImpl}" transitive="false"/>
Modified: lucene/dev/branches/lucene5339/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/SolrContentHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/SolrContentHandlerFactory.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/SolrContentHandlerFactory.java (original)
+++ lucene/dev/branches/lucene5339/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/SolrContentHandlerFactory.java Thu Dec 19 17:48:47 2013
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: lucene/dev/branches/lucene5339/solr/core/ivy.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/core/ivy.xml?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/core/ivy.xml (original)
+++ lucene/dev/branches/lucene5339/solr/core/ivy.xml Thu Dec 19 17:48:47 2013
@@ -35,6 +35,9 @@
<dependency org="commons-lang" name="commons-lang" rev="${/commons-lang/commons-lang}" conf="compile->*"/>
<dependency org="com.google.guava" name="guava" rev="${/com.google.guava/guava}" conf="compile->*"/>
<dependency org="com.spatial4j" name="spatial4j" rev="${/com.spatial4j/spatial4j}" conf="compile->*"/>
+ <dependency org="org.antlr" name="antlr-runtime" rev="${/org.antlr/antlr-runtime}" transitive="false"/>
+ <dependency org="org.ow2.asm" name="asm" rev="${/org.ow2.asm/asm}" transitive="false"/>
+ <dependency org="org.ow2.asm" name="asm-commons" rev="${/org.ow2.asm/asm-commons}" transitive="false"/>
<dependency org="org.restlet.jee" name="org.restlet" rev="${/org.restlet.jee/org.restlet}" conf="compile->*"/>
<dependency org="org.restlet.jee" name="org.restlet.ext.servlet" rev="${/org.restlet.jee/org.restlet.ext.servlet}" conf="compile->*"/>
<dependency org="joda-time" name="joda-time" rev="${/joda-time/joda-time}" conf="compile->*"/>
Modified: lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (original)
+++ lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java Thu Dec 19 17:48:47 2013
@@ -93,6 +93,8 @@ public class JettySolrRunner {
/** Maps servlet holders (i.e. factories: class + init params) to path specs */
private SortedMap<ServletHolder,String> extraServlets = new TreeMap<ServletHolder,String>();
+ private SortedMap<Class,String> extraRequestFilters;
+ private LinkedList<FilterHolder> extraFilters;
private SSLConfig sslConfig;
@@ -167,16 +169,30 @@ public class JettySolrRunner {
public JettySolrRunner(String solrHome, String context, int port,
String solrConfigFilename, String schemaFileName, boolean stopAtShutdown,
SortedMap<ServletHolder,String> extraServlets) {
- if (null != extraServlets) { this.extraServlets.putAll(extraServlets); }
- this.init(solrHome, context, port, stopAtShutdown);
- this.solrConfigFilename = solrConfigFilename;
- this.schemaFilename = schemaFileName;
+ this (solrHome, context, port, solrConfigFilename, schemaFileName,
+ stopAtShutdown, extraServlets, null, null);
}
public JettySolrRunner(String solrHome, String context, int port,
String solrConfigFilename, String schemaFileName, boolean stopAtShutdown,
SortedMap<ServletHolder,String> extraServlets, SSLConfig sslConfig) {
+ this (solrHome, context, port, solrConfigFilename, schemaFileName,
+ stopAtShutdown, extraServlets, sslConfig, null);
+ }
+
+ /**
+ * Constructor taking an ordered list of additional (filter holder -> path spec) mappings.
+ * Filters are placed after the DebugFilter but before the SolrDispatchFilter.
+ */
+ public JettySolrRunner(String solrHome, String context, int port,
+ String solrConfigFilename, String schemaFileName, boolean stopAtShutdown,
+ SortedMap<ServletHolder,String> extraServlets, SSLConfig sslConfig,
+ SortedMap<Class,String> extraRequestFilters) {
if (null != extraServlets) { this.extraServlets.putAll(extraServlets); }
+ if (null != extraRequestFilters) {
+ this.extraRequestFilters = new TreeMap<Class,String>(extraRequestFilters.comparator());
+ this.extraRequestFilters.putAll(extraRequestFilters);
+ }
this.init(solrHome, context, port, stopAtShutdown);
this.solrConfigFilename = solrConfigFilename;
this.schemaFilename = schemaFileName;
@@ -227,6 +243,7 @@ public class JettySolrRunner {
: new SelectChannelConnector();
c.setReuseAddress(true);
c.setLowResourcesMaxIdleTime(1500);
+ c.setSoLingerTime(0);
connector = c;
threadPool = (QueuedThreadPool) c.getThreadPool();
} else if ("Socket".equals(connectorName)) {
@@ -234,6 +251,7 @@ public class JettySolrRunner {
? new SslSocketConnector(sslcontext)
: new SocketConnector();
c.setReuseAddress(true);
+ c.setSoLingerTime(0);
connector = c;
threadPool = (QueuedThreadPool) c.getThreadPool();
} else {
@@ -307,6 +325,13 @@ public class JettySolrRunner {
// SolrDispatchFilter filter = new SolrDispatchFilter();
// FilterHolder fh = new FilterHolder(filter);
debugFilter = root.addFilter(DebugFilter.class, "*", EnumSet.of(DispatcherType.REQUEST) );
+ if (extraRequestFilters != null) {
+ extraFilters = new LinkedList<FilterHolder>();
+ for (Class filterClass : extraRequestFilters.keySet()) {
+ extraFilters.add(root.addFilter(filterClass, extraRequestFilters.get(filterClass),
+ EnumSet.of(DispatcherType.REQUEST)));
+ }
+ }
dispatchFilter = root.addFilter(SolrDispatchFilter.class, "*", EnumSet.of(DispatcherType.REQUEST) );
for (ServletHolder servletHolder : extraServlets.keySet()) {
String pathSpec = extraServlets.get(servletHolder);
@@ -443,6 +468,11 @@ public class JettySolrRunner {
//server.destroy();
if (server.getState().equals(Server.FAILED)) {
filter.destroy();
+ if (extraFilters != null) {
+ for (FilterHolder f : extraFilters) {
+ f.getFilter().destroy();
+ }
+ }
}
server.join();
Modified: lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Thu Dec 19 17:48:47 2013
@@ -115,7 +115,7 @@ public class DistributedQueue {
*
* @return the data at the head of the queue.
*/
- private QueueEvent element() throws NoSuchElementException, KeeperException,
+ private QueueEvent element() throws KeeperException,
InterruptedException {
TreeMap<Long,String> orderedChildren;
@@ -130,9 +130,9 @@ public class DistributedQueue {
try {
orderedChildren = orderedChildren(null);
} catch (KeeperException.NoNodeException e) {
- throw new NoSuchElementException();
+ return null;
}
- if (orderedChildren.size() == 0) throw new NoSuchElementException();
+ if (orderedChildren.size() == 0) return null;
for (String headNode : orderedChildren.values()) {
if (headNode != null) {
@@ -208,7 +208,7 @@ public class DistributedQueue {
@Override
public void process(WatchedEvent event) {
- LOG.info("Watcher fired on path: " + event.getPath() + " state: "
+ LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
+ event.getState() + " type " + event.getType());
synchronized (lock) {
this.event = event;
@@ -322,11 +322,9 @@ public class DistributedQueue {
* @return data at the first element of the queue, or null.
*/
public byte[] peek() throws KeeperException, InterruptedException {
- try {
- return element().getBytes();
- } catch (NoSuchElementException e) {
- return null;
- }
+ QueueEvent element = element();
+ if(element == null) return null;
+ return element.getBytes();
}
public static class QueueEvent {
@@ -384,16 +382,29 @@ public class DistributedQueue {
/**
* Returns the data at the first element of the queue, or null if the queue is
- * empty.
+ * empty and block is false.
*
+ * @param block if true, blocks until an element enters the queue
* @return data at the first element of the queue, or null.
*/
public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
- if (!block) {
+ return peek(block ? Long.MAX_VALUE : 0);
+ }
+
+ /**
+ * Returns the data at the first element of the queue, or null if the queue is
+ * empty after wait ms.
+ *
+ * @param wait max wait time in ms.
+ * @return data at the first element of the queue, or null.
+ */
+ public QueueEvent peek(long wait) throws KeeperException, InterruptedException {
+ if (wait == 0) {
return element();
}
-
+
TreeMap<Long,String> orderedChildren;
+ boolean waitedEnough = false;
while (true) {
LatchChildWatcher childWatcher = new LatchChildWatcher();
try {
@@ -402,11 +413,15 @@ public class DistributedQueue {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
continue;
}
+ if(waitedEnough) {
+ if(orderedChildren.isEmpty()) return null;
+ }
if (orderedChildren.size() == 0) {
- childWatcher.await(DEFAULT_TIMEOUT);
+ childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT: wait);
+ waitedEnough = wait != Long.MAX_VALUE;
continue;
}
-
+
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
Modified: lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Thu Dec 19 17:48:47 2013
@@ -71,6 +71,10 @@ public abstract class ElectionContext {
}
abstract void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException, IOException;
+
+ public void checkIfIamLeaderFired() {}
+
+ public void joinedElectionFired() {}
}
class ShardLeaderElectionContextBase extends ElectionContext {
@@ -114,9 +118,9 @@ class ShardLeaderElectionContextBase ext
final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
private static Logger log = LoggerFactory.getLogger(ShardLeaderElectionContext.class);
- private ZkController zkController;
- private CoreContainer cc;
- private SyncStrategy syncStrategy = new SyncStrategy();
+ private final ZkController zkController;
+ private final CoreContainer cc;
+ private final SyncStrategy syncStrategy;
private volatile boolean isClosed = false;
@@ -127,6 +131,7 @@ final class ShardLeaderElectionContext e
zkController.getZkStateReader());
this.zkController = zkController;
this.cc = cc;
+ syncStrategy = new SyncStrategy(cc.getUpdateShardHandler());
}
@Override
@@ -180,6 +185,17 @@ final class ShardLeaderElectionContext e
// we are going to attempt to be the leader
// first cancel any current recovery
core.getUpdateHandler().getSolrCoreState().cancelRecovery();
+
+ if (weAreReplacement) {
+ // wait a moment for any floating updates to finish
+ try {
+ Thread.sleep(2500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, e);
+ }
+ }
+
boolean success = false;
try {
success = syncStrategy.sync(zkController, core, leaderProps);
@@ -438,4 +454,15 @@ final class OverseerElectionContext exte
overseer.start(id);
}
+ @Override
+ public void joinedElectionFired() {
+ overseer.close();
+ }
+
+ @Override
+ public void checkIfIamLeaderFired() {
+ // leader changed - close the overseer
+ overseer.close();
+ }
+
}
Modified: lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Thu Dec 19 17:48:47 2013
@@ -63,12 +63,20 @@ public class LeaderElector {
protected SolrZkClient zkClient;
private ZkCmdExecutor zkCmdExecutor;
-
+
+ // for tests
+ private volatile ElectionContext context;
+
public LeaderElector(SolrZkClient zkClient) {
this.zkClient = zkClient;
zkCmdExecutor = new ZkCmdExecutor((int) (zkClient.getZkClientTimeout()/1000.0 + 3000));
}
+ // for tests
+ public ElectionContext getContext() {
+ return context;
+ }
+
/**
* Check if the candidate with the given n_* sequence number is the leader.
* If it is, set the leaderId on the leader zk node. If it is not, start
@@ -79,6 +87,7 @@ public class LeaderElector {
*/
private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement) throws KeeperException,
InterruptedException, IOException {
+ context.checkIfIamLeaderFired();
// get all other numbers...
final String holdElectionPath = context.electionPath + ELECTION_NODE;
List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);
@@ -208,6 +217,8 @@ public class LeaderElector {
* @return sequential node number
*/
public int joinElection(ElectionContext context, boolean replacement) throws KeeperException, InterruptedException, IOException {
+ context.joinedElectionFired();
+
final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
long sessionId = zkClient.getSolrZooKeeper().getSessionId();
@@ -273,6 +284,7 @@ public class LeaderElector {
*/
public void setup(final ElectionContext context) throws InterruptedException,
KeeperException {
+ this.context = context;
String electZKPath = context.electionPath + LeaderElector.ELECTION_NODE;
zkCmdExecutor.ensureExists(electZKPath, zkClient);
Modified: lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/Overseer.java Thu Dec 19 17:48:47 2013
@@ -46,6 +46,8 @@ import org.apache.zookeeper.KeeperExcept
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.Collections.singletonMap;
+
/**
* Cluster leader. Responsible node assignments, cluster state file?
*/
@@ -63,6 +65,8 @@ public class Overseer {
static enum LeaderStatus { DONT_KNOW, NO, YES };
+ private long lastUpdatedTime = 0;
+
private class ClusterStateUpdater implements Runnable, ClosableThread {
private final ZkStateReader reader;
@@ -151,33 +155,51 @@ public class Overseer {
break;
}
else if (LeaderStatus.YES != isLeader) {
- log.debug("am_i_leader unclear {}", isLeader);
+ log.debug("am_i_leader unclear {}", isLeader);
continue; // not a no, not a yes, try ask again
}
+ DistributedQueue.QueueEvent head = null;
+ try {
+ head = stateUpdateQueue.peek(true);
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
+ log.warn(
+ "Solr cannot talk to ZK, exiting Overseer main queue loop", e);
+ return;
+ }
+ log.error("Exception in Overseer main queue loop", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+
+ } catch (Exception e) {
+ log.error("Exception in Overseer main queue loop", e);
+ }
synchronized (reader.getUpdateLock()) {
try {
- byte[] head = stateUpdateQueue.peek();
-
- if (head != null) {
- reader.updateClusterState(true);
- ClusterState clusterState = reader.getClusterState();
+ reader.updateClusterState(true);
+ ClusterState clusterState = reader.getClusterState();
+
+ while (head != null) {
+ final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+ final String operation = message.getStr(QUEUE_OPERATION);
+
+ clusterState = processMessage(clusterState, message, operation);
+ workQueue.offer(head.getBytes());
+
+ stateUpdateQueue.poll();
+
+ if (System.currentTimeMillis() - lastUpdatedTime > STATE_UPDATE_DELAY) break;
- while (head != null) {
- final ZkNodeProps message = ZkNodeProps.load(head);
- final String operation = message.getStr(QUEUE_OPERATION);
-
- clusterState = processMessage(clusterState, message, operation);
- workQueue.offer(head);
-
- stateUpdateQueue.poll();
- head = stateUpdateQueue.peek();
- }
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(clusterState), true);
+ // if an event comes in the next 100ms batch it together
+ head = stateUpdateQueue.peek(100);
}
+ lastUpdatedTime = System.currentTimeMillis();
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(clusterState), true);
// clean work queue
- while (workQueue.poll() != null);
-
+ while (workQueue.poll() != null) ;
+
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
@@ -193,11 +215,6 @@ public class Overseer {
}
}
- try {
- Thread.sleep(STATE_UPDATE_DELAY);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
}
}
@@ -449,7 +466,7 @@ public class Overseer {
//request new shardId
if (collectionExists) {
// use existing numShards
- numShards = state.getCollectionStates().get(collection).getSlices().size();
+ numShards = state.getCollection(collection).getSlices().size();
log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards);
}
sliceName = Assign.assignShard(collection, state, numShards);
@@ -596,11 +613,11 @@ public class Overseer {
List<DocRouter.Range> ranges = router.partitionRange(shards.size(), router.fullRange());
- Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>();
+// Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>();
Map<String, Slice> newSlices = new LinkedHashMap<String,Slice>();
- newCollections.putAll(state.getCollectionStates());
+// newCollections.putAll(state.getCollectionStates());
for (int i = 0; i < shards.size(); i++) {
String sliceName = shards.get(i);
/*}
@@ -628,9 +645,10 @@ public class Overseer {
if(message.getStr("fromApi") == null) collectionProps.put("autoCreated","true");
DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
- newCollections.put(collectionName, newCollection);
- ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newCollections);
- return newClusterState;
+// newCollections.put(collectionName, newCollection);
+ return state.copyWith(singletonMap(newCollection.getName(), newCollection));
+// ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newCollections);
+// return newClusterState;
}
/*
@@ -756,6 +774,9 @@ public class Overseer {
newCollections.put(collectionName, newCollection);
return new ClusterState(state.getLiveNodes(), newCollections);
}
+ private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
+ return state.copyWith(colls);
+ }
/*
* Remove collection from cloudstate
@@ -764,11 +785,11 @@ public class Overseer {
final String collection = message.getStr("name");
- final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
- newCollections.remove(collection);
+// final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
+// newCollections.remove(collection);
- ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
- return newState;
+// ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
+ return clusterState.copyWith(singletonMap(collection, (DocCollection)null));
}
/*
@@ -780,16 +801,17 @@ public class Overseer {
log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
- final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
- DocCollection coll = newCollections.get(collection);
+// final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
+ DocCollection coll = clusterState.getCollection(collection);
Map<String, Slice> newSlices = new LinkedHashMap<String, Slice>(coll.getSlicesMap());
newSlices.remove(sliceId);
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
- newCollections.put(newCollection.getName(), newCollection);
+// newCollections.put(newCollection.getName(), newCollection);
+ return newState(clusterState, singletonMap(collection,newCollection));
- return new ClusterState(clusterState.getLiveNodes(), newCollections);
+// return new ClusterState(clusterState.getLiveNodes(), newCollections);
}
/*
@@ -801,8 +823,9 @@ public class Overseer {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
- final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
- DocCollection coll = newCollections.get(collection);
+// final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
+// DocCollection coll = newCollections.get(collection);
+ DocCollection coll = clusterState.getCollectionOrNull(collection) ;
if (coll == null) {
// TODO: log/error that we didn't find it?
// just in case, remove the zk collection node
@@ -851,7 +874,7 @@ public class Overseer {
// if there are no slices left in the collection, remove it?
if (newSlices.size() == 0) {
- newCollections.remove(coll.getName());
+// newCollections.remove(coll.getName());
// TODO: it might be better logically to have this in ZkController
// but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
@@ -864,15 +887,18 @@ public class Overseer {
} catch (KeeperException e) {
SolrException.log(log, "Problem cleaning up collection in zk:" + collection, e);
}
+ return newState(clusterState,singletonMap(collection, (DocCollection) null));
+
} else {
DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
- newCollections.put(newCollection.getName(), newCollection);
+ return newState(clusterState,singletonMap(collection,newCollection));
+// newCollections.put(newCollection.getName(), newCollection);
}
- ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
- return newState;
+// ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
+// return newState;
}
@Override
@@ -935,11 +961,9 @@ public class Overseer {
}
- private OverseerThread ccThread;
+ private volatile OverseerThread ccThread;
- private OverseerThread updaterThread;
-
- private volatile boolean isClosed;
+ private volatile OverseerThread updaterThread;
private ZkStateReader reader;
@@ -954,6 +978,7 @@ public class Overseer {
}
public void start(String id) {
+ close();
log.info("Overseer (id=" + id + ") starting");
createOverseerNode(reader.getZkClient());
//launch cluster state updater thread
@@ -970,8 +995,11 @@ public class Overseer {
ccThread.start();
}
+ public OverseerThread getUpdaterThread() {
+ return updaterThread;
+ }
+
public void close() {
- isClosed = true;
if (updaterThread != null) {
try {
updaterThread.close();
@@ -988,12 +1016,8 @@ public class Overseer {
log.error("Error closing ccThread", t);
}
}
-
- try {
- reader.close();
- } catch (Throwable t) {
- log.error("Error closing zkStateReader", t);
- }
+ updaterThread = null;
+ ccThread = null;
}
/**
Modified: lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Thu Dec 19 17:48:47 2013
@@ -47,13 +47,13 @@ import org.apache.solr.common.params.Cor
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.update.SolrIndexSplitter;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,11 +111,13 @@ public class OverseerCollectionProcessor
public static final String COLL_CONF = "collection.configName";
+ public static final String COLL_PROP_PREFIX = "property.";
public static final Map<String,Object> COLL_PROPS = ZkNodeProps.makeMap(
ROUTER, DocRouter.DEFAULT_NAME,
REPLICATION_FACTOR, "1",
- MAX_SHARDS_PER_NODE, "1");
+ MAX_SHARDS_PER_NODE, "1",
+ "external",null );
// TODO: use from Overseer?
@@ -286,36 +288,34 @@ public class OverseerCollectionProcessor
String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
- //assume the core exists and try to unload it
- if (!Slice.ACTIVE.equals(replica.getStr(Slice.STATE))) {
- deleteCoreNode(collectionName, replicaName, replica, core);
- if(waitForCoreNodeGone(collectionName, shard, replicaName)) return;
- } else {
- Map m = ZkNodeProps.makeMap("qt", adminPath,
- CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString(),
- CoreAdminParams.CORE, core) ;
-
- ShardRequest sreq = new ShardRequest();
- sreq.purpose = 1;
- if (baseUrl.startsWith("http://")) baseUrl = baseUrl.substring(7);
- sreq.shards = new String[]{baseUrl};
- sreq.actualShards = sreq.shards;
- sreq.params = new ModifiableSolrParams(new MapSolrParams(m) );
- try {
- shardHandler.submit(sreq, baseUrl, sreq.params);
- } catch (Exception e) {
- log.info("Exception trying to unload core "+sreq,e);
- }
- if (waitForCoreNodeGone(collectionName, shard, replicaName)) return;//check if the core unload removed the corenode zk enry
- deleteCoreNode(collectionName, replicaName, replica, core); // this could be because the core is gone but not updated in ZK yet (race condition)
- if(waitForCoreNodeGone(collectionName, shard, replicaName)) return;
-
+
+ // assume the core exists and try to unload it
+ Map m = ZkNodeProps.makeMap("qt", adminPath, CoreAdminParams.ACTION,
+ CoreAdminAction.UNLOAD.toString(), CoreAdminParams.CORE, core);
+
+ ShardRequest sreq = new ShardRequest();
+ sreq.purpose = 1;
+ if (baseUrl.startsWith("http://")) baseUrl = baseUrl.substring(7);
+ sreq.shards = new String[] {baseUrl};
+ sreq.actualShards = sreq.shards;
+ sreq.params = new ModifiableSolrParams(new MapSolrParams(m));
+ try {
+ shardHandler.submit(sreq, baseUrl, sreq.params);
+ } catch (Exception e) {
+ log.warn("Exception trying to unload core " + sreq, e);
}
- throw new SolrException(ErrorCode.SERVER_ERROR, "Could not remove replica : "+collectionName+"/"+shard+"/"+replicaName);
+
+ collectShardResponses(!Slice.ACTIVE.equals(replica.getStr(Slice.STATE)) ? new NamedList() : results, false, null);
+
+ if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return;//check if the core unload removed the corenode zk enry
+ deleteCoreNode(collectionName, replicaName, replica, core); // try and ensure core info is removed from clusterstate
+ if(waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return;
+
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not remove replica : " + collectionName + "/" + shard+"/" + replicaName);
}
- private boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName) throws InterruptedException {
- long waitUntil = System.currentTimeMillis() + 30000;
+ private boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
+ long waitUntil = System.currentTimeMillis() + timeoutms;
boolean deleted = false;
while (System.currentTimeMillis() < waitUntil) {
Thread.sleep(100);
@@ -545,6 +545,7 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+ addPropertyParams(message, params);
ShardRequest sreq = new ShardRequest();
params.set("qt", adminPath);
@@ -739,7 +740,7 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.SHARD_RANGE, subRange.toString());
params.set(CoreAdminParams.SHARD_STATE, Slice.CONSTRUCTION);
params.set(CoreAdminParams.SHARD_PARENT, parentSlice.getName());
-
+ addPropertyParams(message, params);
sendShardRequest(nodeName, params);
}
@@ -849,6 +850,7 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.NAME, shardName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
+ addPropertyParams(message, params);
// TODO: Figure the config used by the parent shard and use it.
//params.set("collection.configName", configName);
@@ -1105,7 +1107,7 @@ public class OverseerCollectionProcessor
private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice, DocCollection targetCollection, Slice targetSlice, String splitKey, int timeout, NamedList results) throws KeeperException, InterruptedException {
String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
- if (clusterState.getCollectionStates().containsKey(tempSourceCollectionName)) {
+ if (clusterState.hasCollection(tempSourceCollectionName)) {
log.info("Deleting temporary collection: " + tempSourceCollectionName);
Map<String, Object> props = ZkNodeProps.makeMap(
QUEUE_OPERATION, DELETECOLLECTION,
@@ -1144,7 +1146,7 @@ public class OverseerCollectionProcessor
Overseer.QUEUE_OPERATION, Overseer.ADD_ROUTING_RULE,
COLLECTION_PROP, sourceCollection.getName(),
SHARD_ID_PROP, sourceSlice.getName(),
- "routeKey", splitKey,
+ "routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
"range", splitRange.toString(),
"targetCollection", targetCollection.getName(),
"expireAt", String.valueOf(System.currentTimeMillis() + timeout));
@@ -1160,8 +1162,8 @@ public class OverseerCollectionProcessor
Thread.sleep(100);
Map<String, RoutingRule> rules = zkStateReader.getClusterState().getSlice(sourceCollection.getName(), sourceSlice.getName()).getRoutingRules();
if (rules != null) {
- RoutingRule rule = rules.get(splitKey);
- if (rule.getRouteRanges().contains(splitRange)) {
+ RoutingRule rule = rules.get(SolrIndexSplitter.getRouteKey(splitKey) + "!");
+ if (rule != null && rule.getRouteRanges().contains(splitRange)) {
added = true;
break;
}
@@ -1177,13 +1179,13 @@ public class OverseerCollectionProcessor
Replica sourceLeader = sourceSlice.getLeader();
// create a temporary collection with just one node on the shard leader
- String sourceLeaderUrl = zkStateReader.getZkClient().getBaseUrlForNodeName(sourceLeader.getNodeName());
- if (sourceLeaderUrl.startsWith("http://")) sourceLeaderUrl = sourceLeaderUrl.substring(7);
+ String configName = zkStateReader.readConfigName(sourceCollection.getName());
Map<String, Object> props = ZkNodeProps.makeMap(
QUEUE_OPERATION, CREATECOLLECTION,
"name", tempSourceCollectionName,
REPLICATION_FACTOR, 1,
NUM_SLICES, 1,
+ COLL_CONF, configName,
CREATE_NODE_SET, sourceLeader.getNodeName());
log.info("Creating temporary collection: " + props);
createCollection(clusterState, new ZkNodeProps(props), results);
@@ -1192,6 +1194,23 @@ public class OverseerCollectionProcessor
Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
Replica tempSourceLeader = clusterState.getLeader(tempSourceCollectionName, tempSourceSlice.getName());
+ String tempCollectionReplica1 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica1";
+ String coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
+ zkStateReader.getZkClient().getBaseUrlForNodeName(sourceLeader.getNodeName()), tempCollectionReplica1);
+ // wait for the replicas to be seen as active on temp source leader
+ log.info("Asking source leader to wait for: " + tempCollectionReplica1 + " to be alive on: " + sourceLeader.getNodeName());
+ CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+ cmd.setCoreName(tempCollectionReplica1);
+ cmd.setNodeName(sourceLeader.getNodeName());
+ cmd.setCoreNodeName(coreNodeName);
+ cmd.setState(ZkStateReader.ACTIVE);
+ cmd.setCheckLive(true);
+ cmd.setOnlyIfLeader(true);
+ sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()));
+
+ collectShardResponses(results, true,
+ "MIGRATE failed to create temp collection leader or timed out waiting for it to come up");
+
log.info("Asking source leader to split index");
params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
@@ -1213,11 +1232,11 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.SHARD, tempSourceSlice.getName());
sendShardRequest(targetLeader.getNodeName(), params);
- String coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
+ coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
zkStateReader.getZkClient().getBaseUrlForNodeName(targetLeader.getNodeName()), tempCollectionReplica2);
// wait for the replicas to be seen as active on temp source leader
log.info("Asking temp source leader to wait for: " + tempCollectionReplica2 + " to be alive on: " + targetLeader.getNodeName());
- CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+ cmd = new CoreAdminRequest.WaitForState();
cmd.setCoreName(tempSourceLeader.getStr("core"));
cmd.setNodeName(targetLeader.getNodeName());
cmd.setCoreNodeName(coreNodeName);
@@ -1287,6 +1306,14 @@ public class OverseerCollectionProcessor
shardHandler.submit(sreq, replica, sreq.params);
}
+ private void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) {
+ // Now add the property.key=value pairs
+ for (String key : message.keySet()) {
+ if (key.startsWith(COLL_PROP_PREFIX)) {
+ params.set(key, message.getStr(key));
+ }
+ }
+ }
private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
String collectionName = message.getStr("name");
if (clusterState.getCollections().contains(collectionName)) {
@@ -1317,7 +1344,7 @@ public class OverseerCollectionProcessor
List<String> createNodeList = ((createNodeSetStr = message.getStr(CREATE_NODE_SET)) == null)?null:StrUtils.splitSmart(createNodeSetStr, ",", true);
if (repFactor <= 0) {
- throw new SolrException(ErrorCode.BAD_REQUEST, REPLICATION_FACTOR + " must be greater than or equal to 0");
+ throw new SolrException(ErrorCode.BAD_REQUEST, REPLICATION_FACTOR + " must be greater than 0");
}
if (numSlices <= 0) {
@@ -1366,6 +1393,7 @@ public class OverseerCollectionProcessor
+ ". This requires " + requestedShardsToCreate
+ " shards to be created (higher than the allowed number)");
}
+ String configName = createConfNode(collectionName, message);
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
@@ -1380,8 +1408,6 @@ public class OverseerCollectionProcessor
if (!created)
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully createcollection: " + message.getStr("name"));
-
- String configName = message.getStr(COLL_CONF);
log.info("going to create cores replicas shardNames {} , repFactor : {}", shardNames, repFactor);
for (int i = 1; i <= shardNames.size(); i++) {
String sliceName = shardNames.get(i-1);
@@ -1401,6 +1427,7 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+ addPropertyParams(message, params);
ShardRequest sreq = new ShardRequest();
params.set("qt", adminPath);
@@ -1435,6 +1462,37 @@ public class OverseerCollectionProcessor
}
}
+ private String createConfNode(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
+ String configName = message.getStr(OverseerCollectionProcessor.COLL_CONF);
+ if(configName == null){
+ // if there is only one conf, use that
+ List<String> configNames=null;
+ try {
+ configNames = zkStateReader.getZkClient().getChildren(ZkController.CONFIGS_ZKNODE, null, true);
+ if (configNames != null && configNames.size() == 1) {
+ configName = configNames.get(0);
+ // no config set named, but there is only 1 - use it
+ log.info("Only one config set found in zk - using it:" + configName);
+ }
+ } catch (KeeperException.NoNodeException e) {
+
+ }
+
+ }
+
+ if(configName!= null){
+ log.info("creating collections conf node {} ",ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll);
+ zkStateReader.getZkClient().makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll,
+ ZkStateReader.toJSON(ZkNodeProps.makeMap(ZkController.CONFIGNAME_PROP,configName)),true );
+
+ } else {
+ String msg = "Could not obtain config name";
+ log.warn(msg);
+ }
+ return configName;
+
+ }
+
private void collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params, NamedList results, String stateMatcher) {
log.info("Executing Collection Cmd : " + params);
String collectionName = message.getStr("name");
Modified: lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Thu Dec 19 17:48:47 2013
@@ -19,12 +19,14 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -41,6 +43,7 @@ import org.apache.solr.common.params.Mod
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.RequestHandlers.LazyRequestHandlerWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler;
@@ -161,6 +164,7 @@ public class RecoveryStrategy extends Th
RefCounted<SolrIndexSearcher> searchHolder = core
.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
+ Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
try {
log.debug(core.getCoreDescriptor().getCoreContainer()
.getZkController().getNodeName()
@@ -170,8 +174,12 @@ public class RecoveryStrategy extends Th
+ leaderUrl
+ " gen:"
+ core.getDeletionPolicy().getLatestCommit().getGeneration()
- + " data:" + core.getDataDir());
+ + " data:" + core.getDataDir()
+ + " index:" + core.getIndexDir()
+ + " newIndex:" + core.getNewIndexDir()
+ + " files:" + Arrays.asList(dir.listAll()));
} finally {
+ core.getDirectoryFactory().release(dir);
searchHolder.decref();
}
} catch (Exception e) {
@@ -186,7 +194,6 @@ public class RecoveryStrategy extends Th
HttpSolrServer server = new HttpSolrServer(leaderUrl);
try {
server.setConnectionTimeout(30000);
- server.setSoTimeout(60000);
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams());
ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
@@ -202,8 +209,7 @@ public class RecoveryStrategy extends Th
throws SolrServerException, IOException {
HttpSolrServer server = new HttpSolrServer(leaderBaseUrl);
try {
- server.setConnectionTimeout(45000);
- server.setSoTimeout(120000);
+ server.setConnectionTimeout(30000);
WaitForState prepCmd = new WaitForState();
prepCmd.setCoreName(leaderCoreName);
prepCmd.setNodeName(zkController.getNodeName());
Modified: lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1552377&r1=1552376&r2=1552377&view=diff
==============================================================================
--- lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/branches/lucene5339/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Thu Dec 19 17:48:47 2013
@@ -20,22 +20,19 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
@@ -43,8 +40,12 @@ import org.apache.solr.handler.component
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.request.SolrRequestInfo;
+import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.PeerSync;
-import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.update.UpdateShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,27 +55,18 @@ public class SyncStrategy {
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
private final ShardHandler shardHandler;
-
- private ThreadPoolExecutor recoveryCmdExecutor = new ThreadPoolExecutor(
- 0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(), new DefaultSolrThreadFactory(
- "recoveryCmdExecutor"));
private volatile boolean isClosed;
private final HttpClient client;
- {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
- params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
- params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 15000);
- params.set(HttpClientUtil.PROP_SO_TIMEOUT, 60000);
- params.set(HttpClientUtil.PROP_USE_RETRY, false);
- client = HttpClientUtil.createClient(params);
- }
+
+ private final ExecutorService updateExecutor;
- public SyncStrategy() {
+ public SyncStrategy(UpdateShardHandler updateShardHandler) {
+ client = updateShardHandler.getHttpClient();
+
shardHandler = new HttpShardHandlerFactory().getShardHandler(client);
+ updateExecutor = updateShardHandler.getUpdateExecutor();
}
private static class ShardCoreRequest extends ShardRequest {
@@ -87,17 +79,26 @@ public class SyncStrategy {
if (SKIP_AUTO_RECOVERY) {
return true;
}
- if (isClosed) {
- log.warn("Closed, skipping sync up.");
- return false;
- }
- log.info("Sync replicas to " + ZkCoreNodeProps.getCoreUrl(leaderProps));
+ boolean success;
+ SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
+ SolrQueryResponse rsp = new SolrQueryResponse();
+ SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
+ try {
+ if (isClosed) {
+ log.warn("Closed, skipping sync up.");
+ return false;
+ }
+ log.info("Sync replicas to " + ZkCoreNodeProps.getCoreUrl(leaderProps));
+
+ if (core.getUpdateHandler().getUpdateLog() == null) {
+ log.error("No UpdateLog found - cannot sync");
+ return false;
+ }
- if (core.getUpdateHandler().getUpdateLog() == null) {
- log.error("No UpdateLog found - cannot sync");
- return false;
+ success = syncReplicas(zkController, core, leaderProps);
+ } finally {
+ SolrRequestInfo.clearRequestInfo();
}
- boolean success = syncReplicas(zkController, core, leaderProps);
return success;
}
@@ -257,16 +258,6 @@ public class SyncStrategy {
public void close() {
this.isClosed = true;
- try {
- client.getConnectionManager().shutdown();
- } catch (Throwable e) {
- SolrException.log(log, e);
- }
- try {
- ExecutorUtil.shutdownNowAndAwaitTermination(recoveryCmdExecutor);
- } catch (Throwable e) {
- SolrException.log(log, e);
- }
}
private void requestRecovery(final ZkNodeProps leaderProps, final String baseUrl, final String coreName) throws SolrServerException, IOException {
@@ -282,8 +273,8 @@ public class SyncStrategy {
HttpSolrServer server = new HttpSolrServer(baseUrl, client);
try {
- server.setConnectionTimeout(15000);
- server.setSoTimeout(60000);
+ server.setConnectionTimeout(30000);
+ server.setSoTimeout(120000);
server.request(recoverRequestCmd);
} catch (Throwable t) {
SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", t);
@@ -292,7 +283,7 @@ public class SyncStrategy {
}
}
};
- recoveryCmdExecutor.execute(thread);
+ updateExecutor.execute(thread);
}
public static ModifiableSolrParams params(String... params) {