You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/08/13 15:53:27 UTC
svn commit: r1372423 [40/45] - in /lucene/dev/branches/LUCENE-2878: ./
dev-tools/ dev-tools/eclipse/ dev-tools/idea/.idea/libraries/
dev-tools/maven/ dev-tools/maven/lucene/
dev-tools/maven/lucene/analysis/common/
dev-tools/maven/lucene/analysis/icu/ d...
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java Mon Aug 13 13:52:46 2012
@@ -22,6 +22,8 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.util.SystemIdResolver;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.XMLErrorLogger;
import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.solr.handler.dataimport.config.ConfigParseUtil;
@@ -41,9 +43,12 @@ import org.apache.commons.io.IOUtils;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
+
+import java.io.IOException;
import java.io.StringReader;
import java.text.SimpleDateFormat;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -67,14 +72,14 @@ public class DataImporter {
private DIHConfiguration config;
private Date indexStartTime;
private Properties store = new Properties();
- private Map<String, Properties> dataSourceProps = new HashMap<String, Properties>();
+ private Map<String, Map<String,String>> requestLevelDataSourceProps = new HashMap<String, Map<String,String>>();
private IndexSchema schema;
public DocBuilder docBuilder;
public DocBuilder.Statistics cumulativeStatistics = new DocBuilder.Statistics();
private SolrCore core;
+ private Map<String, Object> coreScopeSession = new ConcurrentHashMap<String,Object>();
private DIHPropertiesWriter propWriter;
private ReentrantLock importLock = new ReentrantLock();
- private final Map<String , Object> coreScopeSession;
private boolean isDeltaImportSupported = false;
private final String handlerName;
private Map<String, SchemaField> lowerNameVsSchemaField = new HashMap<String, SchemaField>();
@@ -83,12 +88,19 @@ public class DataImporter {
* Only for testing purposes
*/
DataImporter() {
- coreScopeSession = new HashMap<String, Object>();
createPropertyWriter();
propWriter.init(this);
this.handlerName = "dataimport" ;
}
-
+
+ DataImporter(SolrCore core, String handlerName) {
+ this.handlerName = handlerName;
+ this.core = core;
+ this.schema = core.getSchema();
+ loadSchemaFieldMap();
+ createPropertyWriter();
+ }
+
private void createPropertyWriter() {
if (this.core == null
|| !this.core.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
@@ -99,27 +111,58 @@ public class DataImporter {
propWriter.init(this);
}
- DataImporter(InputSource dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session, String handlerName) {
- this.handlerName = handlerName;
- if (dataConfig == null) {
- throw new DataImportHandlerException(SEVERE, "Configuration not found");
- }
- this.core = core;
- this.schema = core.getSchema();
- loadSchemaFieldMap();
- createPropertyWriter();
-
- dataSourceProps = ds;
- if (session == null)
- session = new HashMap<String, Object>();
- coreScopeSession = session;
- loadDataConfig(dataConfig);
-
- for (Entity e : config.getEntities()) {
- if (e.getAllAttributes().containsKey(SqlEntityProcessor.DELTA_QUERY)) {
- isDeltaImportSupported = true;
- break;
+
+ boolean maybeReloadConfiguration(RequestInfo params,
+ NamedList<?> defaultParams) throws IOException {
+ if (importLock.tryLock()) {
+ boolean success = false;
+ try {
+ String dataConfigText = params.getDataConfig();
+ String dataconfigFile = (String) params.getConfigFile();
+ InputSource is = null;
+ if(dataConfigText!=null && dataConfigText.length()>0) {
+ is = new InputSource(new StringReader(dataConfigText));
+ } else if(dataconfigFile!=null) {
+ is = new InputSource(core.getResourceLoader().openResource(dataconfigFile));
+ is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(dataconfigFile));
+ LOG.info("Loading DIH Configuration: " + dataconfigFile);
+ }
+ if(is!=null) {
+ loadDataConfig(is);
+ success = true;
+ }
+
+ Map<String,Map<String,String>> dsProps = new HashMap<String,Map<String,String>>();
+ if(defaultParams!=null) {
+ int position = 0;
+ while (position < defaultParams.size()) {
+ if (defaultParams.getName(position) == null) {
+ break;
+ }
+ String name = defaultParams.getName(position);
+ if (name.equals("datasource")) {
+ success = true;
+ NamedList dsConfig = (NamedList) defaultParams.getVal(position);
+ LOG.info("Getting configuration for Global Datasource...");
+ Map<String,String> props = new HashMap<String,String>();
+ for (int i = 0; i < dsConfig.size(); i++) {
+ props.put(dsConfig.getName(i), dsConfig.getVal(i).toString());
+ }
+ LOG.info("Adding properties to datasource: " + props);
+ dsProps.put((String) dsConfig.get("name"), props);
+ }
+ position++;
+ }
+ }
+ requestLevelDataSourceProps = Collections.unmodifiableMap(dsProps);
+ } catch(IOException ioe) {
+ throw ioe;
+ } finally {
+ importLock.unlock();
}
+ return success;
+ } else {
+ return false;
}
}
@@ -188,7 +231,13 @@ public class DataImporter {
LOG.info("Data Configuration loaded successfully");
} catch (Exception e) {
throw new DataImportHandlerException(SEVERE,
- "Exception occurred while initializing context", e);
+ "Data Config problem: " + e.getMessage(), e);
+ }
+ for (Entity e : config.getEntities()) {
+ if (e.getAllAttributes().containsKey(SqlEntityProcessor.DELTA_QUERY)) {
+ isDeltaImportSupported = true;
+ break;
+ }
}
}
@@ -196,7 +245,7 @@ public class DataImporter {
DIHConfiguration config;
List<Map<String, String >> functions = new ArrayList<Map<String ,String>>();
Script script = null;
- Map<String, Properties> dataSources = new HashMap<String, Properties>();
+ Map<String, Map<String,String>> dataSources = new HashMap<String, Map<String,String>>();
NodeList dataConfigTags = xmlDocument.getElementsByTagName("dataConfig");
if(dataConfigTags == null || dataConfigTags.getLength() == 0) {
@@ -232,16 +281,16 @@ public class DataImporter {
List<Element> dataSourceTags = ConfigParseUtil.getChildNodes(e, DATA_SRC);
if (!dataSourceTags.isEmpty()) {
for (Element element : dataSourceTags) {
- Properties p = new Properties();
+ Map<String,String> p = new HashMap<String,String>();
HashMap<String, String> attrs = ConfigParseUtil.getAllAttributes(element);
for (Map.Entry<String, String> entry : attrs.entrySet()) {
- p.setProperty(entry.getKey(), entry.getValue());
+ p.put(entry.getKey(), entry.getValue());
}
- dataSources.put(p.getProperty("name"), p);
+ dataSources.put(p.get("name"), p);
}
}
if(dataSources.get(null) == null){
- for (Properties properties : dataSources.values()) {
+ for (Map<String,String> properties : dataSources.values()) {
dataSources.put(null,properties);
break;
}
@@ -270,17 +319,17 @@ public class DataImporter {
}
DataSource getDataSourceInstance(Entity key, String name, Context ctx) {
- Properties p = dataSourceProps.get(name);
+ Map<String,String> p = requestLevelDataSourceProps.get(name);
if (p == null)
p = config.getDataSources().get(name);
if (p == null)
- p = dataSourceProps.get(null);// for default data source
+ p = requestLevelDataSourceProps.get(null);// for default data source
if (p == null)
p = config.getDataSources().get(null);
if (p == null)
throw new DataImportHandlerException(SEVERE,
"No dataSource :" + name + " available for entity :" + key.getName());
- String type = p.getProperty(TYPE);
+ String type = p.get(TYPE);
DataSource dataSrc = null;
if (type == null) {
dataSrc = new JdbcDataSource();
@@ -458,6 +507,8 @@ public class DataImporter {
public static final String DEBUG_NOT_ENABLED = "Debug not enabled. Add a tag <str name=\"enableDebug\">true</str> in solrconfig.xml";
public static final String CONFIG_RELOADED = "Configuration Re-loaded sucessfully";
+
+ public static final String CONFIG_NOT_RELOADED = "Configuration NOT Re-loaded...Data Importer is busy.";
public static final String TOTAL_DOC_PROCESSED = "Total Documents Processed";
@@ -476,13 +527,16 @@ public class DataImporter {
return schema;
}
- Map<String, Object> getCoreScopeSession() {
- return coreScopeSession;
- }
-
SolrCore getCore() {
return core;
}
+
+ void putToCoreScopeSession(String key, Object val) {
+ coreScopeSession.put(key, val);
+ }
+ Object getFromCoreScopeSession(String key) {
+ return coreScopeSession.get(key);
+ }
public static final String COLUMN = "column";
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/RequestInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/RequestInfo.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/RequestInfo.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/RequestInfo.java Mon Aug 13 13:52:46 2012
@@ -36,6 +36,7 @@ public class RequestInfo {
private final boolean clean;
private final List<String> entitiesToRun;
private final Map<String,Object> rawParams;
+ private final String configFile;
private final String dataConfig;
//TODO: find a different home for these two...
@@ -98,7 +99,8 @@ public class RequestInfo {
} else {
entitiesToRun = null;
}
-
+ String configFileParam = (String) requestParams.get("config");
+ configFile = configFileParam;
String dataConfigParam = (String) requestParams.get("dataConfig");
if (dataConfigParam != null && dataConfigParam.trim().length() == 0) {
// Empty data-config param is not valid, change it to null
@@ -161,4 +163,8 @@ public class RequestInfo {
public DebugInfo getDebugInfo() {
return debugInfo;
}
+
+ public String getConfigFile() {
+ return configFile;
+ }
}
\ No newline at end of file
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java Mon Aug 13 13:52:46 2012
@@ -4,7 +4,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import org.apache.solr.handler.dataimport.DataImporter;
import org.w3c.dom.Element;
@@ -49,8 +48,8 @@ public class DIHConfiguration {
private final String onImportEnd;
private final List<Map<String, String>> functions;
private final Script script;
- private final Map<String, Properties> dataSources;
- public DIHConfiguration(Element element, DataImporter di, List<Map<String, String>> functions, Script script, Map<String, Properties> dataSources) {
+ private final Map<String, Map<String,String>> dataSources;
+ public DIHConfiguration(Element element, DataImporter di, List<Map<String, String>> functions, Script script, Map<String, Map<String,String>> dataSources) {
this.deleteQuery = ConfigParseUtil.getStringAttribute(element, "deleteQuery", null);
this.onImportStart = ConfigParseUtil.getStringAttribute(element, "onImportStart", null);
this.onImportEnd = ConfigParseUtil.getStringAttribute(element, "onImportEnd", null);
@@ -90,7 +89,7 @@ public class DIHConfiguration {
public List<Map<String,String>> getFunctions() {
return functions;
}
- public Map<String,Properties> getDataSources() {
+ public Map<String,Map<String,String>> getDataSources() {
return dataSources;
}
public Script getScript() {
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test-files/dih/solr/collection1/conf/dataimport-solrconfig-end-to-end.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test-files/dih/solr/collection1/conf/dataimport-solrconfig-end-to-end.xml?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test-files/dih/solr/collection1/conf/dataimport-solrconfig-end-to-end.xml (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test-files/dih/solr/collection1/conf/dataimport-solrconfig-end-to-end.xml Mon Aug 13 13:52:46 2012
@@ -31,11 +31,7 @@
<str name="echoParams">explicit</str>
</lst>
</requestHandler>
- <requestHandler name="/dataimport-end-to-end" class="org.apache.solr.handler.dataimport.DataImportHandler">
- <lst name="defaults">
- <str name="config">data-config-end-to-end.xml</str>
- </lst>
- </requestHandler>
+ <requestHandler name="/dataimport-end-to-end" class="org.apache.solr.handler.dataimport.DataImportHandler" />
<requestHandler name="/search" class="org.apache.solr.handler.component.SearchHandler">
<lst name="defaults">
<str name="echoParams">explicit</str>
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDIHEndToEnd.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDIHEndToEnd.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDIHEndToEnd.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDIHEndToEnd.java Mon Aug 13 13:52:46 2012
@@ -31,7 +31,8 @@ public class TestDIHEndToEnd extends Abs
}
@Test
public void testEndToEnd() throws Exception {
- LocalSolrQueryRequest request = lrf.makeRequest("command", "full-import",
+ LocalSolrQueryRequest request = lrf.makeRequest(
+ "command", "full-import", "config", "data-config-end-to-end.xml",
"clean", "true", "commit", "true", "synchronous", "true", "indent", "true");
h.query("/dataimport-end-to-end", request);
assertQ(req("*:*"), "//*[@numFound='20']");
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java Mon Aug 13 13:52:46 2012
@@ -17,6 +17,7 @@
package org.apache.solr.handler.dataimport;
import org.junit.Test;
+import org.junit.Ignore;
import java.util.*;
@@ -28,6 +29,7 @@ import java.util.*;
*
* @since solr 1.3
*/
+@Ignore("FIXME: I fail so often it makes me ill!")
public class TestSqlEntityProcessor extends AbstractDataImportHandlerTestCase {
private static ThreadLocal<Integer> local = new ThreadLocal<Integer>();
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java Mon Aug 13 13:52:46 2012
@@ -35,6 +35,7 @@ import java.text.ParseException;
*
* @since solr 1.3
*/
+@Ignore("FIXME: I fail so often it makes me ill!")
public class TestSqlEntityProcessor2 extends AbstractDataImportHandlerTestCase {
@BeforeClass
public static void beforeClass() throws Exception {
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java Mon Aug 13 13:52:46 2012
@@ -19,6 +19,7 @@ package org.apache.solr.handler.dataimpo
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
@@ -35,6 +36,7 @@ import java.util.List;
*
* @since solr 1.3
*/
+@Ignore("FIXME: I fail so often it makes me ill!")
public class TestSqlEntityProcessorDelta extends AbstractDataImportHandlerTestCase {
private static final String FULLIMPORT_QUERY = "select * from x";
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta2.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta2.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta2.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta2.java Mon Aug 13 13:52:46 2012
@@ -18,6 +18,7 @@ package org.apache.solr.handler.dataimpo
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
@@ -33,6 +34,7 @@ import java.util.List;
*
* @since solr 1.3
*/
+@Ignore("FIXME: I fail so often it makes me ill!")
public class TestSqlEntityProcessorDelta2 extends AbstractDataImportHandlerTestCase {
private static final String FULLIMPORT_QUERY = "select * from x";
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta3.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta3.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta3.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta3.java Mon Aug 13 13:52:46 2012
@@ -18,12 +18,14 @@ package org.apache.solr.handler.dataimpo
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+@Ignore("FIXME: I fail so often it makes me ill!")
public class TestSqlEntityProcessorDelta3 extends AbstractDataImportHandlerTestCase {
private static final String P_FULLIMPORT_QUERY = "select * from parent";
private static final String P_DELTA_QUERY = "select parent_id from parent where last_modified > NOW";
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDeltaPrefixedPk.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDeltaPrefixedPk.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDeltaPrefixedPk.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDeltaPrefixedPk.java Mon Aug 13 13:52:46 2012
@@ -18,6 +18,7 @@ package org.apache.solr.handler.dataimpo
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.logging.*;
*
* @since solr 3.1
*/
+@Ignore("FIXME: I fail so often it makes me ill!")
public class TestSqlEntityProcessorDeltaPrefixedPk extends AbstractDataImportHandlerTestCase {
private static final String FULLIMPORT_QUERY = "select * from x";
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/extraction/ivy.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/extraction/ivy.xml?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/extraction/ivy.xml (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/extraction/ivy.xml Mon Aug 13 13:52:46 2012
@@ -43,7 +43,6 @@
<dependency org="org.apache.xmlbeans" name="xmlbeans" rev="2.3.0" transitive="false"/>
<dependency org="dom4j" name="dom4j" rev="1.6.1" transitive="false"/>
<dependency org="org.ccil.cowan.tagsoup" name="tagsoup" rev="1.2.1" transitive="false"/>
- <dependency org="asm" name="asm" rev="3.1" transitive="false"/>
<dependency org="com.googlecode.mp4parser" name="isoparser" rev="1.0-beta-5" transitive="false"/>
<dependency org="net.sf.scannotation" name="scannotation" rev="1.0.2" transitive="false"/>
<dependency org="javassist" name="javassist" rev="3.6.0.GA" transitive="false"/>
@@ -52,8 +51,8 @@
<dependency org="rome" name="rome" rev="0.9" transitive="false"/>
<dependency org="jdom" name="jdom" rev="1.0" transitive="false"/>
<!-- Other ExtracingRequestHandler dependencies -->
- <dependency org="com.ibm.icu" name="icu4j" rev="4.8.1.1" transitive="false"/>
- <dependency org="xerces" name="xercesImpl" rev="2.8.1" transitive="false"/>
+ <dependency org="com.ibm.icu" name="icu4j" rev="49.1" transitive="false"/>
+ <dependency org="xerces" name="xercesImpl" rev="2.9.1" transitive="false"/>
<exclude org="*" ext="*" matcher="regexp" type="${ivy.exclude.types}"/>
</dependencies>
</ivy-module>
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/java/org/apache/solr/update/processor/LanguageIdentifierUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/java/org/apache/solr/update/processor/LanguageIdentifierUpdateProcessor.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/java/org/apache/solr/update/processor/LanguageIdentifierUpdateProcessor.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/java/org/apache/solr/update/processor/LanguageIdentifierUpdateProcessor.java Mon Aug 13 13:52:46 2012
@@ -25,6 +25,7 @@ import org.apache.solr.common.params.Sol
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,7 +98,8 @@ public abstract class LanguageIdentifier
}
langField = params.get(LANG_FIELD, DOCID_LANGFIELD_DEFAULT);
langsField = params.get(LANGS_FIELD, DOCID_LANGSFIELD_DEFAULT);
- docIdField = params.get(DOCID_PARAM, DOCID_FIELD_DEFAULT);
+ SchemaField uniqueKeyField = schema.getUniqueKeyField();
+ docIdField = params.get(DOCID_PARAM, uniqueKeyField == null ? DOCID_FIELD_DEFAULT : uniqueKeyField.getName());
fallbackValue = params.get(FALLBACK);
if(params.get(FALLBACK_FIELDS, "").length() > 0) {
fallbackFields = params.get(FALLBACK_FIELDS).split(",");
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/test/org/apache/solr/update/processor/LangDetectLanguageIdentifierUpdateProcessorFactoryTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/test/org/apache/solr/update/processor/LangDetectLanguageIdentifierUpdateProcessorFactoryTest.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/test/org/apache/solr/update/processor/LangDetectLanguageIdentifierUpdateProcessorFactoryTest.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/test/org/apache/solr/update/processor/LangDetectLanguageIdentifierUpdateProcessorFactoryTest.java Mon Aug 13 13:52:46 2012
@@ -24,7 +24,7 @@ import org.junit.Test;
public class LangDetectLanguageIdentifierUpdateProcessorFactoryTest extends LanguageIdentifierUpdateProcessorFactoryTestCase {
@Override
protected LanguageIdentifierUpdateProcessor createLangIdProcessor(ModifiableSolrParams parameters) throws Exception {
- return new LangDetectLanguageIdentifierUpdateProcessor(_parser.buildRequestFrom(null, parameters, null), resp, null);
+ return new LangDetectLanguageIdentifierUpdateProcessor(_parser.buildRequestFrom(h.getCore(), parameters, null), resp, null);
}
// this one actually works better it seems with short docs
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/test/org/apache/solr/update/processor/TikaLanguageIdentifierUpdateProcessorFactoryTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/test/org/apache/solr/update/processor/TikaLanguageIdentifierUpdateProcessorFactoryTest.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/test/org/apache/solr/update/processor/TikaLanguageIdentifierUpdateProcessorFactoryTest.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/langid/src/test/org/apache/solr/update/processor/TikaLanguageIdentifierUpdateProcessorFactoryTest.java Mon Aug 13 13:52:46 2012
@@ -22,6 +22,6 @@ import org.apache.solr.common.params.Mod
public class TikaLanguageIdentifierUpdateProcessorFactoryTest extends LanguageIdentifierUpdateProcessorFactoryTestCase {
@Override
protected LanguageIdentifierUpdateProcessor createLangIdProcessor(ModifiableSolrParams parameters) throws Exception {
- return new TikaLanguageIdentifierUpdateProcessor(_parser.buildRequestFrom(null, parameters, null), resp, null);
+ return new TikaLanguageIdentifierUpdateProcessor(_parser.buildRequestFrom(h.getCore(), parameters, null), resp, null);
}
}
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/uima/README.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/uima/README.txt?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/uima/README.txt (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/uima/README.txt Mon Aug 13 13:52:46 2012
@@ -1,3 +1,15 @@
+Apache Solr UIMA Metadata Extraction Library
+
+Introduction
+------------
+This module is intended to be used both as an UpdateRequestProcessor while indexing documents and as a set of tokenizers/filters
+to be configured inside schema.xml for use during the analysis phase.
+The purpose of UIMAUpdateRequestProcessor is to add automatically generated fields to the Solr index on the fly.
+Such fields could be language, concepts, keywords, sentences, named entities, etc.
+UIMA-based tokenizers/filters can be used either inside plain Lucene or as index/query analyzers defined
+inside the schema.xml of a Solr core to create/filter tokens using specific UIMA annotations.
+
+
Getting Started
---------------
To start using Solr UIMA Metadata Extraction Library you should go through the following configuration steps:
@@ -6,6 +18,7 @@ To start using Solr UIMA Metadata Extrac
or set <lib/> tags in solrconfig.xml appropriately to point to those jar files.
<lib dir="../../contrib/uima/lib" />
+ <lib dir="../../contrib/uima/lucene-libs" />
<lib dir="../../dist/" regex="apache-solr-uima-\d.*\.jar" />
2. modify your schema.xml, adding the fields you want to hold metadata and specifying proper values for the type, indexed, stored and multiValued options:
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml Mon Aug 13 13:52:46 2012
@@ -299,14 +299,14 @@
<fieldType name="uima_sentences" class="solr.TextField" positionIncrementGap="100">
<analyzer>
- <tokenizer class="org.apache.solr.uima.analysis.UIMAAnnotationsTokenizerFactory"
+ <tokenizer class="solr.UIMAAnnotationsTokenizerFactory"
descriptorPath="/uima/AggregateSentenceAE.xml" tokenType="org.apache.uima.SentenceAnnotation"/>
</analyzer>
</fieldType>
<fieldType name="uima_nouns" class="solr.TextField" positionIncrementGap="100">
<analyzer>
- <tokenizer class="org.apache.solr.uima.analysis.UIMATypeAwareAnnotationsTokenizerFactory"
+ <tokenizer class="solr.UIMATypeAwareAnnotationsTokenizerFactory"
descriptorPath="/uima/AggregateSentenceAE.xml" tokenType="org.apache.uima.TokenAnnotation"
featurePath="posTag"/>
<filter class="solr.TypeTokenFilterFactory" types="uima/stoptypes.txt" />
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/SolrVelocityResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/SolrVelocityResourceLoader.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/SolrVelocityResourceLoader.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/SolrVelocityResourceLoader.java Mon Aug 13 13:52:46 2012
@@ -22,9 +22,12 @@ import org.apache.velocity.exception.Res
import org.apache.commons.collections.ExtendedProperties;
import org.apache.solr.core.SolrResourceLoader;
+import java.io.IOException;
import java.io.InputStream;
-// TODO: the name of this class seems ridiculous
+/**
+ * Velocity resource loader wrapper around Solr resource loader
+ */
public class SolrVelocityResourceLoader extends ResourceLoader {
private SolrResourceLoader loader;
@@ -39,7 +42,11 @@ public class SolrVelocityResourceLoader
@Override
public InputStream getResourceStream(String template_name) throws ResourceNotFoundException {
- return loader.openResource(template_name);
+ try {
+ return loader.openResource("velocity/" + template_name);
+ } catch (IOException ioe) {
+ throw new ResourceNotFoundException(ioe);
+ }
}
@Override
Modified: lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/VelocityResponseWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/VelocityResponseWriter.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/VelocityResponseWriter.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/contrib/velocity/src/java/org/apache/solr/response/VelocityResponseWriter.java Mon Aug 13 13:52:46 2012
@@ -20,6 +20,7 @@ package org.apache.solr.response;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.SolrResponseBase;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.velocity.Template;
@@ -29,6 +30,8 @@ import org.apache.velocity.runtime.Runti
import org.apache.velocity.tools.generic.*;
import java.io.*;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
public class VelocityResponseWriter implements QueryResponseWriter {
@@ -80,6 +83,64 @@ public class VelocityResponseWriter impl
context.put("engine", engine); // for $engine.resourceExists(...)
+ // MIME type to extension map for detecting the file type and showing an icon
+ // The list of types matches the icons in /solr/img/filetypes
+ Map<String, String> mimeToExt = new HashMap<String, String>() {{
+ put("application/x-7z-compressed", "7z");
+ put("application/postscript", "ai");
+ put("application/pgp-signature", "asc");
+ put("application/octet-stream", "bin");
+ put("application/x-bzip2", "bz2");
+ put("text/x-c", "c");
+ put("application/vnd.ms-htmlhelp", "chm");
+ put("application/java-vm", "class");
+ put("text/css", "css");
+ put("text/csv", "csv");
+ put("application/x-debian-package", "deb");
+ put("application/msword", "doc");
+ put("message/rfc822", "eml");
+ put("image/gif", "gif");
+ put("application/winhlp", "hlp");
+ put("text/html", "html");
+ put("application/java-archive", "jar");
+ put("text/x-java-source", "java");
+ put("image/jpeg", "jpeg");
+ put("application/javascript", "js");
+ put("application/vnd.oasis.opendocument.chart", "odc");
+ put("application/vnd.oasis.opendocument.formula", "odf");
+ put("application/vnd.oasis.opendocument.graphics", "odg");
+ put("application/vnd.oasis.opendocument.image", "odi");
+ put("application/vnd.oasis.opendocument.presentation", "odp");
+ put("application/vnd.oasis.opendocument.spreadsheet", "ods");
+ put("application/vnd.oasis.opendocument.text", "odt");
+ put("application/pdf", "pdf");
+ put("application/pgp-encrypted", "pgp");
+ put("image/png", "png");
+ put("application/vnd.ms-powerpoint", "ppt");
+ put("audio/x-pn-realaudio", "ram");
+ put("application/x-rar-compressed", "rar");
+ put("application/vnd.rn-realmedia", "rm");
+ put("application/rtf", "rtf");
+ put("application/x-shockwave-flash", "swf");
+ put("application/vnd.sun.xml.calc", "sxc");
+ put("application/vnd.sun.xml.draw", "sxd");
+ put("application/vnd.sun.xml.impress", "sxi");
+ put("application/vnd.sun.xml.writer", "sxw");
+ put("application/x-tar", "tar");
+ put("application/x-tex", "tex");
+ put("text/plain", "txt");
+ put("text/x-vcard", "vcf");
+ put("application/vnd.visio", "vsd");
+ put("audio/x-wav", "wav");
+ put("audio/x-ms-wma", "wma");
+ put("video/x-ms-wmv", "wmv");
+ put("application/vnd.ms-excel", "xls");
+ put("application/xml", "xml");
+ put("application/x-xpinstall", "xpi");
+ put("application/zip", "zip");
+ }};
+ context.put("mimeToExt", mimeToExt);
+
String layout_template = request.getParams().get("v.layout");
String json_wrapper = request.getParams().get("v.json");
boolean wrap_response = (layout_template != null) || (json_wrapper != null);
@@ -113,19 +174,32 @@ public class VelocityResponseWriter impl
private VelocityEngine getEngine(SolrQueryRequest request) {
VelocityEngine engine = new VelocityEngine();
- String template_root = request.getParams().get("v.base_dir");
- File baseDir = new File(request.getCore().getResourceLoader().getConfigDir(), "velocity");
- if (template_root != null) {
- baseDir = new File(template_root);
- }
- engine.setProperty(RuntimeConstants.FILE_RESOURCE_LOADER_PATH, baseDir.getAbsolutePath());
+
engine.setProperty("params.resource.loader.instance", new SolrParamResourceLoader(request));
SolrVelocityResourceLoader resourceLoader =
new SolrVelocityResourceLoader(request.getCore().getSolrConfig().getResourceLoader());
engine.setProperty("solr.resource.loader.instance", resourceLoader);
+ File fileResourceLoaderBaseDir = null;
+ try {
+ String template_root = request.getParams().get("v.base_dir");
+ fileResourceLoaderBaseDir = new File(request.getCore().getResourceLoader().getConfigDir(), "velocity");
+ if (template_root != null) {
+ fileResourceLoaderBaseDir = new File(template_root);
+ }
+ } catch (SolrException e) {
+ // no worries... probably in ZooKeeper mode and getConfigDir() isn't available, so we'll just omit
+ // the file system resource loader
+ }
+
+ if (fileResourceLoaderBaseDir != null) {
+ engine.setProperty(RuntimeConstants.FILE_RESOURCE_LOADER_PATH, fileResourceLoaderBaseDir.getAbsolutePath());
+ engine.setProperty(RuntimeConstants.RESOURCE_LOADER, "params,file,solr");
+ } else {
+ engine.setProperty(RuntimeConstants.RESOURCE_LOADER, "params,solr");
+ }
+
// TODO: Externalize Velocity properties
- engine.setProperty(RuntimeConstants.RESOURCE_LOADER, "params,file,solr");
String propFile = request.getParams().get("v.properties");
try {
if (propFile == null)
Modified: lucene/dev/branches/LUCENE-2878/solr/core/build.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/build.xml?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/build.xml (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/build.xml Mon Aug 13 13:52:46 2012
@@ -21,6 +21,8 @@
<!-- hackidty-hack-hack -->
<property name="ivy.retrieve.pattern" value="${common-solr.dir}/lib/[artifact]-[revision].[ext]"/>
+ <!-- we cannot sync because solr/core and solr/solrj share the same lib/... clean this up! -->
+ <property name="ivy.sync" value="false"/>
<!-- html file for testing -->
<property name="rat.excludes" value="**/htmlStripReaderTest.html"/>
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/SolrLogFormatter.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/SolrLogFormatter.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/SolrLogFormatter.java Mon Aug 13 13:52:46 2012
@@ -250,10 +250,10 @@ sb.append("(group_name=").append(tg.getN
/*** Isn't core specific... prob better logged from zkController
if (info != null) {
- CloudState cloudState = zkController.getCloudState();
- if (info.cloudState != cloudState) {
+ ClusterState clusterState = zkController.getClusterState();
+ if (info.clusterState != clusterState) {
// something has changed in the matrix...
- sb.append(zkController.getBaseUrl() + " sees new CloudState:");
+ sb.append(zkController.getBaseUrl() + " sees new ClusterState:");
}
}
***/
@@ -263,7 +263,7 @@ sb.append("(group_name=").append(tg.getN
private Map<String,String> getCoreProps(ZkController zkController, SolrCore core) {
final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- ZkNodeProps props = zkController.getCloudState().getShardProps(collection, ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
+ ZkNodeProps props = zkController.getClusterState().getShardProps(collection, ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
if(props!=null) {
return props.getProperties();
}
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/LegacyHTMLStripCharFilterFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/LegacyHTMLStripCharFilterFactory.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/LegacyHTMLStripCharFilterFactory.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/analysis/LegacyHTMLStripCharFilterFactory.java Mon Aug 13 13:52:46 2012
@@ -20,6 +20,7 @@ package org.apache.solr.analysis;
import java.io.Reader;
+import org.apache.lucene.analysis.charfilter.HTMLStripCharFilterFactory;
import org.apache.lucene.analysis.util.CharFilterFactory;
/**
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/AssignShard.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/AssignShard.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/AssignShard.java Mon Aug 13 13:52:46 2012
@@ -24,7 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Slice;
public class AssignShard {
@@ -36,7 +36,7 @@ public class AssignShard {
* @param state
* @return the assigned shard id
*/
- public static String assignShard(String collection, CloudState state, Integer numShards) {
+ public static String assignShard(String collection, ClusterState state, Integer numShards) {
if (numShards == null) {
numShards = 1;
}
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Mon Aug 13 13:52:46 2012
@@ -23,12 +23,12 @@ import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,12 +42,12 @@ public class DistributedQueue {
private final String dir;
- private ZooKeeper zookeeper;
+ private SolrZkClient zookeeper;
private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private final String prefix = "qn-";
- public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl) {
+ public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl) {
this.dir = dir;
if (acl != null) {
@@ -70,7 +70,7 @@ public class DistributedQueue {
List<String> childNames = null;
try {
- childNames = zookeeper.getChildren(dir, watcher);
+ childNames = zookeeper.getChildren(dir, watcher, true);
} catch (KeeperException.NoNodeException e) {
throw e;
}
@@ -124,7 +124,7 @@ public class DistributedQueue {
for (String headNode : orderedChildren.values()) {
if (headNode != null) {
try {
- return zookeeper.getData(dir + "/" + headNode, false, null);
+ return zookeeper.getData(dir + "/" + headNode, null, null, true);
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
@@ -156,8 +156,8 @@ public class DistributedQueue {
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
- byte[] data = zookeeper.getData(path, false, null);
- zookeeper.delete(path, -1);
+ byte[] data = zookeeper.getData(path, null, null, true);
+ zookeeper.delete(path, -1, true);
return data;
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
@@ -202,7 +202,7 @@ public class DistributedQueue {
try {
orderedChildren = orderedChildren(childWatcher);
} catch (KeeperException.NoNodeException e) {
- zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+ zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
continue;
}
if (orderedChildren.size() == 0) {
@@ -213,8 +213,8 @@ public class DistributedQueue {
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
- byte[] data = zookeeper.getData(path, false, null);
- zookeeper.delete(path, -1);
+ byte[] data = zookeeper.getData(path, null, null, true);
+ zookeeper.delete(path, -1, true);
return data;
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
@@ -234,11 +234,11 @@ public class DistributedQueue {
for (;;) {
try {
zookeeper.create(dir + "/" + prefix, data, acl,
- CreateMode.PERSISTENT_SEQUENTIAL);
+ CreateMode.PERSISTENT_SEQUENTIAL, true);
return true;
} catch (KeeperException.NoNodeException e) {
try {
- zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+ zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException ne) {
//someone created it
}
@@ -284,7 +284,7 @@ public class DistributedQueue {
try {
orderedChildren = orderedChildren(childWatcher);
} catch (KeeperException.NoNodeException e) {
- zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+ zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
continue;
}
if (orderedChildren.size() == 0) {
@@ -295,7 +295,7 @@ public class DistributedQueue {
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
- byte[] data = zookeeper.getData(path, false, null);
+ byte[] data = zookeeper.getData(path, null, null, true);
return data;
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Mon Aug 13 13:52:46 2012
@@ -5,7 +5,7 @@ import java.util.Map;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -13,10 +13,11 @@ import org.apache.solr.common.cloud.ZkNo
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.component.ShardHandler;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -63,7 +64,7 @@ public abstract class ElectionContext {
}
class ShardLeaderElectionContextBase extends ElectionContext {
-
+ private static Logger log = LoggerFactory.getLogger(ShardLeaderElectionContextBase.class);
protected final SolrZkClient zkClient;
protected String shardId;
protected String collection;
@@ -111,6 +112,8 @@ class ShardLeaderElectionContextBase ext
// add core container and stop passing core around...
final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
+ private static Logger log = LoggerFactory.getLogger(ShardLeaderElectionContext.class);
+
private ZkController zkController;
private CoreContainer cc;
private SyncStrategy syncStrategy = new SyncStrategy();
@@ -131,8 +134,6 @@ final class ShardLeaderElectionContext e
String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
SolrCore core = null;
try {
- // the first time we are run, we will get a startupCore - after
- // we will get null and must use cc.getCore
core = cc.getCore(coreName);
@@ -151,17 +152,19 @@ final class ShardLeaderElectionContext e
if (zkClient.exists(leaderPath, true)) {
zkClient.delete(leaderPath, -1, true);
}
-// System.out.println("I may be the new Leader:" + leaderPath
-// + " - I need to try and sync");
+ log.info("I may be the new leader - try and sync");
+ // we are going to attempt to be the leader
+ // first cancel any current recovery
+ core.getUpdateHandler().getSolrCoreState().cancelRecovery();
boolean success = syncStrategy.sync(zkController, core, leaderProps);
if (!success && anyoneElseActive()) {
rejoinLeaderElection(leaderSeqPath, core);
return;
}
}
+ log.info("I am the new leader: " + ZkCoreNodeProps.getCoreUrl(leaderProps));
// If I am going to be the leader I have to be active
- // System.out.println("I am leader go active");
core.getUpdateHandler().getSolrCoreState().cancelRecovery();
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
@@ -181,7 +184,8 @@ final class ShardLeaderElectionContext e
// remove our ephemeral and re join the election
// System.out.println("sync failed, delete our election node:"
// + leaderSeqPath);
-
+ log.info("There is a better leader candidate than us - going back into recovery");
+
zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
cancelElection();
@@ -192,8 +196,8 @@ final class ShardLeaderElectionContext e
}
private boolean shouldIBeLeader(ZkNodeProps leaderProps) {
- CloudState cloudState = zkController.getZkStateReader().getCloudState();
- Map<String,Slice> slices = cloudState.getSlices(this.collection);
+ ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+ Map<String,Slice> slices = clusterState.getSlices(this.collection);
Slice slice = slices.get(shardId);
Map<String,ZkNodeProps> shards = slice.getShards();
boolean foundSomeoneElseActive = false;
@@ -203,7 +207,7 @@ final class ShardLeaderElectionContext e
if (new ZkCoreNodeProps(shard.getValue()).getCoreUrl().equals(
new ZkCoreNodeProps(leaderProps).getCoreUrl())) {
if (state.equals(ZkStateReader.ACTIVE)
- && cloudState.liveNodesContain(shard.getValue().get(
+ && clusterState.liveNodesContain(shard.getValue().get(
ZkStateReader.NODE_NAME_PROP))) {
// we are alive
return true;
@@ -211,7 +215,7 @@ final class ShardLeaderElectionContext e
}
if ((state.equals(ZkStateReader.ACTIVE))
- && cloudState.liveNodesContain(shard.getValue().get(
+ && clusterState.liveNodesContain(shard.getValue().get(
ZkStateReader.NODE_NAME_PROP))
&& !new ZkCoreNodeProps(shard.getValue()).getCoreUrl().equals(
new ZkCoreNodeProps(leaderProps).getCoreUrl())) {
@@ -223,8 +227,8 @@ final class ShardLeaderElectionContext e
}
private boolean anyoneElseActive() {
- CloudState cloudState = zkController.getZkStateReader().getCloudState();
- Map<String,Slice> slices = cloudState.getSlices(this.collection);
+ ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+ Map<String,Slice> slices = clusterState.getSlices(this.collection);
Slice slice = slices.get(shardId);
Map<String,ZkNodeProps> shards = slice.getShards();
@@ -233,7 +237,7 @@ final class ShardLeaderElectionContext e
if ((state.equals(ZkStateReader.ACTIVE))
- && cloudState.liveNodesContain(shard.getValue().get(
+ && clusterState.liveNodesContain(shard.getValue().get(
ZkStateReader.NODE_NAME_PROP))) {
return true;
}
@@ -247,16 +251,13 @@ final class ShardLeaderElectionContext e
final class OverseerElectionContext extends ElectionContext {
private final SolrZkClient zkClient;
- private final ZkStateReader stateReader;
- private ShardHandler shardHandler;
- private String adminPath;
-
- public OverseerElectionContext(ShardHandler shardHandler, String adminPath, final String zkNodeName, ZkStateReader stateReader) {
- super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, stateReader.getZkClient());
- this.stateReader = stateReader;
- this.shardHandler = shardHandler;
- this.adminPath = adminPath;
- this.zkClient = stateReader.getZkClient();
+ private Overseer overseer;
+
+
+ public OverseerElectionContext(SolrZkClient zkClient, Overseer overseer, final String zkNodeName) {
+ super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, zkClient);
+ this.overseer = overseer;
+ this.zkClient = zkClient;
}
@Override
@@ -278,7 +279,7 @@ final class OverseerElectionContext exte
CreateMode.EPHEMERAL, true);
}
- new Overseer(shardHandler, adminPath, stateReader, id);
+ overseer.start(id);
}
}
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Aug 13 13:52:46 2012
@@ -24,7 +24,7 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -47,7 +47,7 @@ public class Overseer {
private static Logger log = LoggerFactory.getLogger(Overseer.class);
- private static class CloudStateUpdater implements Runnable {
+ private class ClusterStateUpdater implements Runnable {
private static final String DELETECORE = "deletecore";
private final ZkStateReader reader;
@@ -59,7 +59,7 @@ public class Overseer {
//If Overseer dies while extracting the main queue a new overseer will start from this queue
private final DistributedQueue workQueue;
- public CloudStateUpdater(final ZkStateReader reader, final String myId) {
+ public ClusterStateUpdater(final ZkStateReader reader, final String myId) {
this.zkClient = reader.getZkClient();
this.stateUpdateQueue = getInQueue(zkClient);
this.workQueue = getInternalQueue(zkClient);
@@ -70,7 +70,7 @@ public class Overseer {
@Override
public void run() {
- if(amILeader()) {
+ if(amILeader() && !Overseer.this.isClosed) {
// see if there's something left from the previous Overseer and re
// process all events that were not persisted into cloud state
synchronized (reader.getUpdateLock()) { //XXX this only protects against edits inside single node
@@ -78,17 +78,17 @@ public class Overseer {
byte[] head = workQueue.peek();
if (head != null) {
- reader.updateCloudState(true);
- CloudState cloudState = reader.getCloudState();
+ reader.updateClusterState(true);
+ ClusterState clusterState = reader.getClusterState();
log.info("Replaying operations from work queue.");
while (head != null && amILeader()) {
final ZkNodeProps message = ZkNodeProps.load(head);
final String operation = message
.get(QUEUE_OPERATION);
- cloudState = processMessage(cloudState, message, operation);
+ clusterState = processMessage(clusterState, message, operation);
zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(cloudState), true);
+ ZkStateReader.toJSON(clusterState), true);
workQueue.remove();
head = workQueue.peek();
}
@@ -110,26 +110,26 @@ public class Overseer {
}
log.info("Starting to work on the main queue");
- while (amILeader()) {
+ while (amILeader() && !isClosed) {
synchronized (reader.getUpdateLock()) {
try {
byte[] head = stateUpdateQueue.peek();
if (head != null) {
- reader.updateCloudState(true);
- CloudState cloudState = reader.getCloudState();
+ reader.updateClusterState(true);
+ ClusterState clusterState = reader.getClusterState();
while (head != null) {
final ZkNodeProps message = ZkNodeProps.load(head);
final String operation = message.get(QUEUE_OPERATION);
- cloudState = processMessage(cloudState, message, operation);
+ clusterState = processMessage(clusterState, message, operation);
byte[] processed = stateUpdateQueue.remove();
workQueue.offer(processed);
head = stateUpdateQueue.peek();
}
zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(cloudState), true);
+ ZkStateReader.toJSON(clusterState), true);
}
// clean work queue
while (workQueue.poll() != null);
@@ -157,12 +157,12 @@ public class Overseer {
}
}
- private CloudState processMessage(CloudState cloudState,
+ private ClusterState processMessage(ClusterState clusterState,
final ZkNodeProps message, final String operation) {
if ("state".equals(operation)) {
- cloudState = updateState(cloudState, message);
+ clusterState = updateState(clusterState, message);
} else if (DELETECORE.equals(operation)) {
- cloudState = removeCore(cloudState, message);
+ clusterState = removeCore(clusterState, message);
} else if (ZkStateReader.LEADER_PROP.equals(operation)) {
StringBuilder sb = new StringBuilder();
String baseUrl = message.get(ZkStateReader.BASE_URL_PROP);
@@ -172,14 +172,14 @@ public class Overseer {
sb.append(coreName == null ? "" : coreName);
if (!(sb.substring(sb.length() - 1).equals("/"))) sb
.append("/");
- cloudState = setShardLeader(cloudState,
+ clusterState = setShardLeader(clusterState,
message.get(ZkStateReader.COLLECTION_PROP),
message.get(ZkStateReader.SHARD_ID_PROP), sb.toString());
} else {
throw new RuntimeException("unknown operation:" + operation
+ " contents:" + message.getProperties());
}
- return cloudState;
+ return clusterState;
}
private boolean amILeader() {
@@ -199,7 +199,7 @@ public class Overseer {
/**
* Try to assign core to the cluster.
*/
- private CloudState updateState(CloudState state, final ZkNodeProps message) {
+ private ClusterState updateState(ClusterState state, final ZkNodeProps message) {
final String collection = message.get(ZkStateReader.COLLECTION_PROP);
final String zkCoreNodeName = message.get(ZkStateReader.NODE_NAME_PROP) + "_" + message.get(ZkStateReader.CORE_NAME_PROP);
final Integer numShards = message.get(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.get(ZkStateReader.NUM_SHARDS_PROP)):null;
@@ -214,7 +214,7 @@ public class Overseer {
String shardId = message.get(ZkStateReader.SHARD_ID_PROP);
if (shardId == null) {
String nodeName = message.get(ZkStateReader.NODE_NAME_PROP);
- //get shardId from CloudState
+ //get shardId from ClusterState
shardId = getAssignedId(state, nodeName, message);
}
if(shardId == null) {
@@ -242,11 +242,11 @@ public class Overseer {
shardProps.put(zkCoreNodeName, zkProps);
slice = new Slice(shardId, shardProps);
- CloudState newCloudState = updateSlice(state, collection, slice);
- return newCloudState;
+ ClusterState newClusterState = updateSlice(state, collection, slice);
+ return newClusterState;
}
- private CloudState createCollection(CloudState state, String collectionName, int numShards) {
+ private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {
Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String, Slice>>();
Map<String, Slice> newSlices = new LinkedHashMap<String,Slice>();
newStates.putAll(state.getCollectionStates());
@@ -255,14 +255,14 @@ public class Overseer {
newSlices.put(sliceName, new Slice(sliceName, Collections.EMPTY_MAP));
}
newStates.put(collectionName, newSlices);
- CloudState newCloudState = new CloudState(state.getLiveNodes(), newStates);
- return newCloudState;
+ ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newStates);
+ return newClusterState;
}
/*
* Return an already assigned id or null if not assigned
*/
- private String getAssignedId(final CloudState state, final String nodeName,
+ private String getAssignedId(final ClusterState state, final String nodeName,
final ZkNodeProps coreState) {
final String key = coreState.get(ZkStateReader.NODE_NAME_PROP) + "_" + coreState.get(ZkStateReader.CORE_NAME_PROP);
Map<String, Slice> slices = state.getSlices(coreState.get(ZkStateReader.COLLECTION_PROP));
@@ -276,7 +276,7 @@ public class Overseer {
return null;
}
- private CloudState updateSlice(CloudState state, String collection, Slice slice) {
+ private ClusterState updateSlice(ClusterState state, String collection, Slice slice) {
final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
newStates.putAll(state.getCollectionStates());
@@ -306,10 +306,10 @@ public class Overseer {
final Slice updatedSlice = new Slice(slice.getName(), shards);
slices.put(slice.getName(), updatedSlice);
}
- return new CloudState(state.getLiveNodes(), newStates);
+ return new ClusterState(state.getLiveNodes(), newStates);
}
- private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
+ private ClusterState setShardLeader(ClusterState state, String collection, String sliceName, String leaderUrl) {
final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
newStates.putAll(state.getCollectionStates());
@@ -341,21 +341,21 @@ public class Overseer {
Slice slice = new Slice(sliceName, newShards);
slices.put(sliceName, slice);
}
- return new CloudState(state.getLiveNodes(), newStates);
+ return new ClusterState(state.getLiveNodes(), newStates);
}
/*
* Remove core from cloudstate
*/
- private CloudState removeCore(final CloudState cloudState, ZkNodeProps message) {
+ private ClusterState removeCore(final ClusterState clusterState, ZkNodeProps message) {
final String coreNodeName = message.get(ZkStateReader.NODE_NAME_PROP) + "_" + message.get(ZkStateReader.CORE_NAME_PROP);
final String collection = message.get(ZkStateReader.COLLECTION_PROP);
final LinkedHashMap<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
- for(String collectionName: cloudState.getCollections()) {
+ for(String collectionName: clusterState.getCollections()) {
if(collection.equals(collectionName)) {
- Map<String, Slice> slices = cloudState.getSlices(collection);
+ Map<String, Slice> slices = clusterState.getSlices(collection);
LinkedHashMap<String, Slice> newSlices = new LinkedHashMap<String, Slice>();
for(Slice slice: slices.values()) {
if(slice.getShards().containsKey(coreNodeName)) {
@@ -393,48 +393,72 @@ public class Overseer {
}
}
} else {
- newStates.put(collectionName, cloudState.getSlices(collectionName));
+ newStates.put(collectionName, clusterState.getSlices(collectionName));
}
}
- CloudState newState = new CloudState(cloudState.getLiveNodes(), newStates);
+ ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newStates);
return newState;
}
}
+
+ private Thread ccThread;
+
+ private Thread updaterThread;
+
+ // Retained so close() can reach the processor that runs on ccThread.
+ private volatile OverseerCollectionProcessor collectionProcessor;
+
+ private volatile boolean isClosed;
+
+ private ZkStateReader reader;
+
+ private ShardHandler shardHandler;
+
+ private String adminPath;
+
+ /**
+ * Stores the collaborators only; the overseer threads are created and
+ * started by {@link #start(String)}.
+ */
+ public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException {
+ this.reader = reader;
+ this.shardHandler = shardHandler;
+ this.adminPath = adminPath;
+ }
- public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader, final String id) throws KeeperException, InterruptedException {
+ /**
+ * Calls createOverseerNode, then builds the cluster state updater thread and
+ * the collection processor thread as daemons and starts both.
+ *
+ * @param id passed to both runnables; also used in the log line and in the
+ * collection processor thread name ("Overseer-" + id)
+ */
+ public void start(String id) {
log.info("Overseer (id=" + id + ") starting");
createOverseerNode(reader.getZkClient());
//launch cluster state updater thread
ThreadGroup tg = new ThreadGroup("Overseer state updater.");
- Thread updaterThread = new Thread(tg, new CloudStateUpdater(reader, id));
+ updaterThread = new Thread(tg, new ClusterStateUpdater(reader, id));
updaterThread.setDaemon(true);
- updaterThread.start();
-
+
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
- Thread ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath));
+ collectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath);
+ ccThread = new Thread(ccTg, collectionProcessor, "Overseer-" + id);
ccThread.setDaemon(true);
+
+ // start only after both threads are fully configured
+ updaterThread.start();
ccThread.start();
}
+
+ /**
+ * Marks this overseer closed and forwards the request to the collection
+ * processor, whose run() loop checks its own isClosed flag. Safe to call
+ * before start(): the processor reference is then still null and is skipped.
+ * Does not interrupt or join either thread.
+ */
+ public void close() {
+ isClosed = true;
+ final OverseerCollectionProcessor processor = collectionProcessor;
+ if (processor != null) {
+ processor.close();
+ }
+ }
/**
* Get queue that can be used to send messages to Overseer.
* Calls createOverseerNode(zkClient) first and returns a new DistributedQueue
* on "/overseer/queue" on every call.
*/
public static DistributedQueue getInQueue(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
- return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/queue", null);
+ return new DistributedQueue(zkClient, "/overseer/queue", null); // the SolrZkClient itself is handed to the queue; third argument is null
}
/* Internal queue, not to be used outside of Overseer. Calls createOverseerNode(zkClient), then returns a new DistributedQueue on "/overseer/queue-work". */
static DistributedQueue getInternalQueue(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
- return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/queue-work", null);
+ return new DistributedQueue(zkClient, "/overseer/queue-work", null); // the SolrZkClient itself is handed to the queue; third argument is null
}
/* Collection creation queue. Calls createOverseerNode(zkClient), then returns a new DistributedQueue on "/overseer/collection-queue-work". */
static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
- return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/collection-queue-work", null);
+ return new DistributedQueue(zkClient, "/overseer/collection-queue-work", null); // the SolrZkClient itself is handed to the queue; third argument is null
}
private static void createOverseerNode(final SolrZkClient zkClient) {
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon Aug 13 13:52:46 2012
@@ -25,7 +25,7 @@ import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -64,6 +64,8 @@ public class OverseerCollectionProcessor
private String adminPath;
private ZkStateReader zkStateReader;
+
+ private volatile boolean isClosed; // set by close(), polled by the run() loop condition; volatile so the write is visible to the thread executing run()
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
this.zkStateReader = zkStateReader;
@@ -76,7 +78,7 @@ public class OverseerCollectionProcessor
@Override
public void run() {
log.info("Process current queue of collection creations");
- while (amILeader()) {
+ while (amILeader() && !isClosed) {
try {
byte[] head = workQueue.peek(true);
@@ -108,6 +110,10 @@ public class OverseerCollectionProcessor
}
}
+ public void close() { // requests shutdown: run() re-checks isClosed in its while condition on each iteration
+ isClosed = true; // NOTE(review): nothing here interrupts the thread, so a run() waiting inside workQueue.peek(true) presumably only notices on its next iteration - confirm
+ }
+
private boolean amILeader() {
try {
ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(
@@ -126,22 +132,22 @@ public class OverseerCollectionProcessor
private boolean processMessage(ZkNodeProps message, String operation) { // dispatches one message by operation name; unrecognized operations return true
if (CREATECOLLECTION.equals(operation)) {
- return createCollection(zkStateReader.getCloudState(), message);
+ return createCollection(zkStateReader.getClusterState(), message); // cluster state is fetched from zkStateReader at dispatch time
} else if (DELETECOLLECTION.equals(operation)) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
- return collectionCmd(zkStateReader.getCloudState(), message, params);
+ return collectionCmd(zkStateReader.getClusterState(), message, params); // delete = UNLOAD action with DELETE_INSTANCE_DIR set, sent via collectionCmd
} else if (RELOADCOLLECTION.equals(operation)) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
- return collectionCmd(zkStateReader.getCloudState(), message, params);
+ return collectionCmd(zkStateReader.getClusterState(), message, params); // reload = RELOAD action, sent via collectionCmd
}
// unknown command, toss it from our queue
return true; // presumably true tells the caller the message can be removed - TODO confirm against run()
}
- private boolean createCollection(CloudState cloudState, ZkNodeProps message) {
+ private boolean createCollection(ClusterState clusterState, ZkNodeProps message) {
// look at the replication factor and see if it matches reality
// if it does not, find best nodes to create more cores
@@ -176,7 +182,7 @@ public class OverseerCollectionProcessor
// TODO: add smarter options that look at the current number of cores per node?
// for now we just go random
- Set<String> nodes = cloudState.getLiveNodes();
+ Set<String> nodes = clusterState.getLiveNodes();
List<String> nodeList = new ArrayList<String>(nodes.size());
nodeList.addAll(nodes);
Collections.shuffle(nodeList);
@@ -229,11 +235,11 @@ public class OverseerCollectionProcessor
return true;
}
- private boolean collectionCmd(CloudState cloudState, ZkNodeProps message, ModifiableSolrParams params) {
+ private boolean collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params) {
log.info("Executing Collection Cmd : " + params);
String name = message.get("name");
- Map<String,Slice> slices = cloudState.getCollectionStates().get(name);
+ Map<String,Slice> slices = clusterState.getCollectionStates().get(name);
if (slices == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection:" + name);
@@ -245,7 +251,7 @@ public class OverseerCollectionProcessor
Set<Map.Entry<String,ZkNodeProps>> shardEntries = shards.entrySet();
for (Map.Entry<String,ZkNodeProps> shardEntry : shardEntries) {
final ZkNodeProps node = shardEntry.getValue();
- if (cloudState.liveNodesContain(node.get(ZkStateReader.NODE_NAME_PROP))) {
+ if (clusterState.liveNodesContain(node.get(ZkStateReader.NODE_NAME_PROP))) {
params.set(CoreAdminParams.CORE, node.get(ZkStateReader.CORE_NAME_PROP));
String replica = node.get(ZkStateReader.BASE_URL_PROP);
Modified: lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1372423&r1=1372422&r2=1372423&view=diff
==============================================================================
--- lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/branches/LUCENE-2878/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Mon Aug 13 13:52:46 2012
@@ -18,6 +18,7 @@ package org.apache.solr.cloud;
*/
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory;
public class RecoveryStrategy extends Thread implements SafeStopThread {
private static final int MAX_RETRIES = 500;
private static final int INTERRUPTED = MAX_RETRIES + 1;
- private static final int START_TIMEOUT = 100;
+ private static final int STARTING_RECOVERY_DELAY = 1000;
private static final String REPLICATION_HANDLER = "/replication";
@@ -99,7 +100,7 @@ public class RecoveryStrategy extends Th
private void recoveryFailed(final SolrCore core,
final ZkController zkController, final String baseUrl,
final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
- SolrException.log(log, "Recovery failed - I give up.");
+ SolrException.log(log, "Recovery failed - I give up. core=" + coreName);
try {
zkController.publish(cd, ZkStateReader.RECOVERY_FAILED);
} finally {
@@ -114,7 +115,7 @@ public class RecoveryStrategy extends Th
ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
String leaderUrl = leaderCNodeProps.getCoreUrl();
- log.info("Attempting to replicate from " + leaderUrl);
+ log.info("Attempting to replicate from " + leaderUrl + ". core=" + coreName);
// if we are the leader, either we are trying to recover faster
// than our ephemeral timed out or we are the only node
@@ -204,7 +205,7 @@ public class RecoveryStrategy extends Th
SolrQueryResponse rsp = new SolrQueryResponse();
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
- log.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
+ log.info("Starting recovery process. core=" + coreName + " recoveringAfterStartup=" + recoveringAfterStartup);
try {
doRecovery(core);
@@ -232,7 +233,7 @@ public class RecoveryStrategy extends Th
UpdateLog ulog;
ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) {
- SolrException.log(log, "No UpdateLog found - cannot recover");
+ SolrException.log(log, "No UpdateLog found - cannot recover. core=" + coreName);
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
return;
@@ -240,11 +241,17 @@ public class RecoveryStrategy extends Th
List<Long> recentVersions;
- UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
+ UpdateLog.RecentUpdates recentUpdates = null;
try {
+ recentUpdates = ulog.getRecentUpdates();
recentVersions = recentUpdates.getVersions(ulog.numRecordsToKeep);
+ } catch (Throwable t) {
+ SolrException.log(log, "Corrupt tlog - ignoring. core=" + coreName, t);
+ recentVersions = new ArrayList<Long>(0);
} finally {
- recentUpdates.close();
+ if (recentUpdates != null) {
+ recentUpdates.close();
+ }
}
List<Long> startingVersions = ulog.getStartingVersions();
@@ -278,6 +285,7 @@ public class RecoveryStrategy extends Th
// last operation at the time of startup had the GAP flag set...
// this means we were previously doing a full index replication
// that probably didn't complete and buffering updates in the meantime.
+ log.info("Looks like a previous replication recovery did not complete - skipping peer sync. core=" + coreName);
firstTime = false; // skip peersync
}
}
@@ -303,7 +311,7 @@ public class RecoveryStrategy extends Th
// first thing we just try to sync
if (firstTime) {
firstTime = false; // only try sync the first time through the loop
- log.info("Attempting to PeerSync from " + leaderUrl + " recoveringAfterStartup="+recoveringAfterStartup);
+ log.info("Attempting to PeerSync from " + leaderUrl + " core=" + coreName + " - recoveringAfterStartup="+recoveringAfterStartup);
// System.out.println("Attempting to PeerSync from " + leaderUrl
// + " i am:" + zkController.getNodeName());
PeerSync peerSync = new PeerSync(core,
@@ -314,7 +322,7 @@ public class RecoveryStrategy extends Th
SolrQueryRequest req = new LocalSolrQueryRequest(core,
new ModifiableSolrParams());
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
- log.info("Sync Recovery was successful - registering as Active");
+ log.info("PeerSync Recovery was successful - registering as Active. core=" + coreName);
// System.out
// .println("Sync Recovery was successful - registering as Active "
// + zkController.getNodeName());
@@ -343,11 +351,11 @@ public class RecoveryStrategy extends Th
return;
}
- log.info("Sync Recovery was not successful - trying replication");
+ log.info("PeerSync Recovery was not successful - trying replication. core=" + coreName);
}
//System.out.println("Sync Recovery was not successful - trying replication");
-
- log.info("Begin buffering updates");
+ log.info("Starting Replication Recovery. core=" + coreName);
+ log.info("Begin buffering updates. core=" + coreName);
ulog.bufferUpdates();
replayed = false;
@@ -359,7 +367,7 @@ public class RecoveryStrategy extends Th
replay(ulog);
replayed = true;
- log.info("Recovery was successful - registering as Active");
+ log.info("Replication Recovery was successful - registering as Active. core=" + coreName);
// if there are pending recovery requests, don't advert as active
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
close = true;
@@ -382,7 +390,7 @@ public class RecoveryStrategy extends Th
}
} catch (Throwable t) {
- log.error("Error while trying to recover.", t);
+ log.error("Error while trying to recover. core=" + coreName, t);
}
if (!successfulRecovery) {
@@ -391,13 +399,13 @@ public class RecoveryStrategy extends Th
// Or do a fall off retry...
try {
- log.error("Recovery failed - trying again...");
+ log.error("Recovery failed - trying again... core=" + coreName);
retries++;
if (retries >= MAX_RETRIES) {
if (retries == INTERRUPTED) {
} else {
- log.error("Recovery failed - max retries exceeded.");
+ log.error("Recovery failed - max retries exceeded. core=" + coreName);
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
}
@@ -405,24 +413,25 @@ public class RecoveryStrategy extends Th
}
} catch (Exception e) {
- log.error("", e);
+ log.error("core=" + coreName, e);
}
try {
- // if (!isClosed()) Thread.sleep(Math.min(START_TIMEOUT * retries, 60000));
- for (int i = 0; i<Math.min(retries, 600); i++) {
+ // exponential backoff: 2^retries sleeps of STARTING_RECOVERY_DELAY (1 sec) each, capped at 600 sleeps (10 min)
+ double loopCount = Math.min(Math.pow(2, retries), 600);
+ for (int i = 0; i < loopCount; i++) {
if (isClosed()) break; // check if someone closed us
- Thread.sleep(START_TIMEOUT);
+ Thread.sleep(STARTING_RECOVERY_DELAY);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- log.warn("Recovery was interrupted", e);
+ log.warn("Recovery was interrupted. core=" + coreName, e);
retries = INTERRUPTED;
}
}
}
- log.info("Finished recovery process");
+ log.info("Finished recovery process. core=" + coreName);
}
@@ -431,9 +440,9 @@ public class RecoveryStrategy extends Th
Future<RecoveryInfo> future = ulog.applyBufferedUpdates();
if (future == null) {
// no replay needed
- log.info("No replay needed");
+ log.info("No replay needed. core=" + coreName);
} else {
- log.info("Replaying buffered documents");
+ log.info("Replaying buffered documents. core=" + coreName);
// wait for replay
future.get();
}