You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/09/03 18:36:21 UTC
svn commit: r1380287 - in /lucene/dev/trunk/solr:
core/src/java/org/apache/solr/core/
core/src/java/org/apache/solr/handler/admin/
core/src/java/org/apache/solr/update/
solrj/src/java/org/apache/solr/common/cloud/
Author: yonik
Date: Mon Sep 3 16:36:20 2012
New Revision: 1380287
URL: http://svn.apache.org/viewvc?rev=1380287&view=rev
Log:
SOLR-3755: shard splitting - create cores, use writers from cores
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1380287&r1=1380286&r2=1380287&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java Mon Sep 3 16:36:20 2012
@@ -809,7 +809,8 @@ public final class SolrCore implements S
// this core current usage count
private final AtomicInteger refCount = new AtomicInteger(1);
- final void open() {
+ /** expert: increments the core reference count */
+ public void open() {
refCount.incrementAndGet();
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1380287&r1=1380286&r2=1380287&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Mon Sep 3 16:36:20 2012
@@ -69,6 +69,9 @@ import org.apache.solr.util.RefCounted;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
/**
*
@@ -212,6 +215,53 @@ public class CoreAdminHandler extends Re
rsp.setHttpCaching(false);
}
+ /** Creates a new core and registers it. The returned core will have it's reference count incremented an extra time and close() should be called when finished. */
+ private SolrCore createCore(SolrCore current, int ord, HashPartitioner.Range newRange) throws IOException, SAXException, ParserConfigurationException {
+ CoreDescriptor currCoreD = current.getCoreDescriptor();
+ CloudDescriptor currCloudD = currCoreD.getCloudDescriptor();
+
+ String currName = currCoreD.getName();
+
+ // TODO: nicer way to come up with core names?
+ String name = currName + "_" + ord;
+
+ String instanceDir = name;
+
+
+ // TODO: do this via a clone / constructor?
+ CoreDescriptor dcore = new CoreDescriptor(coreContainer, name, instanceDir);
+ dcore.setConfigName( currCoreD.getConfigName() );
+ dcore.setSchemaName(currCoreD.getSchemaName());
+ // default dcore.setDataDir()
+
+ // TODO: for this to work in non-cloud mode, we will either need to make a copy of the conf directory, or
+ // develop named configs like we have in cloud mode.
+
+
+ CloudDescriptor cd = null;
+ if (currCloudD != null) {
+ cd = new CloudDescriptor();
+
+ // TODO: should we copy these? any params that are core specific?
+ cd.setParams( currCloudD.getParams() );
+ cd.setCollectionName( currCloudD.getCollectionName() );
+ cd.setRoles( currCloudD.getRoles() );
+
+ // TODO: we must be careful that an unrelated node starting up does not try
+ // to become the new shard leader! How do we designate ourselves as the
+ // leader but prevent new shards from trying to replicate from us before we are ready (i.e. have the split index)?
+ String shardId = currCloudD.getShardId() + "_" + ord;
+ cd.setShardId( shardId );
+
+ dcore.setCloudDescriptor(cd);
+ }
+
+ SolrCore core = coreContainer.create(dcore);
+ core.open(); // inc ref count before registering to ensure no one can close the core before we are done with it
+ coreContainer.register(name, core, false);
+ return core;
+ }
+
protected boolean handleSplitAction(SolrQueryRequest adminReq, SolrQueryResponse rsp) throws IOException {
SolrParams params = adminReq.getParams();
@@ -222,39 +272,49 @@ public class CoreAdminHandler extends Re
// boolean closeDirectories = true;
// DirectoryFactory dirFactory = null;
+ String[] pathsArr = params.getParams("path");
+ String rangesStr = params.get("ranges"); // ranges=a-b,c-d,e-f
String cname = params.get(CoreAdminParams.CORE, "");
SolrCore core = coreContainer.getCore(cname);
SolrQueryRequest req = new LocalSolrQueryRequest(core, params);
- try {
+ List<SolrCore> newCores = null;
- String[] pathsArr = params.getParams("path");
+ try {
+ // TODO: allow use of rangesStr in the future
List<String> paths = null;
-
- String rangesStr = params.get("ranges"); // ranges=a-b,c-d,e-f
+ int partitions = pathsArr != null ? pathsArr.length : params.getInt("partitions", 2);
- // dirFactory = core.getDirectoryFactory();
+ // TODO: if we don't know the real range of the current core, we should just
+ // split on every other doc rather than hash.
+ // TODO (cloud): get from the current core
+ HashPartitioner.Range currentRange = new HashPartitioner.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
- if (pathsArr != null) {
+ HashPartitioner hp = new HashPartitioner();
+ ranges = hp.partitionRange(partitions, currentRange);
- paths = Arrays.asList(pathsArr);
-
- if (rangesStr == null) {
- HashPartitioner hp = new HashPartitioner();
- // should this be static?
- // TODO: use real range if we know it. If we don't know it, we should prob
- // split on every other doc rather than on a hash?
- ranges = hp.partitionRange(pathsArr.length, Integer.MIN_VALUE, Integer.MAX_VALUE);
+ if (pathsArr == null) {
+ newCores = new ArrayList<SolrCore>(partitions);
+ for (int i=0; i<partitions; i++) {
+ SolrCore newCore = createCore(core, i, ranges.get(i));
+ newCores.add(newCore);
}
+ // TODO (cloud): cores should be registered, should be in recovery / buffering-updates mode, and the shard
+ // leader should be forwarding updates to the new shards *before* we split the current shard
+ // into the new shards.
+ } else {
+ paths = Arrays.asList(pathsArr);
}
- SplitIndexCommand cmd = new SplitIndexCommand(req, paths, ranges);
+ SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges);
core.getUpdateHandler().split(cmd);
+ // After the split has completed, someone (here?) should start the process of replaying the buffered updates.
+
} catch (Exception e) {
log.error("ERROR executing split:", e);
throw new RuntimeException(e);
@@ -262,6 +322,11 @@ public class CoreAdminHandler extends Re
} finally {
if (req != null) req.close();
if (core != null) core.close();
+ if (newCores != null) {
+ for (SolrCore newCore : newCores) {
+ newCore.close();
+ }
+ }
}
return false;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java?rev=1380287&r1=1380286&r2=1380287&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java Mon Sep 3 16:36:20 2012
@@ -35,6 +35,7 @@ import org.apache.solr.common.util.Hash;
import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +51,7 @@ public class SolrIndexSplitter {
List<HashPartitioner.Range> ranges;
HashPartitioner.Range[] rangesArr; // same as ranges list, but an array for extra speed in inner loops
List<String> paths;
+ List<SolrCore> cores;
public SolrIndexSplitter(SplitIndexCommand cmd) {
field = cmd.getReq().getSchema().getUniqueKeyField();
@@ -57,6 +59,7 @@ public class SolrIndexSplitter {
ranges = cmd.ranges;
rangesArr = ranges.toArray(new HashPartitioner.Range[ranges.size()]);
paths = cmd.paths;
+ cores = cmd.cores;
}
public void split() throws IOException {
@@ -76,6 +79,7 @@ public class SolrIndexSplitter {
// would it be more efficient to write segment-at-a-time to each new index?
// - need to worry about number of open descriptors
// - need to worry about if IW.addIndexes does a sync or not...
+ // - would be more efficient on the read side, but prob less efficient merging
IndexReader[] subReaders = new IndexReader[leaves.size()];
for (int partitionNumber=0; partitionNumber<ranges.size(); partitionNumber++) {
@@ -85,22 +89,35 @@ public class SolrIndexSplitter {
subReaders[segmentNumber] = new LiveDocsReader( leaves.get(segmentNumber), segmentDocSets.get(segmentNumber)[partitionNumber] );
}
- String path = paths.get(partitionNumber);
boolean success = false;
- SolrCore core = searcher.getCore();
- IndexWriter iw = new SolrIndexWriter("SplittingIndexWriter"+partitionNumber + " " + ranges.get(partitionNumber), path,
- core.getDirectoryFactory(), true, core.getSchema(),
- core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec(), true);
+
+ RefCounted<IndexWriter> iwRef = null;
+ IndexWriter iw = null;
+ if (cores != null) {
+ SolrCore subCore = cores.get(partitionNumber);
+ iwRef = subCore.getUpdateHandler().getSolrCoreState().getIndexWriter(subCore);
+ iw = iwRef.get();
+ } else {
+ SolrCore core = searcher.getCore();
+ String path = paths.get(partitionNumber);
+ iw = new SolrIndexWriter("SplittingIndexWriter"+partitionNumber + " " + ranges.get(partitionNumber), path,
+ core.getDirectoryFactory(), true, core.getSchema(),
+ core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec(), true);
+ }
try {
+ // This merges the subreaders and will thus remove deletions (i.e. no optimize needed)
iw.addIndexes(subReaders);
- // TODO: will many deletes have been removed, or should we optimize?
success = true;
} finally {
- if (success) {
- IOUtils.close(iw);
+ if (iwRef != null) {
+ iwRef.decref();
} else {
- IOUtils.closeWhileHandlingException(iw);
+ if (success) {
+ IOUtils.close(iw);
+ } else {
+ IOUtils.closeWhileHandlingException(iw);
+ }
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java?rev=1380287&r1=1380286&r2=1380287&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java Mon Sep 3 16:36:20 2012
@@ -18,6 +18,7 @@
package org.apache.solr.update;
import org.apache.solr.common.cloud.HashPartitioner;
+import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import java.util.List;
@@ -31,12 +32,14 @@ import java.util.List;
public class SplitIndexCommand extends UpdateCommand {
// public List<Directory> dirs;
public List<String> paths;
+ public List<SolrCore> cores; // either paths or cores should be specified
public List<HashPartitioner.Range> ranges;
// TODO: allow specification of custom hash function
- public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<HashPartitioner.Range> ranges) {
+ public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<SolrCore> cores, List<HashPartitioner.Range> ranges) {
super(req);
this.paths = paths;
+ this.cores = cores;
this.ranges = ranges;
}
@@ -49,6 +52,7 @@ public class SplitIndexCommand extends U
public String toString() {
StringBuilder sb = new StringBuilder(super.toString());
sb.append(",paths=" + paths);
+ sb.append(",cores=" + cores);
sb.append(",ranges=" + ranges);
sb.append('}');
return sb.toString();
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java?rev=1380287&r1=1380286&r2=1380287&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java Mon Sep 3 16:36:20 2012
@@ -27,6 +27,8 @@ import java.util.List;
public class HashPartitioner {
// Hash ranges can't currently "wrap" - i.e. max must be greater or equal to min.
+ // TODO: ranges may not be all contiguous in the future (either that or we will
+ // need an extra class to model a collection of ranges)
public static class Range {
public int min; // inclusive
public int max; // inclusive
@@ -48,7 +50,12 @@ public class HashPartitioner {
return null; // TODO
}
}
-
+
+
+
+ public List<Range> partitionRange(int partitions, Range range) {
+ return partitionRange(partitions, range.min, range.max);
+ }
/**
*