You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2014/01/15 20:33:22 UTC

svn commit: r1558541 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/contrib/ solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/

Author: markrmiller
Date: Wed Jan 15 19:33:21 2014
New Revision: 1558541

URL: http://svn.apache.org/r1558541
Log:
SOLR-1301: Merge in latest solr-map-reduce updates.

Added:
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/AlphaNumericComparator.java
      - copied unchanged from r1547871, lucene/dev/trunk/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/AlphaNumericComparator.java
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/AlphaNumericComparatorTest.java
      - copied unchanged from r1547871, lucene/dev/trunk/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/AlphaNumericComparatorTest.java
Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/contrib/   (props changed)
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/BatchWriter.java
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MRUnitBase.java
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineBasicMiniMRTest.java
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java
    lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java

Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/BatchWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/BatchWriter.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/BatchWriter.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/BatchWriter.java Wed Jan 15 19:33:21 2014
@@ -196,8 +196,8 @@ class BatchWriter {
         batchPool.awaitTermination(5, TimeUnit.SECONDS);
       }
     }
-    //reporter.setStatus("Committing Solr");
-    //solr.commit(true, false);
+    context.setStatus("Committing Solr Phase 1");
+    solr.commit(true, false);
     context.setStatus("Optimizing Solr");
     int maxSegments = context.getConfiguration().getInt(SolrOutputFormat.SOLR_RECORD_WRITER_MAX_SEGMENTS, 1);
     LOG.info("Optimizing Solr: forcing merge down to {} segments", maxSegments);
@@ -206,9 +206,9 @@ class BatchWriter {
     context.getCounter(SolrCounters.class.getName(), SolrCounters.PHYSICAL_REDUCER_MERGE_TIME.toString()).increment(System.currentTimeMillis() - start);
     float secs = (System.currentTimeMillis() - start) / 1000.0f;
     LOG.info("Optimizing Solr: done forcing merge down to {} segments in {} secs", maxSegments, secs);
+    context.setStatus("Committing Solr Phase 2");
+    solr.commit(true, false);
     context.setStatus("Shutting down Solr");
-    // TODO is core close needed? - according to TestEmbeddedSolrServer it's not...
-    //core.close();
     solr.shutdown();
   }
 

Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java Wed Jan 15 19:33:21 2014
@@ -33,10 +33,13 @@ import java.io.Writer;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
 
@@ -80,6 +83,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.cloudera.cdk.morphline.base.Fields;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
 
 
 /**
@@ -107,6 +113,10 @@ public class MapReduceIndexerTool extend
    */
   static final class MyArgumentParser {
     
+    private static final String SHOW_NON_SOLR_CLOUD = "--show-non-solr-cloud";
+
+    private boolean showNonSolrCloud = false;
+
     /**
      * Parses the given command line arguments.
      * 
@@ -123,6 +133,8 @@ public class MapReduceIndexerTool extend
         args = new String[] { "--help" };
       }
       
+      showNonSolrCloud = Arrays.asList(args).contains(SHOW_NON_SOLR_CLOUD); // intercept it first
+      
       ArgumentParser parser = ArgumentParsers
         .newArgumentParser("hadoop [GenericOptions]... jar search-mr-*-job.jar " + MapReduceIndexerTool.class.getName(), false)
         .defaultHelp(true)
@@ -297,7 +309,7 @@ public class MapReduceIndexerTool extend
               "specified by --morphline-file. If the --morphline-id option is ommitted the first (i.e. " +
               "top-most) morphline within the config file is used. Example: morphline1");
             
-      Argument solrHomeDirArg = parser.addArgument("--solr-home-dir")
+      Argument solrHomeDirArg = nonSolrCloud(parser.addArgument("--solr-home-dir")
         .metavar("DIR")
         .type(new FileArgumentType() {
           @Override
@@ -312,7 +324,7 @@ public class MapReduceIndexerTool extend
         .required(false)
         .help("Relative or absolute path to a local dir containing Solr conf/ dir and in particular " +
               "conf/solrconfig.xml and optionally also lib/ dir. This directory will be uploaded to each MR task. " +
-              "Example: src/test/resources/solr/minimr");
+              "Example: src/test/resources/solr/minimr"));
         
       Argument updateConflictResolverArg = parser.addArgument("--update-conflict-resolver")
         .metavar("FQCN")
@@ -404,25 +416,20 @@ public class MapReduceIndexerTool extend
         .action(Arguments.storeTrue())
         .help("Turn on verbose output.");
   
+      parser.addArgument(SHOW_NON_SOLR_CLOUD)
+        .action(Arguments.storeTrue())
+        .help("Also show options for Non-SolrCloud mode as part of --help.");
+      
       ArgumentGroup clusterInfoGroup = parser
           .addArgumentGroup("Cluster arguments")
           .description(
               "Arguments that provide information about your Solr cluster. "
-                  + "If you are not using --go-live, pass the --shards argument. If you are building shards for "
-                  + "a Non-SolrCloud cluster, pass the --shard-url argument one or more times. To build indexes for"
-                  + " a replicated cluster with --shard-url, pass replica urls consecutively and also pass --shards. " 
-                  + "If you are building shards for a SolrCloud cluster, pass the --zk-host argument. "
-                  + "Using --go-live requires either --shard-url or --zk-host.");
+            + nonSolrCloud("If you are building shards for a SolrCloud cluster, pass the --zk-host argument. "
+            + "If you are building shards for "
+            + "a Non-SolrCloud cluster, pass the --shard-url argument one or more times. To build indexes for "
+            + "a replicated Non-SolrCloud cluster with --shard-url, pass replica urls consecutively and also pass --shards. "
+            + "Using --go-live requires either --zk-host or --shard-url."));
 
-      Argument shardUrlsArg = clusterInfoGroup.addArgument("--shard-url")
-        .metavar("URL")
-        .type(String.class)
-        .action(Arguments.append())
-        .help("Solr URL to merge resulting shard into if using --go-live. " +
-              "Example: http://solr001.mycompany.com:8983/solr/collection1. " + 
-              "Multiple --shard-url arguments can be specified, one for each desired shard. " +
-              "If you are merging shards into a SolrCloud cluster, use --zk-host instead.");
-      
       Argument zkHostArg = clusterInfoGroup.addArgument("--zk-host")
         .metavar("STRING")
         .type(String.class)
@@ -444,15 +451,24 @@ public class MapReduceIndexerTool extend
             + "would be relative to this root - i.e. getting/setting/etc... "
             + "'/foo/bar' would result in operations being run on "
             + "'/solr/foo/bar' (from the server perspective).\n"
-            + "\n"
+            + nonSolrCloud("\n"
             + "If --solr-home-dir is not specified, the Solr home directory for the collection "
-            + "will be downloaded from this ZooKeeper ensemble.");
+            + "will be downloaded from this ZooKeeper ensemble."));
 
-      Argument shardsArg = clusterInfoGroup.addArgument("--shards")
+      Argument shardUrlsArg = nonSolrCloud(clusterInfoGroup.addArgument("--shard-url")
+        .metavar("URL")
+        .type(String.class)
+        .action(Arguments.append())
+        .help("Solr URL to merge resulting shard into if using --go-live. " +
+              "Example: http://solr001.mycompany.com:8983/solr/collection1. " + 
+              "Multiple --shard-url arguments can be specified, one for each desired shard. " +
+              "If you are merging shards into a SolrCloud cluster, use --zk-host instead."));
+      
+      Argument shardsArg = nonSolrCloud(clusterInfoGroup.addArgument("--shards")
         .metavar("INTEGER")
         .type(Integer.class)
         .choices(new RangeArgumentChoice(1, Integer.MAX_VALUE))
-        .help("Number of output shards to generate.");
+        .help("Number of output shards to generate."));
       
       ArgumentGroup goLiveGroup = parser.addArgumentGroup("Go live arguments")
         .description("Arguments for merging the shards that are built into a live Solr cluster. " +
@@ -462,8 +478,8 @@ public class MapReduceIndexerTool extend
         .action(Arguments.storeTrue())
         .help("Allows you to optionally merge the final index shards into a live Solr cluster after they are built. " +
               "You can pass the ZooKeeper address with --zk-host and the relevant cluster information will be auto detected. " +
-              "If you are not using a SolrCloud cluster, --shard-url arguments can be used to specify each SolrCore to merge " +
-              "each shard into.");
+              nonSolrCloud("If you are not using a SolrCloud cluster, --shard-url arguments can be used to specify each SolrCore to merge " +
+              "each shard into."));
 
       Argument collectionArg = goLiveGroup.addArgument("--collection")
         .metavar("STRING")
@@ -538,6 +554,15 @@ public class MapReduceIndexerTool extend
       return null;     
     }
 
+    // make it a "hidden" option, i.e. the option is functional and enabled but not shown in --help output
+    private Argument nonSolrCloud(Argument arg) {
+        return showNonSolrCloud ? arg : arg.help(FeatureControl.SUPPRESS); 
+    }
+
+    private String nonSolrCloud(String msg) {
+        return showNonSolrCloud ? msg : "";
+    }
+
     /** Marker trick to prevent processing of any remaining arguments once --help option has been parsed */
     private static final class FoundHelpArgument extends RuntimeException {      
     }
@@ -785,7 +810,7 @@ public class MapReduceIndexerTool extend
     job.setOutputValueClass(SolrInputDocumentWritable.class);
     LOG.info("Indexing {} files using {} real mappers into {} reducers", new Object[] {numFiles, realMappers, reducers});
     startTime = System.currentTimeMillis();
-    if (!waitForCompletion(job, true)) {
+    if (!waitForCompletion(job, options.isVerbose)) {
       return -1; // job failed
     }
 
@@ -826,6 +851,9 @@ public class MapReduceIndexerTool extend
       if (!waitForCompletion(job, options.isVerbose)) {
         return -1; // job failed
       }
+      if (!renameTreeMergeShardDirs(outputTreeMergeStep, job, fs)) {
+        return -1;
+      }
       secs = (System.currentTimeMillis() - startTime) / 1000.0f;
       LOG.info("MTree merge iteration {}/{}: Done. Merging {} shards into {} shards using fanout {} took {} secs",
           new Object[] {mtreeMergeIteration, mtreeMergeIterations, reducers, (reducers / options.fanout), options.fanout, secs});
@@ -1182,10 +1210,102 @@ public class MapReduceIndexerTool extend
         throw new IllegalStateException("Not a directory: " + dir.getPath());
       }
     }
-    Arrays.sort(dirs); // FIXME: handle more than 99999 shards (need numeric sort rather than lexicographical sort)
+    
+    // use alphanumeric sort (rather than lexicographical sort) to properly handle more than 99999 shards
+    Arrays.sort(dirs, new Comparator<FileStatus>() {
+      @Override
+      public int compare(FileStatus f1, FileStatus f2) {
+        return new AlphaNumericComparator().compare(f1.getPath().getName(), f2.getPath().getName());
+      }
+    });
+
     return dirs;
   }
 
+  /*
+   * You can run MapReduceIndexerTool in Solrcloud mode, and once the MR job completes, you can use
+   * the standard solrj Solrcloud API to send doc updates and deletes to SolrCloud, and those updates
+   * and deletes will go to the right Solr shards, and it will work just fine.
+   * 
+   * The MapReduce framework doesn't guarantee that input split N goes to the map task with the
+   * taskId = N. The job tracker and Yarn schedule and assign tasks, considering data locality
+   * aspects, but without regard of the input split# withing the overall list of input splits. In
+   * other words, split# != taskId can be true.
+   * 
+   * To deal with this issue, our mapper tasks write a little auxiliary metadata file (per task)
+   * that tells the job driver which taskId processed which split#. Once the mapper-only job is
+   * completed, the job driver renames the output dirs such that the dir name contains the true solr
+   * shard id, based on these auxiliary files.
+   * 
+   * This way each doc gets assigned to the right Solr shard even with #reducers > #solrshards
+   * 
+   * Example for a merge with two shards:
+   * 
+   * part-m-00000 and part-m-00001 goes to outputShardNum = 0 and will end up in merged part-m-00000
+   * part-m-00002 and part-m-00003 goes to outputShardNum = 1 and will end up in merged part-m-00001
+   * part-m-00004 and part-m-00005 goes to outputShardNum = 2 and will end up in merged part-m-00002
+   * ... and so on
+   * 
+   * Also see run() method above where it uses NLineInputFormat.setNumLinesPerSplit(job,
+   * options.fanout)
+   * 
+   * Also see TreeMergeOutputFormat.TreeMergeRecordWriter.writeShardNumberFile()
+   */
+  private boolean renameTreeMergeShardDirs(Path outputTreeMergeStep, Job job, FileSystem fs) throws IOException {
+    final String dirPrefix = SolrOutputFormat.getOutputName(job);
+    FileStatus[] dirs = fs.listStatus(outputTreeMergeStep, new PathFilter() {      
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith(dirPrefix);
+      }
+    });
+    
+    for (FileStatus dir : dirs) {
+      if (!dir.isDirectory()) {
+        throw new IllegalStateException("Not a directory: " + dir.getPath());
+      }
+    }
+
+    // Example: rename part-m-00004 to _part-m-00004
+    for (FileStatus dir : dirs) {
+      Path path = dir.getPath();
+      Path renamedPath = new Path(path.getParent(), "_" + path.getName());
+      if (!rename(path, renamedPath, fs)) {
+        return false;
+      }
+    }
+    
+    // Example: rename _part-m-00004 to part-m-00002
+    for (FileStatus dir : dirs) {
+      Path path = dir.getPath();
+      Path renamedPath = new Path(path.getParent(), "_" + path.getName());
+      
+      // read auxiliary metadata file (per task) that tells which taskId 
+      // processed which split# aka solrShard
+      Path solrShardNumberFile = new Path(renamedPath, TreeMergeMapper.SOLR_SHARD_NUMBER);
+      InputStream in = fs.open(solrShardNumberFile);
+      byte[] bytes = ByteStreams.toByteArray(in);
+      in.close();
+      Preconditions.checkArgument(bytes.length > 0);
+      int solrShard = Integer.parseInt(new String(bytes, Charsets.UTF_8));
+      if (!delete(solrShardNumberFile, false, fs)) {
+        return false;
+      }
+      
+      // same as FileOutputFormat.NUMBER_FORMAT
+      NumberFormat numberFormat = NumberFormat.getInstance(Locale.ENGLISH);
+      numberFormat.setMinimumIntegerDigits(5);
+      numberFormat.setGroupingUsed(false);
+      Path finalPath = new Path(renamedPath.getParent(), dirPrefix + "-m-" + numberFormat.format(solrShard));
+      
+      LOG.info("MTree merge renaming solr shard: " + solrShard + " from dir: " + dir.getPath() + " to dir: " + finalPath);
+      if (!rename(renamedPath, finalPath, fs)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   private static void verifyGoLiveArgs(Options opts, ArgumentParser parser) throws ArgumentParserException {
     if (opts.zkHost == null && opts.solrHomeDir == null) {
       throw new ArgumentParserException("At least one of --zk-host or --solr-home-dir is required", parser);

Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java Wed Jan 15 19:33:21 2014
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import com.cloudera.cdk.morphline.api.ExceptionHandler;
 import com.cloudera.cdk.morphline.base.FaultTolerance;
+import com.google.common.base.Preconditions;
 
 /**
  * This class loads the mapper's SolrInputDocuments into one EmbeddedSolrServer
@@ -54,6 +55,7 @@ public class SolrReducer extends Reducer
   
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
+    verifyPartitionAssignment(context);    
     SolrRecordWriter.addReducerContext(context);
     Class<? extends UpdateConflictResolver> resolverClass = context.getConfiguration().getClass(
         UPDATE_CONFLICT_RESOLVER, RetainMostRecentUpdateConflictResolver.class, UpdateConflictResolver.class);
@@ -107,6 +109,24 @@ public class SolrReducer extends Reducer
     super.cleanup(context);
   }
 
+  /*
+   * Verify that if a mappers's partitioner sends an item to partition X it implies that said item
+   * is sent to the reducer with taskID == X. This invariant is currently required for Solr
+   * documents to end up in the right Solr shard.
+   */
+  private void verifyPartitionAssignment(Context context) {
+    if ("true".equals(System.getProperty("verifyPartitionAssignment", "true"))) {
+      String partitionStr = context.getConfiguration().get("mapred.task.partition");
+      if (partitionStr == null) {
+        partitionStr = context.getConfiguration().get("mapreduce.task.partition");
+      }
+      int partition = Integer.parseInt(partitionStr);
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      Preconditions.checkArgument(partition == taskId, 
+          "mapred.task.partition: " + partition + " not equal to reducer taskId: " + taskId);      
+    }
+  }
+  
   
   ///////////////////////////////////////////////////////////////////////////////
   // Nested classes:

Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java Wed Jan 15 19:33:21 2014
@@ -34,6 +34,8 @@ public class TreeMergeMapper extends Map
 
   public static final String MAX_SEGMENTS_ON_TREE_MERGE = "maxSegmentsOnTreeMerge";
 
+  public static final String SOLR_SHARD_NUMBER = "_solrShardNumber";
+
   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     LOGGER.trace("map key: {}, value: {}", key, value);

Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java Wed Jan 15 19:33:21 2014
@@ -17,6 +17,9 @@
 package org.apache.solr.hadoop;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -39,6 +42,9 @@ import org.apache.solr.store.hdfs.HdfsDi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+
 /**
  * See {@link IndexMergeTool}.
  */
@@ -84,13 +90,15 @@ public class TreeMergeOutputFormat exten
     
     @Override
     public void close(TaskAttemptContext context) throws IOException {
-      LOG.debug("Merging into dstDir: " + workDir + ", srcDirs: {}", shards);
+      LOG.debug("Task " + context.getTaskAttemptID() + " merging into dstDir: " + workDir + ", srcDirs: " + shards);
+      writeShardNumberFile(context);      
       heartBeater.needHeartBeat();
       try {
         Directory mergedIndex = new HdfsDirectory(workDir, context.getConfiguration());
         
+        // TODO: shouldn't we pull the Version from the solrconfig.xml?
         IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LUCENE_CURRENT, null)
-            .setOpenMode(OpenMode.CREATE)
+            .setOpenMode(OpenMode.CREATE).setUseCompoundFile(false)
             //.setMergePolicy(mergePolicy) // TODO: grab tuned MergePolicy from solrconfig.xml?
             //.setMergeScheduler(...) // TODO: grab tuned MergeScheduler from solrconfig.xml?
             ;
@@ -162,6 +170,27 @@ public class TreeMergeOutputFormat exten
         heartBeater.cancelHeartBeat();
         heartBeater.close();
       }
+    }
+
+    /*
+     * For background see MapReduceIndexerTool.renameTreeMergeShardDirs()
+     * 
+     * Also see MapReduceIndexerTool.run() method where it uses
+     * NLineInputFormat.setNumLinesPerSplit(job, options.fanout)
+     */
+    private void writeShardNumberFile(TaskAttemptContext context) throws IOException {
+      Preconditions.checkArgument(shards.size() > 0);
+      String shard = shards.get(0).getParent().getParent().getName(); // move up from "data/index"
+      String taskId = shard.substring("part-m-".length(), shard.length()); // e.g. part-m-00001
+      int taskNum = Integer.parseInt(taskId);
+      int outputShardNum = taskNum / shards.size();
+      LOG.debug("Merging into outputShardNum: " + outputShardNum + " from taskId: " + taskId);
+      Path shardNumberFile = new Path(workDir.getParent().getParent(), TreeMergeMapper.SOLR_SHARD_NUMBER);
+      OutputStream out = shardNumberFile.getFileSystem(context.getConfiguration()).create(shardNumberFile);
+      Writer writer = new OutputStreamWriter(out, Charsets.UTF_8);
+      writer.write(String.valueOf(outputShardNum));
+      writer.flush();
+      writer.close();
     }    
   }
 }

Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java Wed Jan 15 19:33:21 2014
@@ -117,9 +117,11 @@ final class ZooKeeperInspector {
     Collections.sort(sorted, new Comparator<Slice>() {
       @Override
       public int compare(Slice slice1, Slice slice2) {
-        return slice1.getName().compareTo(slice2.getName());
+        Comparator c = new AlphaNumericComparator();
+        return c.compare(slice1.getName(), slice2.getName());
       }      
     });
+    LOG.trace("Sorted slices: {}", sorted);
     return sorted;
   }
 

Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MRUnitBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MRUnitBase.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MRUnitBase.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MRUnitBase.java Wed Jan 15 19:33:21 2014
@@ -54,6 +54,7 @@ public abstract class MRUnitBase extends
     setupMorphline(tempDir, "test-morphlines/solrCellDocumentTypes");
     
     config.set(MorphlineMapRunner.MORPHLINE_FILE_PARAM, tempDir + "/test-morphlines/solrCellDocumentTypes.conf");
+    config.set(SolrOutputFormat.ZIP_NAME, solrHomeZip.getName());
   }
   
   public static void setupMorphline(String tempDir, String file) throws IOException {

Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineBasicMiniMRTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineBasicMiniMRTest.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineBasicMiniMRTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineBasicMiniMRTest.java Wed Jan 15 19:33:21 2014
@@ -43,6 +43,7 @@ import org.apache.lucene.util.LuceneTest
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cloud.AbstractZkTestCase;
 import org.apache.solr.hadoop.hack.MiniMRCluster;
+import org.apache.solr.handler.extraction.ExtractingParams;
 import org.apache.solr.util.ExternalPaths;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -323,7 +324,7 @@ public class MorphlineBasicMiniMRTest ex
     jobConf.setMaxMapAttempts(1);
     jobConf.setMaxReduceAttempts(1);
     jobConf.setJar(SEARCH_ARCHIVES_JAR);
-    jobConf.setBoolean("ignoreTikaException", false);
+    jobConf.setBoolean(ExtractingParams.IGNORE_TIKA_EXCEPTION, false);
     
     int shards = 2;
     int maxReducers = Integer.MAX_VALUE;

Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java Wed Jan 15 19:33:21 2014
@@ -25,9 +25,11 @@ import java.io.Writer;
 import java.lang.reflect.Array;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
@@ -36,16 +38,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.lucene.util.Constants;
-import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
 import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrQuery.ORDER;
+import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
@@ -53,6 +55,9 @@ import org.apache.solr.client.solrj.requ
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
 import org.apache.solr.cloud.AbstractZkTestCase;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -62,12 +67,12 @@ import org.apache.solr.common.params.Mod
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.hadoop.hack.MiniMRClientCluster;
 import org.apache.solr.hadoop.hack.MiniMRClientClusterFactory;
+import org.apache.solr.handler.extraction.ExtractingParams;
 import org.apache.solr.util.ExternalPaths;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
@@ -86,16 +91,16 @@ import com.carrotsearch.randomizedtestin
 @Slow
 public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
   
+  private static final int RECORD_COUNT = 2104;
   private static final String RESOURCES_DIR = ExternalPaths.SOURCE_HOME + "/contrib/map-reduce/src/test-files";  
   private static final String DOCUMENTS_DIR = RESOURCES_DIR + "/test-documents";
   private static final File MINIMR_INSTANCE_DIR = new File(RESOURCES_DIR + "/solr/minimr");
   private static final File MINIMR_CONF_DIR = new File(RESOURCES_DIR + "/solr/minimr");
-
+  
   private static final String SEARCH_ARCHIVES_JAR = JarFinder.getJar(MapReduceIndexerTool.class);
   
   private static MiniDFSCluster dfsCluster = null;
   private static MiniMRClientCluster mrCluster = null;
-  private static int numRuns = 0;
   private static String tempDir;
  
   private final String inputAvroFile1;
@@ -115,17 +120,8 @@ public class MorphlineGoLiveMiniMRTest e
     this.inputAvroFile3 = "sample-statuses-20120906-141433-medium.avro";
     
     fixShardCount = true;
-    sliceCount = TEST_NIGHTLY ? 3 : 3;
-    shardCount = TEST_NIGHTLY ? 3 : 3;
-  }
-  
-  private static boolean isYarn() {
-    try {
-      Job.class.getMethod("getCluster");
-      return true;
-    } catch (NoSuchMethodException e) {
-      return false;
-    }    
+    sliceCount = TEST_NIGHTLY ? 7 : 3;
+    shardCount = TEST_NIGHTLY ? 7 : 3;
   }
   
   @BeforeClass
@@ -371,7 +367,7 @@ public class MorphlineGoLiveMiniMRTest e
     jobConf.setMaxMapAttempts(1);
     jobConf.setMaxReduceAttempts(1);
     jobConf.setJar(SEARCH_ARCHIVES_JAR);
-    jobConf.setBoolean("ignoreTikaException", false);
+    jobConf.setBoolean(ExtractingParams.IGNORE_TIKA_EXCEPTION, false);
 
     MapReduceIndexerTool tool;
     int res;
@@ -384,7 +380,7 @@ public class MorphlineGoLiveMiniMRTest e
         "--output-dir=" + outDir.toString(),
         "--log4j=" + ExternalPaths.SOURCE_HOME + "/core/src/test-files/log4j.properties",
         "--mappers=3",
-        ++numRuns % 2 == 0 ? "--input-list=" + INPATH.toString() : dataDir.toString(), 
+        random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(),  
         "--go-live-threads", Integer.toString(random().nextInt(15) + 1),
         "--verbose",
         "--go-live"
@@ -396,9 +392,7 @@ public class MorphlineGoLiveMiniMRTest e
     
     if (true) {
       tool = new MapReduceIndexerTool();
-      
       res = ToolRunner.run(jobConf, tool, args);
-      
       assertEquals(0, res);
       assertTrue(tool.job.isComplete());
       assertTrue(tool.job.isSuccessful());
@@ -418,7 +412,7 @@ public class MorphlineGoLiveMiniMRTest e
         "--mappers=3",
         "--verbose",
         "--go-live",
-        ++numRuns % 2 == 0 ? "--input-list=" + INPATH.toString() : dataDir.toString(), 
+        random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(), 
         "--go-live-threads", Integer.toString(random().nextInt(15) + 1)
     };
     args = prependInitialArgs(args);
@@ -449,14 +443,19 @@ public class MorphlineGoLiveMiniMRTest e
     fs.delete(outDir, true);  
     fs.delete(dataDir, true);    
     INPATH = upAvroFile(fs, inDir, DATADIR, dataDir, inputAvroFile3);
-    
+
+    cloudClient.deleteByQuery("*:*");
+    cloudClient.commit();
+    assertEquals(0, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound());      
+
     args = new String[] {
         "--output-dir=" + outDir.toString(),
         "--mappers=3",
-        "--reducers=6",
+        "--reducers=12",
+        "--fanout=2",
         "--verbose",
         "--go-live",
-        ++numRuns % 2 == 0 ? "--input-list=" + INPATH.toString() : dataDir.toString(), 
+        random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(), 
         "--zk-host", zkServer.getZkAddress(), 
         "--collection", collection
     };
@@ -469,15 +468,55 @@ public class MorphlineGoLiveMiniMRTest e
       assertTrue(tool.job.isComplete());
       assertTrue(tool.job.isSuccessful());
       
-      results = server.query(new SolrQuery("*:*"));      
-      assertEquals(2126, results.getResults().getNumFound());
+      SolrDocumentList resultDocs = executeSolrQuery(cloudClient, "*:*");      
+      assertEquals(RECORD_COUNT, resultDocs.getNumFound());
+      assertEquals(RECORD_COUNT, resultDocs.size());
+      
+      // perform updates
+      for (int i = 0; i < RECORD_COUNT; i++) {
+          SolrDocument doc = resultDocs.get(i);
+          SolrInputDocument update = new SolrInputDocument();
+          for (Map.Entry<String, Object> entry : doc.entrySet()) {
+              update.setField(entry.getKey(), entry.getValue());
+          }
+          update.setField("user_screen_name", "Nadja" + i);
+          update.removeField("_version_");
+          cloudClient.add(update);
+      }
+      cloudClient.commit();
+      
+      // verify updates
+      SolrDocumentList resultDocs2 = executeSolrQuery(cloudClient, "*:*");   
+      assertEquals(RECORD_COUNT, resultDocs2.getNumFound());
+      assertEquals(RECORD_COUNT, resultDocs2.size());
+      for (int i = 0; i < RECORD_COUNT; i++) {
+          SolrDocument doc = resultDocs.get(i);
+          SolrDocument doc2 = resultDocs2.get(i);
+          assertEquals(doc.getFirstValue("id"), doc2.getFirstValue("id"));
+          assertEquals("Nadja" + i, doc2.getFirstValue("user_screen_name"));
+          assertEquals(doc.getFirstValue("text"), doc2.getFirstValue("text"));
+          
+          // perform delete
+          cloudClient.deleteById((String)doc.getFirstValue("id"));
+      }
+      cloudClient.commit();
+      
+      // verify deletes
+      assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
     }    
     
+    cloudClient.deleteByQuery("*:*");
+    cloudClient.commit();
+    assertEquals(0, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound());      
     server.shutdown();
     
     // try using zookeeper with replication
     String replicatedCollection = "replicated_collection";
-    createCollection(replicatedCollection, 2, 3, 2);
+    if (TEST_NIGHTLY) {
+      createCollection(replicatedCollection, 11, 3, 11);
+    } else {
+      createCollection(replicatedCollection, 2, 3, 2);
+    }
     waitForRecoveriesToFinish(false);
     cloudClient.setDefaultCollection(replicatedCollection);
     fs.delete(inDir, true);   
@@ -490,7 +529,8 @@ public class MorphlineGoLiveMiniMRTest e
         "--solr-home-dir=" + MINIMR_CONF_DIR.getAbsolutePath(),
         "--output-dir=" + outDir.toString(),
         "--mappers=3",
-        "--reducers=6",
+        "--reducers=22",
+        "--fanout=2",
         "--verbose",
         "--go-live",
         "--zk-host", zkServer.getZkAddress(), 
@@ -505,15 +545,51 @@ public class MorphlineGoLiveMiniMRTest e
       assertTrue(tool.job.isComplete());
       assertTrue(tool.job.isSuccessful());
       
-      results = cloudClient.query(new SolrQuery("*:*"));      
-      assertEquals(2104, results.getResults().getNumFound());
+      SolrDocumentList resultDocs = executeSolrQuery(cloudClient, "*:*");   
+      assertEquals(RECORD_COUNT, resultDocs.getNumFound());
+      assertEquals(RECORD_COUNT, resultDocs.size());
       
       checkConsistency(replicatedCollection);
-    }   
+      
+      // perform updates
+      for (int i = 0; i < RECORD_COUNT; i++) {
+          SolrDocument doc = resultDocs.get(i);          
+          SolrInputDocument update = new SolrInputDocument();
+          for (Map.Entry<String, Object> entry : doc.entrySet()) {
+              update.setField(entry.getKey(), entry.getValue());
+          }
+          update.setField("user_screen_name", "@Nadja" + i);
+          update.removeField("_version_");
+          cloudClient.add(update);
+      }
+      cloudClient.commit();
+      
+      // verify updates
+      SolrDocumentList resultDocs2 = executeSolrQuery(cloudClient, "*:*");   
+      assertEquals(RECORD_COUNT, resultDocs2.getNumFound());
+      assertEquals(RECORD_COUNT, resultDocs2.size());
+      for (int i = 0; i < RECORD_COUNT; i++) {
+          SolrDocument doc = resultDocs.get(i);
+          SolrDocument doc2 = resultDocs2.get(i);
+          assertEquals(doc.getFieldValues("id"), doc2.getFieldValues("id"));
+          assertEquals(1, doc.getFieldValues("id").size());
+          assertEquals(Arrays.asList("@Nadja" + i), doc2.getFieldValues("user_screen_name"));
+          assertEquals(doc.getFieldValues("text"), doc2.getFieldValues("text"));
+          
+          // perform delete
+          cloudClient.deleteById((String)doc.getFirstValue("id"));
+      }
+      cloudClient.commit();
+      
+      // verify deletes
+      assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
+    }
     
     // try using solr_url with replication
     cloudClient.deleteByQuery("*:*");
     cloudClient.commit();
+    assertEquals(0, executeSolrQuery(cloudClient, "*:*").getNumFound());
+    assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
     fs.delete(inDir, true);    
     fs.delete(dataDir, true);
     assertTrue(fs.mkdirs(dataDir));
@@ -543,8 +619,7 @@ public class MorphlineGoLiveMiniMRTest e
       
       checkConsistency(replicatedCollection);
       
-      results = cloudClient.query(new SolrQuery("*:*"));      
-      assertEquals(2104, results.getResults().getNumFound());
+      assertEquals(RECORD_COUNT, executeSolrQuery(cloudClient, "*:*").size());
     }  
     
   }
@@ -555,6 +630,12 @@ public class MorphlineGoLiveMiniMRTest e
       args.add(cloudJettys.get(i).url);
     }
   }
+  
+  private SolrDocumentList executeSolrQuery(SolrServer collection, String queryString) throws SolrServerException {
+    SolrQuery query = new SolrQuery(queryString).setRows(2 * RECORD_COUNT).addSort("id", ORDER.asc);
+    QueryResponse response = collection.query(query);
+    return response.getResults();
+  }
 
   private void checkConsistency(String replicatedCollection)
       throws SolrServerException {

Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java Wed Jan 15 19:33:21 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
 import org.apache.lucene.util.Constants;
 import org.apache.solr.common.SolrInputDocument;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -44,10 +45,17 @@ import com.google.common.collect.Lists;
 public class MorphlineReducerTest extends MRUnitBase {
   
   @BeforeClass
-  public static void beforeClass() {
+  public static void beforeClass2() {
     assumeFalse("Does not work on Windows, because it uses UNIX shell commands or POSIX paths", Constants.WINDOWS);
     assumeFalse("FIXME: This test fails under Java 8 due to the Saxon dependency - see SOLR-1301", Constants.JRE_IS_MINIMUM_JAVA8);
     assumeFalse("FIXME: This test fails under J9 due to the Saxon dependency - see SOLR-1301", System.getProperty("java.vm.info", "<?>").contains("IBM J9"));
+    
+    System.setProperty("verifyPartitionAssignment", "false");
+  }
+  
+  @AfterClass
+  public static void afterClass2() {
+    System.clearProperty("verifyPartitionAssignment");
   }
   
   public static class MySolrReducer extends SolrReducer {
@@ -89,28 +97,35 @@ public class MorphlineReducerTest extend
   @Test
   public void testReducer() throws Exception {
     MySolrReducer myReducer = new MySolrReducer();
-    ReduceDriver<Text, SolrInputDocumentWritable, Text, SolrInputDocumentWritable> reduceDriver = ReduceDriver.newReduceDriver(myReducer);
-
-    Configuration config = reduceDriver.getConfiguration();
-    setupHadoopConfig(config);
-
-    List<SolrInputDocumentWritable> values = new ArrayList<SolrInputDocumentWritable>();
-    SolrInputDocument sid = new SolrInputDocument();
-    String id = "myid1";
-    sid.addField("id", id);
-    sid.addField("text", "some unique text");
-    SolrInputDocumentWritable sidw = new SolrInputDocumentWritable(sid);
-    values.add(sidw);
-    reduceDriver.withInput(new Text(id), values);
-
-    reduceDriver.withCacheArchive(solrHomeZip.getAbsolutePath());
-    
-    reduceDriver.withOutputFormat(SolrOutputFormat.class, NullInputFormat.class);
-
-    reduceDriver.run();
-
-    assertEquals("Expected 1 counter increment", 1, reduceDriver.getCounters()
-        .findCounter(SolrCounters.class.getName(), SolrCounters.DOCUMENTS_WRITTEN.toString()).getValue());
+    try {
+      ReduceDriver<Text,SolrInputDocumentWritable,Text,SolrInputDocumentWritable> reduceDriver = ReduceDriver
+          .newReduceDriver(myReducer);
+      
+      Configuration config = reduceDriver.getConfiguration();
+      setupHadoopConfig(config);
+      
+      List<SolrInputDocumentWritable> values = new ArrayList<SolrInputDocumentWritable>();
+      SolrInputDocument sid = new SolrInputDocument();
+      String id = "myid1";
+      sid.addField("id", id);
+      sid.addField("text", "some unique text");
+      SolrInputDocumentWritable sidw = new SolrInputDocumentWritable(sid);
+      values.add(sidw);
+      reduceDriver.withInput(new Text(id), values);
+      
+      reduceDriver.withCacheArchive(solrHomeZip.getAbsolutePath());
+      
+      reduceDriver.withOutputFormat(SolrOutputFormat.class,
+          NullInputFormat.class);
+      
+      reduceDriver.run();
+      
+      assertEquals("Expected 1 counter increment", 1,
+          reduceDriver.getCounters().findCounter(SolrCounters.class.getName(),
+                  SolrCounters.DOCUMENTS_WRITTEN.toString()).getValue());
+    } finally {
+      myReducer.cleanup(myReducer.context);
+    }
   }
 
 }