You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/09/03 18:36:21 UTC

svn commit: r1380287 - in /lucene/dev/trunk/solr: core/src/java/org/apache/solr/core/ core/src/java/org/apache/solr/handler/admin/ core/src/java/org/apache/solr/update/ solrj/src/java/org/apache/solr/common/cloud/

Author: yonik
Date: Mon Sep  3 16:36:20 2012
New Revision: 1380287

URL: http://svn.apache.org/viewvc?rev=1380287&view=rev
Log:
SOLR-3755: shard splitting - create cores, use writers from cores

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1380287&r1=1380286&r2=1380287&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java Mon Sep  3 16:36:20 2012
@@ -809,7 +809,8 @@ public final class SolrCore implements S
   // this core current usage count
   private final AtomicInteger refCount = new AtomicInteger(1);
 
-  final void open() {
+  /** expert: increments the core reference count */
+  public void open() {
     refCount.incrementAndGet();
   }
   

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1380287&r1=1380286&r2=1380287&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Mon Sep  3 16:36:20 2012
@@ -69,6 +69,9 @@ import org.apache.solr.util.RefCounted;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
 
 /**
  *
@@ -212,6 +215,53 @@ public class CoreAdminHandler extends Re
     rsp.setHttpCaching(false);
   }
 
+  /** Creates a new core and registers it. The returned core will have its reference count incremented an extra time, and close() should be called when finished. */
+  private SolrCore createCore(SolrCore current, int ord, HashPartitioner.Range newRange) throws IOException, SAXException, ParserConfigurationException {
+    CoreDescriptor currCoreD = current.getCoreDescriptor();
+    CloudDescriptor currCloudD = currCoreD.getCloudDescriptor();
+
+    String currName = currCoreD.getName();
+
+    // TODO: nicer way to come up with core names?
+    String name = currName + "_" + ord;
+
+    String instanceDir = name;
+
+
+    // TODO: do this via a clone / constructor?
+    CoreDescriptor dcore = new CoreDescriptor(coreContainer, name, instanceDir);
+    dcore.setConfigName( currCoreD.getConfigName() );
+    dcore.setSchemaName(currCoreD.getSchemaName());
+    // default dcore.setDataDir()
+
+    // TODO: for this to work in non-cloud mode, we will either need to make a copy of the conf directory, or
+    // develop named configs like we have in cloud mode.
+
+
+    CloudDescriptor cd = null;
+    if (currCloudD != null) {
+      cd = new CloudDescriptor();
+
+      // TODO: should we copy these?  any params that are core specific?
+      cd.setParams( currCloudD.getParams() );
+      cd.setCollectionName( currCloudD.getCollectionName() );
+      cd.setRoles( currCloudD.getRoles() );
+
+      // TODO: we must be careful that an unrelated node starting up does not try
+      // to become the new shard leader!  How do we designate ourselves as the
+      // leader but prevent new shards from trying to replicate from us before we are ready (i.e. have the split index)?
+      String shardId = currCloudD.getShardId() + "_" + ord;
+      cd.setShardId( shardId );
+
+      dcore.setCloudDescriptor(cd);
+    }
+
+    SolrCore core = coreContainer.create(dcore);
+    core.open();  // inc ref count before registering to ensure no one can close the core before we are done with it
+    coreContainer.register(name, core, false);
+    return core;
+  }
+
 
   protected boolean handleSplitAction(SolrQueryRequest adminReq, SolrQueryResponse rsp) throws IOException {
     SolrParams params = adminReq.getParams();
@@ -222,39 +272,49 @@ public class CoreAdminHandler extends Re
     // boolean closeDirectories = true;
     // DirectoryFactory dirFactory = null;
 
+    String[] pathsArr = params.getParams("path");
+    String rangesStr = params.get("ranges");    // ranges=a-b,c-d,e-f
 
     String cname = params.get(CoreAdminParams.CORE, "");
     SolrCore core = coreContainer.getCore(cname);
     SolrQueryRequest req = new LocalSolrQueryRequest(core, params);
-    try {
+    List<SolrCore> newCores = null;
 
-      String[] pathsArr = params.getParams("path");
+    try {
+      // TODO: allow use of rangesStr in the future
       List<String> paths = null;
-
-      String rangesStr = params.get("ranges");    // ranges=a-b,c-d,e-f
+      int partitions = pathsArr != null ? pathsArr.length : params.getInt("partitions", 2);
 
 
-      // dirFactory = core.getDirectoryFactory();
+      // TODO: if we don't know the real range of the current core, we should just
+      //  split on every other doc rather than hash.
 
+      // TODO (cloud): get from the current core
+      HashPartitioner.Range currentRange = new HashPartitioner.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
 
-      if (pathsArr != null) {
+      HashPartitioner hp = new HashPartitioner();
+      ranges = hp.partitionRange(partitions, currentRange);
 
-        paths = Arrays.asList(pathsArr);
-
-        if (rangesStr == null) {
-          HashPartitioner hp = new HashPartitioner();
-          // should this be static?
-          // TODO: use real range if we know it.  If we don't know it, we should prob
-          // split on every other doc rather than on a hash?
-          ranges = hp.partitionRange(pathsArr.length, Integer.MIN_VALUE, Integer.MAX_VALUE);
+      if (pathsArr == null) {
+        newCores = new ArrayList<SolrCore>(partitions);
+        for (int i=0; i<partitions; i++) {
+          SolrCore newCore = createCore(core, i, ranges.get(i));
+          newCores.add(newCore);
         }
 
+        // TODO (cloud): cores should be registered, should be in recovery / buffering-updates mode, and the shard
+        // leader should be forwarding updates to the new shards *before* we split the current shard
+        // into the new shards.
+      } else {
+        paths = Arrays.asList(pathsArr);
       }
 
 
-      SplitIndexCommand cmd = new SplitIndexCommand(req, paths, ranges);
+      SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges);
       core.getUpdateHandler().split(cmd);
 
+      // After the split has completed, someone (here?) should start the process of replaying the buffered updates.
+
     } catch (Exception e) {
       log.error("ERROR executing split:", e);
       throw new RuntimeException(e);
@@ -262,6 +322,11 @@ public class CoreAdminHandler extends Re
     } finally {
       if (req != null) req.close();
       if (core != null) core.close();
+      if (newCores != null) {
+        for (SolrCore newCore : newCores) {
+          newCore.close();
+        }
+      }
     }
 
     return false;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java?rev=1380287&r1=1380286&r2=1380287&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java Mon Sep  3 16:36:20 2012
@@ -35,6 +35,7 @@ import org.apache.solr.common.util.Hash;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +51,7 @@ public class SolrIndexSplitter {
   List<HashPartitioner.Range> ranges;
   HashPartitioner.Range[] rangesArr; // same as ranges list, but an array for extra speed in inner loops
   List<String> paths;
+  List<SolrCore> cores;
 
   public SolrIndexSplitter(SplitIndexCommand cmd) {
     field = cmd.getReq().getSchema().getUniqueKeyField();
@@ -57,6 +59,7 @@ public class SolrIndexSplitter {
     ranges = cmd.ranges;
     rangesArr = ranges.toArray(new HashPartitioner.Range[ranges.size()]);
     paths = cmd.paths;
+    cores = cmd.cores;
   }
 
   public void split() throws IOException {
@@ -76,6 +79,7 @@ public class SolrIndexSplitter {
     // would it be more efficient to write segment-at-a-time to each new index?
     // - need to worry about number of open descriptors
     // - need to worry about if IW.addIndexes does a sync or not...
+    // - would be more efficient on the read side, but prob less efficient merging
 
     IndexReader[] subReaders = new IndexReader[leaves.size()];
     for (int partitionNumber=0; partitionNumber<ranges.size(); partitionNumber++) {
@@ -85,22 +89,35 @@ public class SolrIndexSplitter {
         subReaders[segmentNumber] = new LiveDocsReader( leaves.get(segmentNumber), segmentDocSets.get(segmentNumber)[partitionNumber] );
       }
 
-      String path = paths.get(partitionNumber);
       boolean success = false;
-      SolrCore core = searcher.getCore();
-      IndexWriter iw = new SolrIndexWriter("SplittingIndexWriter"+partitionNumber + " " + ranges.get(partitionNumber), path,
-          core.getDirectoryFactory(), true, core.getSchema(),
-          core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec(), true);
+
+      RefCounted<IndexWriter> iwRef = null;
+      IndexWriter iw = null;
+      if (cores != null) {
+        SolrCore subCore = cores.get(partitionNumber);
+        iwRef = subCore.getUpdateHandler().getSolrCoreState().getIndexWriter(subCore);
+        iw = iwRef.get();
+      } else {
+        SolrCore core = searcher.getCore();
+        String path = paths.get(partitionNumber);
+        iw = new SolrIndexWriter("SplittingIndexWriter"+partitionNumber + " " + ranges.get(partitionNumber), path,
+                                 core.getDirectoryFactory(), true, core.getSchema(),
+                                 core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec(), true);
+      }
 
       try {
+        // This merges the subreaders and will thus remove deletions (i.e. no optimize needed)
         iw.addIndexes(subReaders);
-        // TODO: will many deletes have been removed, or should we optimize?
         success = true;
       } finally {
-        if (success) {
-          IOUtils.close(iw);
+        if (iwRef != null) {
+          iwRef.decref();
         } else {
-          IOUtils.closeWhileHandlingException(iw);
+          if (success) {
+            IOUtils.close(iw);
+          } else {
+            IOUtils.closeWhileHandlingException(iw);
+          }
         }
       }
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java?rev=1380287&r1=1380286&r2=1380287&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java Mon Sep  3 16:36:20 2012
@@ -18,6 +18,7 @@
 package org.apache.solr.update;
 
 import org.apache.solr.common.cloud.HashPartitioner;
+import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
 
 import java.util.List;
@@ -31,12 +32,14 @@ import java.util.List;
 public class SplitIndexCommand extends UpdateCommand {
   // public List<Directory> dirs;
   public List<String> paths;
+  public List<SolrCore> cores;  // either paths or cores should be specified
   public List<HashPartitioner.Range> ranges;
   // TODO: allow specification of custom hash function
 
-  public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<HashPartitioner.Range> ranges) {
+  public SplitIndexCommand(SolrQueryRequest req, List<String> paths,  List<SolrCore> cores, List<HashPartitioner.Range> ranges) {
     super(req);
     this.paths = paths;
+    this.cores = cores;
     this.ranges = ranges;
   }
 
@@ -49,6 +52,7 @@ public class SplitIndexCommand extends U
   public String toString() {
     StringBuilder sb = new StringBuilder(super.toString());
     sb.append(",paths=" + paths);
+    sb.append(",cores=" + cores);
     sb.append(",ranges=" + ranges);
     sb.append('}');
     return sb.toString();

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java?rev=1380287&r1=1380286&r2=1380287&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java Mon Sep  3 16:36:20 2012
@@ -27,6 +27,8 @@ import java.util.List;
 public class HashPartitioner {
 
   // Hash ranges can't currently "wrap" - i.e. max must be greater or equal to min.
+  // TODO: ranges may not be all contiguous in the future (either that or we will
+  // need an extra class to model a collection of ranges)
   public static class Range {
     public int min;  // inclusive
     public int max;  // inclusive
@@ -48,7 +50,12 @@ public class HashPartitioner {
       return null; // TODO
     }
   }
-  
+
+
+
+  public List<Range> partitionRange(int partitions, Range range) {
+    return partitionRange(partitions, range.min, range.max);
+  }
 
   /**
    *