Posted to commits@lucene.apache.org by sa...@apache.org on 2017/03/24 16:37:10 UTC

[29/62] lucene-solr:master: SOLR-9221: Remove Solr contribs: map-reduce, morphlines-core and morphlines-cell

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java
deleted file mode 100644
index 1c17855..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java
+++ /dev/null
@@ -1,1388 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.lang.invoke.MethodHandles;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Random;
-import net.sourceforge.argparse4j.ArgumentParsers;
-import net.sourceforge.argparse4j.impl.Arguments;
-import net.sourceforge.argparse4j.impl.action.HelpArgumentAction;
-import net.sourceforge.argparse4j.impl.choice.RangeArgumentChoice;
-import net.sourceforge.argparse4j.impl.type.FileArgumentType;
-import net.sourceforge.argparse4j.inf.Argument;
-import net.sourceforge.argparse4j.inf.ArgumentGroup;
-import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
-import net.sourceforge.argparse4j.inf.FeatureControl;
-import net.sourceforge.argparse4j.inf.Namespace;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.hadoop.dedup.RetainMostRecentUpdateConflictResolver;
-import org.apache.solr.hadoop.morphline.MorphlineMapRunner;
-import org.apache.solr.hadoop.morphline.MorphlineMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.kitesdk.morphline.base.Fields;
-
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-
-
-/**
- * Public API for a MapReduce batch job driver that creates a set of Solr index shards from a set of
- * input files and writes the indexes into HDFS, in a flexible, scalable and fault-tolerant manner.
- * Also supports merging the output shards into a set of live customer facing Solr servers,
- * typically a SolrCloud.
- */
-public class MapReduceIndexerTool extends Configured implements Tool {
-  
-  Job job; // visible for testing only
-  
-  public static final String RESULTS_DIR = "results";
-
-  static final String MAIN_MEMORY_RANDOMIZATION_THRESHOLD = 
-      MapReduceIndexerTool.class.getName() + ".mainMemoryRandomizationThreshold";
-  
-  private static final String FULL_INPUT_LIST = "full-input-list.txt";
-  
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  
-  /**
-   * See http://argparse4j.sourceforge.net and, for details, http://argparse4j.sourceforge.net/usage.html
-   */
-  static final class MyArgumentParser {
-    
-    private static final String SHOW_NON_SOLR_CLOUD = "--show-non-solr-cloud";
-
-    private boolean showNonSolrCloud = false;
-
-    /**
-     * Parses the given command line arguments.
-     * 
-     * @return exitCode null indicates the caller shall proceed with processing,
-     *         non-null indicates the caller shall exit the program with the
-     *         given exit status code.
-     */
-    public Integer parseArgs(String[] args, Configuration conf, Options opts) {
-      assert args != null;
-      assert conf != null;
-      assert opts != null;
-
-      if (args.length == 0) {
-        args = new String[] { "--help" };
-      }
-      
-      showNonSolrCloud = Arrays.asList(args).contains(SHOW_NON_SOLR_CLOUD); // intercept it first
-      
-      ArgumentParser parser = ArgumentParsers
-        .newArgumentParser("hadoop [GenericOptions]... jar solr-map-reduce-*.jar ", false)
-        .defaultHelp(true)
-        .description(
-          "MapReduce batch job driver that takes a morphline and creates a set of Solr index shards from a set of input files " +
-          "and writes the indexes into HDFS, in a flexible, scalable and fault-tolerant manner. " +
-          "It also supports merging the output shards into a set of live customer facing Solr servers, " +
-          "typically a SolrCloud. The program proceeds in several consecutive MapReduce based phases, as follows:" +
-          "\n\n" +
-          "1) Randomization phase: This (parallel) phase randomizes the list of input files in order to spread " +
-          "indexing load more evenly among the mappers of the subsequent phase." +  
-          "\n\n" +
-          "2) Mapper phase: This (parallel) phase takes the input files, extracts the relevant content, transforms it " +
-          "and hands SolrInputDocuments to a set of reducers. " +
-          "The ETL functionality is flexible and " +
-          "customizable using chains of arbitrary morphline commands that pipe records from one transformation command to another. " + 
-          "Commands to parse and transform a set of standard data formats such as Avro, CSV, Text, HTML, XML, " +
-          "PDF, Word, Excel, etc. are provided out of the box, and additional custom commands and parsers for additional " +
-          "file or data formats can be added as morphline plugins. " +
-          "This is done by implementing a simple Java interface that consumes a record (e.g. a file in the form of an InputStream " +
-          "plus some headers plus contextual metadata) and generates as output zero or more records. " + 
-          "Any kind of data format can be indexed and any Solr documents for any kind of Solr schema can be generated, " +
-          "and any custom ETL logic can be registered and executed.\n" +
-          "Record fields, including MIME types, can also explicitly be passed by force from the CLI to the morphline, for example: " +
-          "hadoop ... -D " + MorphlineMapRunner.MORPHLINE_FIELD_PREFIX + Fields.ATTACHMENT_MIME_TYPE + "=text/csv" +
-          "\n\n" +
-          "3) Reducer phase: This (parallel) phase loads the mapper's SolrInputDocuments into one EmbeddedSolrServer per reducer. " +
-          "Each such reducer and Solr server can be seen as a (micro) shard. The Solr servers store their " +
-          "data in HDFS." + 
-          "\n\n" +
-          "4) Mapper-only merge phase: This (parallel) phase merges the set of reducer shards into the number of solr " +
-          "shards expected by the user, using a mapper-only job. This phase is omitted if the number " +
-          "of shards is already equal to the number of shards expected by the user. " +
-          "\n\n" +
-          "5) Go-live phase: This optional (parallel) phase merges the output shards of the previous phase into a set of " +
-          "live customer facing Solr servers, typically a SolrCloud. " +
-          "If this phase is omitted you can explicitly point each Solr server to one of the HDFS output shard directories." +
-          "\n\n" +
-          "Fault Tolerance: Mapper and reducer task attempts are retried on failure per the standard MapReduce semantics. " +
-          "On program startup all data in the --output-dir is deleted if that output directory already exists. " +
-          "If the whole job fails you can retry simply by rerunning the program again using the same arguments." 
-          );
-
-      parser.addArgument("--help", "-help", "-h")
-        .help("Show this help message and exit")
-        .action(new HelpArgumentAction() {
-          @Override
-          public void run(ArgumentParser parser, Argument arg, Map<String, Object> attrs, String flag, Object value) throws ArgumentParserException {
-            parser.printHelp();
-            System.out.println();
-            System.out.print(ToolRunnerHelpFormatter.getGenericCommandUsage());
-            //ToolRunner.printGenericCommandUsage(System.out);
-            System.out.println(
-              "Examples: \n\n" + 
-
-              "# (Re)index an Avro based Twitter tweet file:\n" +
-              "sudo -u hdfs hadoop \\\n" + 
-              "  --config /etc/hadoop/conf.cloudera.mapreduce1 \\\n" +
-              "  jar target/solr-map-reduce-*.jar \\\n" +
-              "  -D 'mapred.child.java.opts=-Xmx500m' \\\n" + 
-//            "  -D 'mapreduce.child.java.opts=-Xmx500m' \\\n" + 
-              "  --log4j src/test/resources/log4j.properties \\\n" + 
-              "  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \\\n" + 
-              "  --solr-home-dir src/test/resources/solr/minimr \\\n" +
-              "  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \\\n" + 
-              "  --shards 1 \\\n" + 
-              "  hdfs:///user/$USER/test-documents/sample-statuses-20120906-141433.avro\n" +
-              "\n" +
-              "# Go live by merging resulting index shards into a live Solr cluster\n" +
-              "# (explicitly specify Solr URLs - for a SolrCloud cluster see next example):\n" +
-              "sudo -u hdfs hadoop \\\n" + 
-              "  --config /etc/hadoop/conf.cloudera.mapreduce1 \\\n" +
-              "  jar target/solr-map-reduce-*.jar \\\n" +
-              "  -D 'mapred.child.java.opts=-Xmx500m' \\\n" + 
-//            "  -D 'mapreduce.child.java.opts=-Xmx500m' \\\n" + 
-              "  --log4j src/test/resources/log4j.properties \\\n" + 
-              "  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \\\n" + 
-              "  --solr-home-dir src/test/resources/solr/minimr \\\n" + 
-              "  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \\\n" + 
-              "  --shard-url http://solr001.mycompany.com:8983/solr/collection1 \\\n" + 
-              "  --shard-url http://solr002.mycompany.com:8983/solr/collection1 \\\n" + 
-              "  --go-live \\\n" + 
-              "  hdfs:///user/foo/indir\n" +  
-              "\n" +
-              "# Go live by merging resulting index shards into a live SolrCloud cluster\n" +
-              "# (discover shards and Solr URLs through ZooKeeper):\n" +
-              "sudo -u hdfs hadoop \\\n" + 
-              "  --config /etc/hadoop/conf.cloudera.mapreduce1 \\\n" +
-              "  jar target/solr-map-reduce-*.jar \\\n" +
-              "  -D 'mapred.child.java.opts=-Xmx500m' \\\n" + 
-//            "  -D 'mapreduce.child.java.opts=-Xmx500m' \\\n" + 
-              "  --log4j src/test/resources/log4j.properties \\\n" + 
-              "  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \\\n" + 
-              "  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \\\n" + 
-              "  --zk-host zk01.mycompany.com:2181/solr \\\n" + 
-              "  --collection collection1 \\\n" + 
-              "  --go-live \\\n" + 
-              "  hdfs:///user/foo/indir\n"
-            );
-            throw new FoundHelpArgument(); // Trick to prevent processing of any remaining arguments
-          }
-        });
-      
-      ArgumentGroup requiredGroup = parser.addArgumentGroup("Required arguments");
-      
-      Argument outputDirArg = requiredGroup.addArgument("--output-dir")
-        .metavar("HDFS_URI")
-        .type(new PathArgumentType(conf) {
-          @Override
-          public Path convert(ArgumentParser parser, Argument arg, String value) throws ArgumentParserException {
-            Path path = super.convert(parser, arg, value);
-            if ("hdfs".equals(path.toUri().getScheme()) && path.toUri().getAuthority() == null) {
-              // TODO: consider defaulting to hadoop's fs.default.name here or in SolrRecordWriter.createEmbeddedSolrServer()
-              throw new ArgumentParserException("Missing authority in path URI: " + path, parser); 
-            }
-            return path;
-          }
-        }.verifyHasScheme().verifyIsAbsolute().verifyCanWriteParent())
-        .required(true)
-        .help("HDFS directory to write Solr indexes to. Inside there one output directory per shard will be generated. " +
-              "Example: hdfs://c2202.mycompany.com/user/$USER/test");
-      
-      Argument inputListArg = parser.addArgument("--input-list")
-        .action(Arguments.append())
-        .metavar("URI")
-  //      .type(new PathArgumentType(fs).verifyExists().verifyCanRead())
-        .type(Path.class)
-        .help("Local URI or HDFS URI of a UTF-8 encoded file containing a list of HDFS URIs to index, " +
-              "one URI per line in the file. If '-' is specified, URIs are read from the standard input. " + 
-              "Multiple --input-list arguments can be specified.");
-        
-      Argument morphlineFileArg = requiredGroup.addArgument("--morphline-file")
-        .metavar("FILE")
-        .type(new FileArgumentType().verifyExists().verifyIsFile().verifyCanRead())
-        .required(true)
-        .help("Relative or absolute path to a local config file that contains one or more morphlines. " +
-              "The file must be UTF-8 encoded. Example: /path/to/morphline.conf");
-          
-      Argument morphlineIdArg = parser.addArgument("--morphline-id")
-        .metavar("STRING")
-        .type(String.class)
-        .help("The identifier of the morphline that shall be executed within the morphline config file " +
-              "specified by --morphline-file. If the --morphline-id option is ommitted the first (i.e. " +
-              "top-most) morphline within the config file is used. Example: morphline1");
-            
-      Argument solrHomeDirArg = nonSolrCloud(parser.addArgument("--solr-home-dir")
-        .metavar("DIR")
-        .type(new FileArgumentType() {
-          @Override
-          public File convert(ArgumentParser parser, Argument arg, String value) throws ArgumentParserException {
-            File solrHomeDir = super.convert(parser, arg, value);
-            File solrConfigFile = new File(new File(solrHomeDir, "conf"), "solrconfig.xml");
-            new FileArgumentType().verifyExists().verifyIsFile().verifyCanRead().convert(
-                parser, arg, solrConfigFile.getPath());
-            return solrHomeDir;
-          }
-        }.verifyIsDirectory().verifyCanRead())
-        .required(false)
-        .help("Relative or absolute path to a local dir containing Solr conf/ dir and in particular " +
-              "conf/solrconfig.xml and optionally also lib/ dir. This directory will be uploaded to each MR task. " +
-              "Example: src/test/resources/solr/minimr"));
-        
-      Argument updateConflictResolverArg = parser.addArgument("--update-conflict-resolver")
-        .metavar("FQCN")
-        .type(String.class)
-        .setDefault(RetainMostRecentUpdateConflictResolver.class.getName())
-        .help("Fully qualified class name of a Java class that implements the UpdateConflictResolver interface. " +
-            "This enables deduplication and ordering of a series of document updates for the same unique document " +
-            "key. For example, a MapReduce batch job might index multiple files in the same job where some of the " +
-            "files contain old and new versions of the very same document, using the same unique document key.\n" +
-            "Typically, implementations of this interface forbid collisions by throwing an exception, or ignore all but " +
-            "the most recent document version, or, in the general case, order colliding updates ascending from least " +
-            "recent to most recent (partial) update. The caller of this interface (i.e. the Hadoop Reducer) will then " +
-            "apply the updates to Solr in the order returned by the orderUpdates() method.\n" +
-            "The default RetainMostRecentUpdateConflictResolver implementation ignores all but the most recent document " +
-            "version, based on a configurable numeric Solr field, which defaults to the file_last_modified timestamp");
-      
-      Argument mappersArg = parser.addArgument("--mappers")
-        .metavar("INTEGER")
-        .type(Integer.class)
-        .choices(new RangeArgumentChoice(-1, Integer.MAX_VALUE)) // TODO: also support X% syntax where X is an integer
-        .setDefault(-1)
-        .help("Tuning knob that indicates the maximum number of MR mapper tasks to use. -1 indicates use all map slots " +
-            "available on the cluster.");
-  
-      Argument reducersArg = parser.addArgument("--reducers")
-        .metavar("INTEGER")
-        .type(Integer.class)
-        .choices(new RangeArgumentChoice(-2, Integer.MAX_VALUE)) // TODO: also support X% syntax where X is an integer
-        .setDefault(-1)
-        .help("Tuning knob that indicates the number of reducers to index into. " +
-            "0 is reserved for a mapper-only feature that may ship in a future release. " +
-            "-1 indicates use all reduce slots available on the cluster. " +
-            "-2 indicates use one reducer per output shard, which disables the mtree merge MR algorithm. " +
-            "The mtree merge MR algorithm improves scalability by spreading load " +
-            "(in particular CPU load) among a number of parallel reducers that can be much larger than the number " +
-            "of solr shards expected by the user. It can be seen as an extension of concurrent lucene merges " +
-            "and tiered lucene merges to the clustered case. The subsequent mapper-only phase " +
-            "merges the output of said large number of reducers to the number of shards expected by the user, " +
-            "again by utilizing more available parallelism on the cluster.");
-
-      Argument fanoutArg = parser.addArgument("--fanout")
-        .metavar("INTEGER")
-        .type(Integer.class)
-        .choices(new RangeArgumentChoice(2, Integer.MAX_VALUE))
-        .setDefault(Integer.MAX_VALUE)
-        .help(FeatureControl.SUPPRESS);
-  
-      Argument maxSegmentsArg = parser.addArgument("--max-segments")
-        .metavar("INTEGER")  
-        .type(Integer.class)
-        .choices(new RangeArgumentChoice(1, Integer.MAX_VALUE))
-        .setDefault(1)
-        .help("Tuning knob that indicates the maximum number of segments to be contained on output in the index of " +
-            "each reducer shard. After a reducer has built its output index it applies a merge policy to merge segments " +
-            "until there are <= maxSegments lucene segments left in this index. " + 
-            "Merging segments involves reading and rewriting all data in all these segment files, " + 
-            "potentially multiple times, which is very I/O intensive and time consuming. " + 
-            "However, an index with fewer segments can later be merged faster, " +
-            "and it can later be queried faster once deployed to a live Solr serving shard. " + 
-            "Set maxSegments to 1 to optimize the index for low query latency. " + 
-            "In a nutshell, a small maxSegments value trades indexing latency for subsequently improved query latency. " + 
-            "This can be a reasonable trade-off for batch indexing systems.");
-      
-      Argument fairSchedulerPoolArg = parser.addArgument("--fair-scheduler-pool")
-        .metavar("STRING")
-        .help("Optional tuning knob that indicates the name of the fair scheduler pool to submit jobs to. " +
-              "The Fair Scheduler is a pluggable MapReduce scheduler that provides a way to share large clusters. " +
-              "Fair scheduling is a method of assigning resources to jobs such that all jobs get, on average, an " +
-              "equal share of resources over time. When there is a single job running, that job uses the entire " +
-              "cluster. When other jobs are submitted, tasks slots that free up are assigned to the new jobs, so " +
-              "that each job gets roughly the same amount of CPU time. Unlike the default Hadoop scheduler, which " +
-              "forms a queue of jobs, this lets short jobs finish in reasonable time while not starving long jobs. " +
-              "It is also an easy way to share a cluster between multiple of users. Fair sharing can also work with " +
-              "job priorities - the priorities are used as weights to determine the fraction of total compute time " +
-              "that each job gets.");
-  
-      Argument dryRunArg = parser.addArgument("--dry-run")
-        .action(Arguments.storeTrue())
-        .help("Run in local mode and print documents to stdout instead of loading them into Solr. This executes " +
-              "the morphline in the client process (without submitting a job to MR) for quicker turnaround during " +
-              "early trial & debug sessions.");
-    
-      Argument log4jConfigFileArg = parser.addArgument("--log4j")
-        .metavar("FILE")
-        .type(new FileArgumentType().verifyExists().verifyIsFile().verifyCanRead())
-        .help("Relative or absolute path to a log4j.properties config file on the local file system. This file " +
-            "will be uploaded to each MR task. Example: /path/to/log4j.properties");
-    
-      Argument verboseArg = parser.addArgument("--verbose", "-v")
-        .action(Arguments.storeTrue())
-        .help("Turn on verbose output.");
-  
-      parser.addArgument(SHOW_NON_SOLR_CLOUD)
-        .action(Arguments.storeTrue())
-        .help("Also show options for Non-SolrCloud mode as part of --help.");
-      
-      ArgumentGroup clusterInfoGroup = parser
-          .addArgumentGroup("Cluster arguments")
-          .description(
-              "Arguments that provide information about your Solr cluster. "
-            + nonSolrCloud("If you are building shards for a SolrCloud cluster, pass the --zk-host argument. "
-            + "If you are building shards for "
-            + "a Non-SolrCloud cluster, pass the --shard-url argument one or more times. To build indexes for "
-            + "a replicated Non-SolrCloud cluster with --shard-url, pass replica urls consecutively and also pass --shards. "
-            + "Using --go-live requires either --zk-host or --shard-url."));
-
-      Argument zkHostArg = clusterInfoGroup.addArgument("--zk-host")
-        .metavar("STRING")
-        .type(String.class)
-        .help("The address of a ZooKeeper ensemble being used by a SolrCloud cluster. "
-            + "This ZooKeeper ensemble will be examined to determine the number of output "
-            + "shards to create as well as the Solr URLs to merge the output shards into when using the --go-live option. "
-            + "Requires that you also pass the --collection to merge the shards into.\n"
-            + "\n"
-            + "The --zk-host option implements the same partitioning semantics as the standard SolrCloud " 
-            + "Near-Real-Time (NRT) API. This enables to mix batch updates from MapReduce ingestion with "
-            + "updates from standard Solr NRT ingestion on the same SolrCloud cluster, "
-            + "using identical unique document keys.\n"
-            + "\n"
-            + "Format is: a list of comma separated host:port pairs, each corresponding to a zk "
-            + "server. Example: '127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183' If "
-            + "the optional chroot suffix is used the example would look "
-            + "like: '127.0.0.1:2181/solr,127.0.0.1:2182/solr,127.0.0.1:2183/solr' "
-            + "where the client would be rooted at '/solr' and all paths "
-            + "would be relative to this root - i.e. getting/setting/etc... "
-            + "'/foo/bar' would result in operations being run on "
-            + "'/solr/foo/bar' (from the server perspective).\n"
-            + nonSolrCloud("\n"
-            + "If --solr-home-dir is not specified, the Solr home directory for the collection "
-            + "will be downloaded from this ZooKeeper ensemble."));
-
-      Argument shardUrlsArg = nonSolrCloud(clusterInfoGroup.addArgument("--shard-url")
-        .metavar("URL")
-        .type(String.class)
-        .action(Arguments.append())
-        .help("Solr URL to merge resulting shard into if using --go-live. " +
-              "Example: http://solr001.mycompany.com:8983/solr/collection1. " + 
-              "Multiple --shard-url arguments can be specified, one for each desired shard. " +
-              "If you are merging shards into a SolrCloud cluster, use --zk-host instead."));
-      
-      Argument shardsArg = nonSolrCloud(clusterInfoGroup.addArgument("--shards")
-        .metavar("INTEGER")
-        .type(Integer.class)
-        .choices(new RangeArgumentChoice(1, Integer.MAX_VALUE))
-        .help("Number of output shards to generate."));
-      
-      ArgumentGroup goLiveGroup = parser.addArgumentGroup("Go live arguments")
-        .description("Arguments for merging the shards that are built into a live Solr cluster. " +
-                     "Also see the Cluster arguments.");
-
-      Argument goLiveArg = goLiveGroup.addArgument("--go-live")
-        .action(Arguments.storeTrue())
-        .help("Allows you to optionally merge the final index shards into a live Solr cluster after they are built. " +
-              "You can pass the ZooKeeper address with --zk-host and the relevant cluster information will be auto detected. " +
-              nonSolrCloud("If you are not using a SolrCloud cluster, --shard-url arguments can be used to specify each SolrCore to merge " +
-              "each shard into."));
-
-      Argument collectionArg = goLiveGroup.addArgument("--collection")
-        .metavar("STRING")
-        .help("The SolrCloud collection to merge shards into when using --go-live and --zk-host. Example: collection1");
-      
-      Argument goLiveThreadsArg = goLiveGroup.addArgument("--go-live-threads")
-        .metavar("INTEGER")
-        .type(Integer.class)
-        .choices(new RangeArgumentChoice(1, Integer.MAX_VALUE))
-        .setDefault(1000)
-        .help("Tuning knob that indicates the maximum number of live merges to run in parallel at one time.");
-      
-      // trailing positional arguments
-      Argument inputFilesArg = parser.addArgument("input-files")
-        .metavar("HDFS_URI")
-        .type(new PathArgumentType(conf).verifyHasScheme().verifyExists().verifyCanRead())
-        .nargs("*")
-        .setDefault()
-        .help("HDFS URI of file or directory tree to index.");
-          
-      Namespace ns;
-      try {
-        ns = parser.parseArgs(args);
-      } catch (FoundHelpArgument e) {
-        return 0;
-      } catch (ArgumentParserException e) {
-        parser.handleError(e);
-        return 1;
-      }
-      
-      opts.log4jConfigFile = (File) ns.get(log4jConfigFileArg.getDest());
-      if (opts.log4jConfigFile != null) {
-        Utils.configureLog4jProperties(opts.log4jConfigFile.getPath());
-      }
-      LOG.debug("Parsed command line args: {}", ns);
-      
-      opts.inputLists = ns.getList(inputListArg.getDest());
-      if (opts.inputLists == null) {
-        opts.inputLists = Collections.emptyList();
-      }
-      opts.inputFiles = ns.getList(inputFilesArg.getDest());
-      opts.outputDir = (Path) ns.get(outputDirArg.getDest());
-      opts.mappers = ns.getInt(mappersArg.getDest());
-      opts.reducers = ns.getInt(reducersArg.getDest());
-      opts.updateConflictResolver = ns.getString(updateConflictResolverArg.getDest());
-      opts.fanout = ns.getInt(fanoutArg.getDest());
-      opts.maxSegments = ns.getInt(maxSegmentsArg.getDest());
-      opts.morphlineFile = (File) ns.get(morphlineFileArg.getDest());
-      opts.morphlineId = ns.getString(morphlineIdArg.getDest());
-      opts.solrHomeDir = (File) ns.get(solrHomeDirArg.getDest());
-      opts.fairSchedulerPool = ns.getString(fairSchedulerPoolArg.getDest());
-      opts.isDryRun = ns.getBoolean(dryRunArg.getDest());
-      opts.isVerbose = ns.getBoolean(verboseArg.getDest());
-      opts.zkHost = ns.getString(zkHostArg.getDest());
-      opts.shards = ns.getInt(shardsArg.getDest());
-      opts.shardUrls = buildShardUrls(ns.getList(shardUrlsArg.getDest()), opts.shards);
-      opts.goLive = ns.getBoolean(goLiveArg.getDest());
-      opts.goLiveThreads = ns.getInt(goLiveThreadsArg.getDest());
-      opts.collection = ns.getString(collectionArg.getDest());
-
-      try {
-        if (opts.reducers == 0) {
-          throw new ArgumentParserException("--reducers must not be zero", parser); 
-        }
-        verifyGoLiveArgs(opts, parser);
-      } catch (ArgumentParserException e) {
-        parser.handleError(e);
-        return 1;
-      }
-
-      if (opts.inputLists.isEmpty() && opts.inputFiles.isEmpty()) {
-        LOG.info("No input files specified - nothing to process");
-        return 0; // nothing to process
-      }
-      return null;     
-    }
-
-    // make it a "hidden" option, i.e. the option is functional and enabled but not shown in --help output
-    private Argument nonSolrCloud(Argument arg) {
-        return showNonSolrCloud ? arg : arg.help(FeatureControl.SUPPRESS); 
-    }
-
-    private String nonSolrCloud(String msg) {
-        return showNonSolrCloud ? msg : "";
-    }
-
-    /** Marker trick to prevent processing of any remaining arguments once --help option has been parsed */
-    private static final class FoundHelpArgument extends RuntimeException {      
-    }
-  }
-  // END OF INNER CLASS  
-
-  static List<List<String>> buildShardUrls(List<Object> urls, Integer numShards) {
-    if (urls == null) return null;
-    List<List<String>> shardUrls = new ArrayList<>(urls.size());
-    List<String> list = null;
-    
-    int sz;
-    if (numShards == null) {
-      numShards = urls.size();
-    }
-    sz = (int) Math.ceil(urls.size() / (float)numShards);
-    for (int i = 0; i < urls.size(); i++) {
-      if (i % sz == 0) {
-        list = new ArrayList<>();
-        shardUrls.add(list);
-      }
-      list.add((String) urls.get(i));
-    }
-
-    return shardUrls;
-  }
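
A hedged usage sketch of buildShardUrls (the host names, ports, and shard count are made-up examples, not taken from this commit): four --shard-url replica URLs with --shards 2 are cut into consecutive groups of ceil(4 / 2.0) = 2 URLs, one group per output shard:

    List<Object> urls = Arrays.<Object>asList(
        "http://solr001:8983/solr/collection1",   // shard 0, replica 0
        "http://solr002:8983/solr/collection1",   // shard 0, replica 1
        "http://solr003:8983/solr/collection1",   // shard 1, replica 0
        "http://solr004:8983/solr/collection1");  // shard 1, replica 1
    List<List<String>> grouped = MapReduceIndexerTool.buildShardUrls(urls, 2);
    // grouped.size() == 2; each inner list holds the two replica URLs of one shard,
    // because ceil(4 / 2.0) == 2 URLs are assigned to each shard in input order.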
-  
-  static final class Options {    
-    boolean goLive;
-    String collection;
-    String zkHost;
-    Integer goLiveThreads;
-    List<List<String>> shardUrls;
-    List<Path> inputLists;
-    List<Path> inputFiles;
-    Path outputDir;
-    int mappers;
-    int reducers;
-    String updateConflictResolver;
-    int fanout;
-    Integer shards;
-    int maxSegments;
-    File morphlineFile;
-    String morphlineId;
-    File solrHomeDir;
-    String fairSchedulerPool;
-    boolean isDryRun;
-    File log4jConfigFile;
-    boolean isVerbose;
-  }
-  // END OF INNER CLASS  
-
-  
-  /** API for command line clients */
-  public static void main(String[] args) throws Exception  {
-    int res = ToolRunner.run(new Configuration(), new MapReduceIndexerTool(), args);
-    System.exit(res);
-  }
-
-  public MapReduceIndexerTool() {}
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Options opts = new Options();
-    Integer exitCode = new MyArgumentParser().parseArgs(args, getConf(), opts);
-    if (exitCode != null) {
-      return exitCode;
-    }
-    return run(opts);
-  }
-  
-  /** API for Java clients; visible for testing; may become a public API eventually */
-  int run(Options options) throws Exception {
-    if (getConf().getBoolean("isMR1", false) && "local".equals(getConf().get("mapred.job.tracker"))) {
-      throw new IllegalStateException(
-        "Running with LocalJobRunner (i.e. all of Hadoop inside a single JVM) is not supported " +
-        "because LocalJobRunner does not (yet) implement the Hadoop Distributed Cache feature, " +
-        "which is required for passing files via --files and --libjars");
-    }
-
-    long programStartTime = System.nanoTime();
-    if (options.fairSchedulerPool != null) {
-      getConf().set("mapred.fairscheduler.pool", options.fairSchedulerPool);
-    }
-    getConf().setInt(SolrOutputFormat.SOLR_RECORD_WRITER_MAX_SEGMENTS, options.maxSegments);
-    
-    // switch off a false warning about allegedly not implementing Tool
-    // also see http://hadoop.6.n7.nabble.com/GenericOptionsParser-warning-td8103.html
-    // also see https://issues.apache.org/jira/browse/HADOOP-8183
-    getConf().setBoolean("mapred.used.genericoptionsparser", true);
-    
-    if (options.log4jConfigFile != null) {
-      Utils.setLogConfigFile(options.log4jConfigFile, getConf());
-      addDistributedCacheFile(options.log4jConfigFile, getConf());
-    }
-
-    job = Job.getInstance(getConf());
-    job.setJarByClass(getClass());
-
-    if (options.morphlineFile == null) {
-      throw new ArgumentParserException("Argument --morphline-file is required", null);
-    }
-    verifyGoLiveArgs(options, null);
-    verifyZKStructure(options, null);
-    
-    int mappers = new JobClient(job.getConfiguration()).getClusterStatus().getMaxMapTasks(); // MR1
-    //int mappers = job.getCluster().getClusterStatus().getMapSlotCapacity(); // Yarn only
-    LOG.info("Cluster reports {} mapper slots", mappers);
-    
-    if (options.mappers == -1) { 
-      mappers = 8 * mappers; // better accommodate stragglers
-    } else {
-      mappers = options.mappers;
-    }
-    if (mappers <= 0) {
-      throw new IllegalStateException("Illegal number of mappers: " + mappers);
-    }
-    options.mappers = mappers;
-    
-    FileSystem fs = options.outputDir.getFileSystem(job.getConfiguration());
-    if (fs.exists(options.outputDir) && !delete(options.outputDir, true, fs)) {
-      return -1;
-    }
-    Path outputResultsDir = new Path(options.outputDir, RESULTS_DIR);
-    Path outputReduceDir = new Path(options.outputDir, "reducers");
-    Path outputStep1Dir = new Path(options.outputDir, "tmp1");    
-    Path outputStep2Dir = new Path(options.outputDir, "tmp2");    
-    Path outputTreeMergeStep = new Path(options.outputDir, "mtree-merge-output");
-    Path fullInputList = new Path(outputStep1Dir, FULL_INPUT_LIST);
-    
-    LOG.debug("Creating list of input files for mappers: {}", fullInputList);
-    long numFiles = addInputFiles(options.inputFiles, options.inputLists, fullInputList, job.getConfiguration());
-    if (numFiles == 0) {
-      LOG.info("No input files found - nothing to process");
-      return 0;
-    }
-    int numLinesPerSplit = (int) ceilDivide(numFiles, mappers);
-    if (numLinesPerSplit < 0) { // numeric overflow from downcasting long to int?
-      numLinesPerSplit = Integer.MAX_VALUE;
-    }
-    numLinesPerSplit = Math.max(1, numLinesPerSplit);
-
-    int realMappers = Math.min(mappers, (int) ceilDivide(numFiles, numLinesPerSplit));
-    calculateNumReducers(options, realMappers);
-    int reducers = options.reducers;
-    LOG.info("Using these parameters: " +
-        "numFiles: {}, mappers: {}, realMappers: {}, reducers: {}, shards: {}, fanout: {}, maxSegments: {}",
-        new Object[] {numFiles, mappers, realMappers, reducers, options.shards, options.fanout, options.maxSegments});
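
To make the split arithmetic concrete, a small hedged example (the figures are assumed, not from this commit): with numFiles = 10 input URIs and mappers = 32 (8 * 4 reported map slots), numLinesPerSplit = ceilDivide(10, 32) = 1 after clamping to at least 1, and realMappers = min(32, ceilDivide(10, 1)) = 10, so each of the 10 real mapper tasks indexes exactly one input file.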
-        
-    
-    LOG.info("Randomizing list of {} input files to spread indexing load more evenly among mappers", numFiles);
-    long startTime = System.nanoTime();      
-    if (numFiles < job.getConfiguration().getInt(MAIN_MEMORY_RANDOMIZATION_THRESHOLD, 100001)) {
-      // If there are few input files, reduce latency by running main-memory randomization directly
-      // instead of launching a high-latency MapReduce job
-      randomizeFewInputFiles(fs, outputStep2Dir, fullInputList);
-    } else {
-      // Randomize using a MapReduce job. Use sequential algorithm below a certain threshold because there's no
-      // benefit in using many parallel mapper tasks just to randomize the order of a few lines each
-      int numLinesPerRandomizerSplit = Math.max(10 * 1000 * 1000, numLinesPerSplit);
-      Job randomizerJob = randomizeManyInputFiles(getConf(), fullInputList, outputStep2Dir, numLinesPerRandomizerSplit);
-      if (!waitForCompletion(randomizerJob, options.isVerbose)) {
-        return -1; // job failed
-      }
-    }
-    float secs = (System.nanoTime() - startTime) / 1.0e9f; // nanoseconds to seconds
-    LOG.info("Done. Randomizing list of {} input files took {} secs", numFiles, secs);
-    
-    
-    job.setInputFormatClass(NLineInputFormat.class);
-    NLineInputFormat.addInputPath(job, outputStep2Dir);
-    NLineInputFormat.setNumLinesPerSplit(job, numLinesPerSplit);    
-    FileOutputFormat.setOutputPath(job, outputReduceDir);
-    
-    String mapperClass = job.getConfiguration().get(JobContext.MAP_CLASS_ATTR);
-    if (mapperClass == null) { // enable customization
-      Class clazz = MorphlineMapper.class;
-      mapperClass = clazz.getName();
-      job.setMapperClass(clazz);
-    }
-    job.setJobName(getClass().getName() + "/" + Utils.getShortClassName(mapperClass));
-    
-    if (job.getConfiguration().get(JobContext.REDUCE_CLASS_ATTR) == null) { // enable customization
-      job.setReducerClass(SolrReducer.class);
-    }
-    if (options.updateConflictResolver == null) {
-      throw new IllegalArgumentException("updateConflictResolver must not be null");
-    }
-    job.getConfiguration().set(SolrReducer.UPDATE_CONFLICT_RESOLVER, options.updateConflictResolver);
-    
-    if (options.zkHost != null) {
-      assert options.collection != null;
-      /*
-       * MapReduce partitioner that partitions the Mapper output such that each
-       * SolrInputDocument gets sent to the SolrCloud shard that it would have
-       * been sent to if the document were ingested via the standard SolrCloud
-       * Near Real Time (NRT) API.
-       * 
-       * In other words, this class implements the same partitioning semantics
-       * as the standard SolrCloud NRT API. This makes it possible to mix batch updates
-       * from MapReduce ingestion with updates from standard NRT ingestion on
-       * the same SolrCloud cluster, using identical unique document keys.
-       */
-      if (job.getConfiguration().get(JobContext.PARTITIONER_CLASS_ATTR) == null) { // enable customization
-        job.setPartitionerClass(SolrCloudPartitioner.class);
-      }
-      job.getConfiguration().set(SolrCloudPartitioner.ZKHOST, options.zkHost);
-      job.getConfiguration().set(SolrCloudPartitioner.COLLECTION, options.collection);
-    }
-    job.getConfiguration().setInt(SolrCloudPartitioner.SHARDS, options.shards);
-
-    job.setOutputFormatClass(SolrOutputFormat.class);
-    if (options.solrHomeDir != null) {
-      SolrOutputFormat.setupSolrHomeCache(options.solrHomeDir, job);
-    } else {
-      assert options.zkHost != null;
-      // use the config that this collection uses for the SolrHomeCache.
-      ZooKeeperInspector zki = new ZooKeeperInspector();
-      SolrZkClient zkClient = zki.getZkClient(options.zkHost);
-      try {
-        String configName = zki.readConfigName(zkClient, options.collection);
-        File tmpSolrHomeDir = zki.downloadConfigDir(zkClient, configName);
-        SolrOutputFormat.setupSolrHomeCache(tmpSolrHomeDir, job);
-        options.solrHomeDir = tmpSolrHomeDir;
-      } finally {
-        zkClient.close();
-      }
-    }
-    
-    MorphlineMapRunner runner = setupMorphline(options);
-    if (options.isDryRun && runner != null) {
-      LOG.info("Indexing {} files in dryrun mode", numFiles);
-      startTime = System.nanoTime();
-      dryRun(runner, fs, fullInputList);
-      secs = (System.nanoTime() - startTime) / 1.0e9f; // nanoseconds to seconds
-      LOG.info("Done. Indexing {} files in dryrun mode took {} secs", numFiles, secs);
-      goodbye(null, programStartTime);
-      return 0;
-    }          
-    job.getConfiguration().set(MorphlineMapRunner.MORPHLINE_FILE_PARAM, options.morphlineFile.getName());
-
-    job.setNumReduceTasks(reducers);  
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(SolrInputDocumentWritable.class);
-    LOG.info("Indexing {} files using {} real mappers into {} reducers", new Object[] {numFiles, realMappers, reducers});
-    startTime = System.nanoTime();
-    if (!waitForCompletion(job, options.isVerbose)) {
-      return -1; // job failed
-    }
-
-    secs = (System.nanoTime() - startTime) / 1.0e9f; // nanoseconds to seconds
-    LOG.info("Done. Indexing {} files using {} real mappers into {} reducers took {} secs", new Object[] {numFiles, realMappers, reducers, secs});
-
-    int mtreeMergeIterations = 0;
-    if (reducers > options.shards) {
-      mtreeMergeIterations = (int) Math.round(log(options.fanout, reducers / options.shards));
-    }
-    LOG.debug("MTree merge iterations to do: {}", mtreeMergeIterations);
-    int mtreeMergeIteration = 1;
-    while (reducers > options.shards) { // run a mtree merge iteration
-      job = Job.getInstance(getConf());
-      job.setJarByClass(getClass());
-      job.setJobName(getClass().getName() + "/" + Utils.getShortClassName(TreeMergeMapper.class));
-      job.setMapperClass(TreeMergeMapper.class);
-      job.setOutputFormatClass(TreeMergeOutputFormat.class);
-      job.setNumReduceTasks(0);  
-      job.setOutputKeyClass(Text.class);
-      job.setOutputValueClass(NullWritable.class);    
-      job.setInputFormatClass(NLineInputFormat.class);
-      
-      Path inputStepDir = new Path(options.outputDir, "mtree-merge-input-iteration" + mtreeMergeIteration);
-      fullInputList = new Path(inputStepDir, FULL_INPUT_LIST);    
-      LOG.debug("MTree merge iteration {}/{}: Creating input list file for mappers {}", new Object[] {mtreeMergeIteration, mtreeMergeIterations, fullInputList});
-      numFiles = createTreeMergeInputDirList(outputReduceDir, fs, fullInputList);    
-      if (numFiles != reducers) {
-        throw new IllegalStateException("Not same reducers: " + reducers + ", numFiles: " + numFiles);
-      }
-      NLineInputFormat.addInputPath(job, fullInputList);
-      NLineInputFormat.setNumLinesPerSplit(job, options.fanout);    
-      FileOutputFormat.setOutputPath(job, outputTreeMergeStep);
-      
-      LOG.info("MTree merge iteration {}/{}: Merging {} shards into {} shards using fanout {}", new Object[] { 
-          mtreeMergeIteration, mtreeMergeIterations, reducers, (reducers / options.fanout), options.fanout});
-      startTime = System.nanoTime();
-      if (!waitForCompletion(job, options.isVerbose)) {
-        return -1; // job failed
-      }
-      if (!renameTreeMergeShardDirs(outputTreeMergeStep, job, fs)) {
-        return -1;
-      }
-      secs = (System.nanoTime() - startTime) / 1.0e9f; // nanoseconds to seconds
-      LOG.info("MTree merge iteration {}/{}: Done. Merging {} shards into {} shards using fanout {} took {} secs",
-          new Object[] {mtreeMergeIteration, mtreeMergeIterations, reducers, (reducers / options.fanout), options.fanout, secs});
-      
-      if (!delete(outputReduceDir, true, fs)) {
-        return -1;
-      }
-      if (!rename(outputTreeMergeStep, outputReduceDir, fs)) {
-        return -1;
-      }
-      assert reducers % options.fanout == 0;
-      reducers = reducers / options.fanout;
-      mtreeMergeIteration++;
-    }
-    assert reducers == options.shards;
-    
-    // normalize output shard dir prefix, i.e.
-    // rename part-r-00000 to part-00000 (stems from zero tree merge iterations)
-    // rename part-m-00000 to part-00000 (stems from > 0 tree merge iterations)
-    for (FileStatus stats : fs.listStatus(outputReduceDir)) {
-      String dirPrefix = SolrOutputFormat.getOutputName(job);
-      Path srcPath = stats.getPath();
-      if (stats.isDirectory() && srcPath.getName().startsWith(dirPrefix)) {
-        String dstName = dirPrefix + srcPath.getName().substring(dirPrefix.length() + "-m".length());
-        Path dstPath = new Path(srcPath.getParent(), dstName);
-        if (!rename(srcPath, dstPath, fs)) {
-          return -1;
-        }        
-      }
-    }
-    
-    // publish results dir    
-    if (!rename(outputReduceDir, outputResultsDir, fs)) {
-      return -1;
-    }
-
-    if (options.goLive && !new GoLive().goLive(options, listSortedOutputShardDirs(outputResultsDir, fs))) {
-      return -1;
-    }
-    
-    goodbye(job, programStartTime);    
-    return 0;
-  }
-
-  private void calculateNumReducers(Options options, int realMappers) throws IOException {
-    if (options.shards <= 0) {
-      throw new IllegalStateException("Illegal number of shards: " + options.shards);
-    }
-    if (options.fanout <= 1) {
-      throw new IllegalStateException("Illegal fanout: " + options.fanout);
-    }
-    if (realMappers <= 0) {
-      throw new IllegalStateException("Illegal realMappers: " + realMappers);
-    }
-    
-
-    int reducers = new JobClient(job.getConfiguration()).getClusterStatus().getMaxReduceTasks(); // MR1
-    //reducers = job.getCluster().getClusterStatus().getReduceSlotCapacity(); // Yarn only      
-    LOG.info("Cluster reports {} reduce slots", reducers);
-
-    if (options.reducers == -2) {
-      reducers = options.shards;
-    } else if (options.reducers == -1) {
-      reducers = Math.min(reducers, realMappers); // no need to use many reducers when using few mappers
-    } else {
-      if (options.reducers == 0) {
-        throw new IllegalStateException("Illegal zero reducers");
-      }
-      reducers = options.reducers;
-    }
-    reducers = Math.max(reducers, options.shards);
-    
-    if (reducers != options.shards) {
-      // Ensure fanout isn't misconfigured. fanout can't meaningfully be larger than what would be 
-      // required to merge all leaf shards in one single tree merge iteration into root shards
-      options.fanout = Math.min(options.fanout, (int) ceilDivide(reducers, options.shards));
-      
-      // Ensure invariant reducers == options.shards * (fanout ^ N) where N is an integer >= 1.
-      // N is the number of mtree merge iterations.
-      // This helps to evenly spread docs among root shards and simplifies the impl of the mtree merge algorithm.
-      int s = options.shards;
-      while (s < reducers) { 
-        s = s * options.fanout;
-      }
-      reducers = s;
-      assert reducers % options.fanout == 0;
-    }
-    options.reducers = reducers;
-  }
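
Continuing the assumed numbers from the earlier example (still hedged, not from this commit): with --shards 2, --reducers -1, the hidden --fanout left at its Integer.MAX_VALUE default, a cluster reporting 20 reduce slots and realMappers = 10, the method starts from reducers = min(20, 10) = 10. Because 10 != 2, fanout is clamped to min(Integer.MAX_VALUE, ceilDivide(10, 2)) = 5, and the rounding loop leaves reducers = 10 = 2 * 5^1, which satisfies the invariant. The later mtree merge phase then needs round(log_5(10 / 2)) = 1 iteration to collapse the 10 reducer shards into the 2 requested shards.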
-  
-  private long addInputFiles(List<Path> inputFiles, List<Path> inputLists, Path fullInputList, Configuration conf) 
-      throws IOException {
-    
-    long numFiles = 0;
-    FileSystem fs = fullInputList.getFileSystem(conf);
-    FSDataOutputStream out = fs.create(fullInputList);
-    try {
-      Writer writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
-      
-      for (Path inputFile : inputFiles) {
-        FileSystem inputFileFs = inputFile.getFileSystem(conf);
-        if (inputFileFs.exists(inputFile)) {
-          PathFilter pathFilter = new PathFilter() {      
-            @Override
-            public boolean accept(Path path) { // ignore "hidden" files and dirs
-              return !(path.getName().startsWith(".") || path.getName().startsWith("_")); 
-            }
-          };
-          numFiles += addInputFilesRecursively(inputFile, writer, inputFileFs, pathFilter);
-        }
-      }
-
-      for (Path inputList : inputLists) {
-        InputStream in;
-        if (inputList.toString().equals("-")) {
-          in = System.in;
-        } else if (inputList.isAbsoluteAndSchemeAuthorityNull()) {
-          in = new BufferedInputStream(new FileInputStream(inputList.toString()));
-        } else {
-          in = inputList.getFileSystem(conf).open(inputList);
-        }
-        try {
-          BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
-          String line;
-          while ((line = reader.readLine()) != null) {
-            writer.write(line + "\n");
-            numFiles++;
-          }
-          reader.close();
-        } finally {
-          in.close();
-        }
-      }
-      
-      writer.close();
-    } finally {
-      out.close();
-    }    
-    return numFiles;
-  }
-  
-  /**
-   * Add the specified file to the input set, if path is a directory then
-   * add the files contained therein.
-   */
-  private long addInputFilesRecursively(Path path, Writer writer, FileSystem fs, PathFilter pathFilter) throws IOException {
-    long numFiles = 0;
-    for (FileStatus stat : fs.listStatus(path, pathFilter)) {
-      LOG.debug("Adding path {}", stat.getPath());
-      if (stat.isDirectory()) {
-        numFiles += addInputFilesRecursively(stat.getPath(), writer, fs, pathFilter);
-      } else {
-        writer.write(stat.getPath().toString() + "\n");
-        numFiles++;
-      }
-    }
-    return numFiles;
-  }
-  
-  private void randomizeFewInputFiles(FileSystem fs, Path outputStep2Dir, Path fullInputList) throws IOException {    
-    List<String> lines = new ArrayList<>();
-    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(fullInputList), StandardCharsets.UTF_8));
-    try {
-      String line;
-      while ((line = reader.readLine()) != null) {
-        lines.add(line);
-      }
-    } finally {
-      reader.close();
-    }
-    
-    Collections.shuffle(lines, new Random(421439783L)); // constant seed for reproducibility
-    
-    FSDataOutputStream out = fs.create(new Path(outputStep2Dir, FULL_INPUT_LIST));
-    Writer writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
-    try {
-      for (String line : lines) {
-        writer.write(line + "\n");
-      } 
-    } finally {
-      writer.close();
-    }
-  }
-
-  /**
-   * To uniformly spread load across all mappers we randomize fullInputList
-   * with a separate small Mapper & Reducer preprocessing step. This way
-   * each input line ends up on a random position in the output file list.
-   * Each mapper indexes a disjoint consecutive set of files such that each
-   * set has roughly the same size, at least from a probabilistic
-   * perspective.
-   * 
-   * For example an input file with the following input list of URLs:
-   * 
-   * A
-   * B
-   * C
-   * D
-   * 
-   * might be randomized into the following output list of URLs:
-   * 
-   * C
-   * A
-   * D
-   * B
-   * 
-   * The implementation sorts the list of lines by randomly generated numbers.
-   */
-  private Job randomizeManyInputFiles(Configuration baseConfig, Path fullInputList, Path outputStep2Dir, int numLinesPerSplit) 
-      throws IOException {
-    
-    Job job2 = Job.getInstance(baseConfig);
-    job2.setJarByClass(getClass());
-    job2.setJobName(getClass().getName() + "/" + Utils.getShortClassName(LineRandomizerMapper.class));
-    job2.setInputFormatClass(NLineInputFormat.class);
-    NLineInputFormat.addInputPath(job2, fullInputList);
-    NLineInputFormat.setNumLinesPerSplit(job2, numLinesPerSplit);          
-    job2.setMapperClass(LineRandomizerMapper.class);
-    job2.setReducerClass(LineRandomizerReducer.class);
-    job2.setOutputFormatClass(TextOutputFormat.class);
-    FileOutputFormat.setOutputPath(job2, outputStep2Dir);
-    job2.setNumReduceTasks(1);
-    job2.setOutputKeyClass(LongWritable.class);
-    job2.setOutputValueClass(Text.class);
-    return job2;
-  }
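
The LineRandomizerMapper and LineRandomizerReducer sources are not part of this file; a minimal sketch of the randomize-by-random-key idea described in the javadoc above could look like the following (class name and details are assumptions, not the project's actual implementation):

    // Sketch only: tag each input line with a random key so that the single reducer,
    // which receives keys in sorted order, emits the lines in randomized order.
    public class RandomKeyLineMapper
        extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text> {
      private final Random random = new Random();
      @Override
      protected void map(LongWritable offset, Text line, Context context)
          throws IOException, InterruptedException {
        context.write(new LongWritable(random.nextLong()), line);
      }
    }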
-
-  // do the same as if the user had typed 'hadoop ... --files <file>' 
-  private void addDistributedCacheFile(File file, Configuration conf) throws IOException {
-    String HADOOP_TMP_FILES = "tmpfiles"; // see Hadoop's GenericOptionsParser
-    String tmpFiles = conf.get(HADOOP_TMP_FILES, "");
-    if (tmpFiles.length() > 0) { // already present?
-      tmpFiles = tmpFiles + ","; 
-    }
-    GenericOptionsParser parser = new GenericOptionsParser(
-        new Configuration(conf), 
-        new String[] { "--files", file.getCanonicalPath() });
-    String additionalTmpFiles = parser.getConfiguration().get(HADOOP_TMP_FILES);
-    assert additionalTmpFiles != null;
-    assert additionalTmpFiles.length() > 0;
-    tmpFiles += additionalTmpFiles;
-    conf.set(HADOOP_TMP_FILES, tmpFiles);
-  }
-  
-  private MorphlineMapRunner setupMorphline(Options options) throws IOException, URISyntaxException {
-    if (options.morphlineId != null) {
-      job.getConfiguration().set(MorphlineMapRunner.MORPHLINE_ID_PARAM, options.morphlineId);
-    }
-    addDistributedCacheFile(options.morphlineFile, job.getConfiguration());    
-    if (!options.isDryRun) {
-      return null;
-    }
-    
-    /*
-     * Ensure scripting support for Java via morphline "java" command works even in dryRun mode,
-     * i.e. when executed in the client side driver JVM. To do so, collect all classpath URLs from
-     * the class loaders chain that org.apache.hadoop.util.RunJar (hadoop jar xyz-job.jar) and
-     * org.apache.hadoop.util.GenericOptionsParser (--libjars) have installed, then tell
-     * FastJavaScriptEngine.parse() where to find classes that JavaBuilder scripts might depend on.
-     * This ensures that scripts that reference external java classes compile without exceptions
-     * like this:
-     * 
-     * ... caused by compilation failed: mfm:///MyJavaClass1.java:2: package
-     * org.kitesdk.morphline.api does not exist
-     */
-    LOG.trace("dryRun: java.class.path: {}", System.getProperty("java.class.path"));
-    String fullClassPath = "";
-    ClassLoader loader = Thread.currentThread().getContextClassLoader(); // see org.apache.hadoop.util.RunJar
-    while (loader != null) { // walk class loaders, collect all classpath URLs
-      if (loader instanceof URLClassLoader) { 
-        URL[] classPathPartURLs = ((URLClassLoader) loader).getURLs(); // see org.apache.hadoop.util.RunJar
-        LOG.trace("dryRun: classPathPartURLs: {}", Arrays.asList(classPathPartURLs));
-        StringBuilder classPathParts = new StringBuilder();
-        for (URL url : classPathPartURLs) {
-          File file = new File(url.toURI());
-          if (classPathParts.length() > 0) { // only separate from a previous entry; no leading separator
-            classPathParts.append(File.pathSeparator);
-          }
-          classPathParts.append(file.getPath());
-        }
-        LOG.trace("dryRun: classPathParts: {}", classPathParts);
-        String separator = File.pathSeparator;
-        if (fullClassPath.length() == 0 || classPathParts.length() == 0) {
-          separator = "";
-        }
-        fullClassPath = classPathParts + separator + fullClassPath;
-      }
-      loader = loader.getParent();
-    }
-    
-    // tell FastJavaScriptEngine.parse() where to find the classes that the script might depend on
-    if (fullClassPath.length() > 0) {
-      assert System.getProperty("java.class.path") != null;
-      fullClassPath = System.getProperty("java.class.path") + File.pathSeparator + fullClassPath;
-      LOG.trace("dryRun: fullClassPath: {}", fullClassPath);
-      System.setProperty("java.class.path", fullClassPath); // see FastJavaScriptEngine.parse()
-    }
-    
-    job.getConfiguration().set(MorphlineMapRunner.MORPHLINE_FILE_PARAM, options.morphlineFile.getPath());
-    return new MorphlineMapRunner(
-        job.getConfiguration(), new DryRunDocumentLoader(), options.solrHomeDir.getPath());
-  }
-  
-  /*
-   * Executes the morphline in the current process (without submitting a job to MR) for quicker
-   * turnaround during trial & debug sessions
-   */
-  private void dryRun(MorphlineMapRunner runner, FileSystem fs, Path fullInputList) throws IOException {    
-    try (BufferedReader reader = new BufferedReader(
-        new InputStreamReader(fs.open(fullInputList), StandardCharsets.UTF_8))) {
-      String line;
-      while ((line = reader.readLine()) != null) {
-        runner.map(line, job.getConfiguration(), null);
-      }
-      runner.cleanup();
-    }
-  }
-  
-  private int createTreeMergeInputDirList(Path outputReduceDir, FileSystem fs, Path fullInputList)
-      throws FileNotFoundException, IOException {
-    
-    FileStatus[] dirs = listSortedOutputShardDirs(outputReduceDir, fs);
-    int numFiles = 0;
-    FSDataOutputStream out = fs.create(fullInputList);
-    try {
-      Writer writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
-      for (FileStatus stat : dirs) {
-        LOG.debug("Adding path {}", stat.getPath());
-        Path dir = new Path(stat.getPath(), "data/index");
-        if (!fs.isDirectory(dir)) {
-          throw new IllegalStateException("Not a directory: " + dir);
-        }
-        writer.write(dir.toString() + "\n");
-        numFiles++;
-      }
-      writer.close();
-    } finally {
-      out.close();
-    }
-    return numFiles;
-  }
-
-  private FileStatus[] listSortedOutputShardDirs(Path outputReduceDir, FileSystem fs) throws FileNotFoundException,
-      IOException {
-    
-    final String dirPrefix = SolrOutputFormat.getOutputName(job);
-    FileStatus[] dirs = fs.listStatus(outputReduceDir, new PathFilter() {      
-      @Override
-      public boolean accept(Path path) {
-        return path.getName().startsWith(dirPrefix);
-      }
-    });
-    for (FileStatus dir : dirs) {
-      if (!dir.isDirectory()) {
-        throw new IllegalStateException("Not a directory: " + dir.getPath());
-      }
-    }
-    
-    // use alphanumeric sort (rather than lexicographical sort) to properly handle more than 99999 shards
-    Arrays.sort(dirs, (f1, f2) -> new AlphaNumericComparator().compare(f1.getPath().getName(), f2.getPath().getName()));
-
-    return dirs;
-  }
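
A small illustration of why the alphanumeric sort matters (self-contained, not taken from the original sources): plain lexicographic ordering misplaces part numbers once they outgrow the five-digit zero padding.

import java.util.Arrays;

// Lexicographically, "part-m-100000" sorts BEFORE "part-m-99999" because '1' < '9',
// even though 100000 > 99999. An alphanumeric comparator preserves numeric order.
public class SortOrderExample {
  public static void main(String[] args) {
    String[] dirs = { "part-m-99999", "part-m-100000" };
    Arrays.sort(dirs); // lexicographic
    System.out.println(Arrays.toString(dirs)); // [part-m-100000, part-m-99999] -- numerically wrong
  }
}
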
-
-  /*
-   * You can run MapReduceIndexerTool in SolrCloud mode, and once the MR job completes you can use
-   * the standard SolrJ SolrCloud API to send document updates and deletes to SolrCloud; those
-   * updates and deletes are routed to the correct Solr shards.
-   * 
-   * The MapReduce framework doesn't guarantee that input split N goes to the map task with
-   * taskId = N. The JobTracker and YARN schedule and assign tasks with data locality in mind,
-   * but without regard to an input split's number within the overall list of input splits. In
-   * other words, split# != taskId can be true.
-   * 
-   * To deal with this, our mapper tasks write a small auxiliary metadata file (per task)
-   * that tells the job driver which taskId processed which split#. Once the mapper-only job
-   * completes, the job driver renames the output dirs, based on these auxiliary files, such that
-   * each dir name contains the true Solr shard id.
-   * 
-   * This way each doc gets assigned to the right Solr shard even with #reducers > #solrshards.
-   * 
-   * Example for a merge with a fanout of two:
-   * 
-   * part-m-00000 and part-m-00001 go to outputShardNum = 0 and will end up in merged part-m-00000
-   * part-m-00002 and part-m-00003 go to outputShardNum = 1 and will end up in merged part-m-00001
-   * part-m-00004 and part-m-00005 go to outputShardNum = 2 and will end up in merged part-m-00002
-   * ... and so on
-   * 
-   * Also see the run() method above, which uses NLineInputFormat.setNumLinesPerSplit(job,
-   * options.fanout).
-   * 
-   * Also see TreeMergeOutputFormat.TreeMergeRecordWriter.writeShardNumberFile()
-   */
-  private boolean renameTreeMergeShardDirs(Path outputTreeMergeStep, Job job, FileSystem fs) throws IOException {
-    final String dirPrefix = SolrOutputFormat.getOutputName(job);
-    FileStatus[] dirs = fs.listStatus(outputTreeMergeStep, new PathFilter() {      
-      @Override
-      public boolean accept(Path path) {
-        return path.getName().startsWith(dirPrefix);
-      }
-    });
-    
-    for (FileStatus dir : dirs) {
-      if (!dir.isDirectory()) {
-        throw new IllegalStateException("Not a directory: " + dir.getPath());
-      }
-    }
-
-    // Example: rename part-m-00004 to _part-m-00004
-    for (FileStatus dir : dirs) {
-      Path path = dir.getPath();
-      Path renamedPath = new Path(path.getParent(), "_" + path.getName());
-      if (!rename(path, renamedPath, fs)) {
-        return false;
-      }
-    }
-    
-    // Example: rename _part-m-00004 to part-m-00002
-    for (FileStatus dir : dirs) {
-      Path path = dir.getPath();
-      Path renamedPath = new Path(path.getParent(), "_" + path.getName());
-      
-      // read auxiliary metadata file (per task) that tells which taskId 
-      // processed which split# aka solrShard
-      Path solrShardNumberFile = new Path(renamedPath, TreeMergeMapper.SOLR_SHARD_NUMBER);
-      InputStream in = fs.open(solrShardNumberFile);
-      byte[] bytes = ByteStreams.toByteArray(in);
-      in.close();
-      Preconditions.checkArgument(bytes.length > 0);
-      int solrShard = Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
-      if (!delete(solrShardNumberFile, false, fs)) {
-        return false;
-      }
-      
-      // same as FileOutputFormat.NUMBER_FORMAT
-      NumberFormat numberFormat = NumberFormat.getInstance(Locale.ENGLISH);
-      numberFormat.setMinimumIntegerDigits(5);
-      numberFormat.setGroupingUsed(false);
-      Path finalPath = new Path(renamedPath.getParent(), dirPrefix + "-m-" + numberFormat.format(solrShard));
-      
-      LOG.info("MTree merge renaming solr shard: " + solrShard + " from dir: " + dir.getPath() + " to dir: " + finalPath);
-      if (!rename(renamedPath, finalPath, fs)) {
-        return false;
-      }
-    }
-    return true;
-  }
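
A quick sketch of the final directory naming used above; it only exercises the JDK NumberFormat configuration, so shard 2 ends up as part-m-00002 (dirPrefix shortened to "part" for brevity):

import java.text.NumberFormat;
import java.util.Locale;

public class ShardNameExample {
  public static void main(String[] args) {
    NumberFormat nf = NumberFormat.getInstance(Locale.ENGLISH);
    nf.setMinimumIntegerDigits(5); // five-digit, zero-padded numbers
    nf.setGroupingUsed(false);     // no thousands separators
    // For solrShard = 2 this prints "part-m-00002".
    System.out.println("part-m-" + nf.format(2));
  }
}
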
-
-  private static void verifyGoLiveArgs(Options opts, ArgumentParser parser) throws ArgumentParserException {
-    if (opts.zkHost == null && opts.solrHomeDir == null) {
-      throw new ArgumentParserException("At least one of --zk-host or --solr-home-dir is required", parser);
-    }
-    if (opts.goLive && opts.zkHost == null && opts.shardUrls == null) {
-      throw new ArgumentParserException("--go-live requires that you also pass --shard-url or --zk-host", parser);
-    }
-    
-    if (opts.zkHost != null && opts.collection == null) {
-      throw new ArgumentParserException("--zk-host requires that you also pass --collection", parser);
-    }
-    
-    if (opts.zkHost != null) {
-      // the structure of the ZK directory is verified later, so that run-time errors
-      // don't surface during argument parsing
-      return;
-    } else if (opts.shardUrls != null) {
-      if (opts.shardUrls.size() == 0) {
-        throw new ArgumentParserException("--shard-url requires at least one URL", parser);
-      }
-    } else if (opts.shards != null) {
-      if (opts.shards <= 0) {
-        throw new ArgumentParserException("--shards must be a positive number: " + opts.shards, parser);
-      }
-    } else {
-      throw new ArgumentParserException("You must specify one of the following (mutually exclusive) arguments: "
-          + "--zk-host or --shard-url or --shards", parser);
-    }
-
-    if (opts.shardUrls != null) {
-      opts.shards = opts.shardUrls.size();
-    }
-    
-    assert opts.shards != null;
-    assert opts.shards > 0;
-  }
-
-  private static void verifyZKStructure(Options opts, ArgumentParser parser) throws ArgumentParserException {
-    if (opts.zkHost != null) {
-      assert opts.collection != null;
-      ZooKeeperInspector zki = new ZooKeeperInspector();
-      try {
-        opts.shardUrls = zki.extractShardUrls(opts.zkHost, opts.collection);
-      } catch (Exception e) {
-        LOG.debug("Cannot extract SolrCloud shard URLs from ZooKeeper", e);
-        throw new ArgumentParserException(e, parser);
-      }
-      assert opts.shardUrls != null;
-      if (opts.shardUrls.size() == 0) {
-        throw new ArgumentParserException("--zk-host requires ZooKeeper " + opts.zkHost
-          + " to contain at least one SolrCore for collection: " + opts.collection, parser);
-      }
-      opts.shards = opts.shardUrls.size();
-      LOG.debug("Using SolrCloud shard URLs: {}", opts.shardUrls);
-    }
-  }
-
-  private boolean waitForCompletion(Job job, boolean isVerbose) 
-      throws IOException, InterruptedException, ClassNotFoundException {
-    
-    LOG.debug("Running job: " + getJobInfo(job));
-    boolean success = job.waitForCompletion(isVerbose);
-    if (!success) {
-      LOG.error("Job failed! " + getJobInfo(job));
-    }
-    return success;
-  }
-
-  private void goodbye(Job job, long startTime) {
-    float secs = (System.nanoTime() - startTime) / 1.0e9f; // convert nanoseconds to seconds
-    if (job != null) {
-      LOG.info("Succeeded with job: " + getJobInfo(job));
-    }
-    LOG.info("Success. Done. Program took {} secs. Goodbye.", secs);
-  }
-
-  private String getJobInfo(Job job) {
-    return "jobName: " + job.getJobName() + ", jobId: " + job.getJobID();
-  }
-  
-  private boolean rename(Path src, Path dst, FileSystem fs) throws IOException {
-    boolean success = fs.rename(src, dst);
-    if (!success) {
-      LOG.error("Cannot rename " + src + " to " + dst);
-    }
-    return success;
-  }
-  
-  private boolean delete(Path path, boolean recursive, FileSystem fs) throws IOException {
-    boolean success = fs.delete(path, recursive);
-    if (!success) {
-      LOG.error("Cannot delete " + path);
-    }
-    return success;
-  }
-
-  // same as IntMath.divide(p, q, RoundingMode.CEILING)
-  private long ceilDivide(long p, long q) {
-    long result = p / q;
-    if (p % q != 0) {
-      result++;
-    }
-    return result;
-  }
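
For illustration, ceilDivide(10, 3) yields 4 whereas plain integer division yields 3; a common loop-free equivalent for non-negative operands is sketched below (not part of the original class):

public class CeilDivideExample {
  // Equivalent for p >= 0 and q > 0, like the call sites in this tool
  // (beware overflow when p is close to Long.MAX_VALUE).
  static long ceilDivide(long p, long q) {
    return (p + q - 1) / q;
  }
  public static void main(String[] args) {
    System.out.println(ceilDivide(10, 3)); // 4
    System.out.println(ceilDivide(9, 3));  // 3
  }
}
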
-  
-  /**
-   * Returns <tt>log<sub>base</sub>value</tt>.
-   */
-  private double log(double base, double value) {
-    return Math.log(value) / Math.log(base);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/PathArgumentType.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/PathArgumentType.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/PathArgumentType.java
deleted file mode 100644
index 770a2f9..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/PathArgumentType.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.IOException;
-
-import net.sourceforge.argparse4j.inf.Argument;
-import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
-import net.sourceforge.argparse4j.inf.ArgumentType;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-
-/**
- * ArgumentType implementation for HDFS {@link Path} arguments, configured via a fluent-style API.
- */
-public class PathArgumentType implements ArgumentType<Path> {
-  
-  private final Configuration conf;
-  private FileSystem fs;
-  private boolean acceptSystemIn = false;
-  private boolean verifyExists = false;
-  private boolean verifyNotExists = false;
-  private boolean verifyIsFile = false;
-  private boolean verifyIsDirectory = false;
-  private boolean verifyCanRead = false;
-  private boolean verifyCanWrite = false;
-  private boolean verifyCanWriteParent = false;
-  private boolean verifyCanExecute = false;
-  private boolean verifyIsAbsolute = false;
-  private boolean verifyHasScheme = false;
-  private String verifyScheme = null;
-
-  public PathArgumentType(Configuration conf) {
-    this.conf = conf;
-  }
-  
-  public PathArgumentType acceptSystemIn() {
-    acceptSystemIn = true;
-    return this;
-  }
-  
-  public PathArgumentType verifyExists() {
-    verifyExists = true;
-    return this;
-  }
-  
-  public PathArgumentType verifyNotExists() {
-    verifyNotExists = true;
-    return this;
-  }
-  
-  public PathArgumentType verifyIsFile() {
-    verifyIsFile = true;
-    return this;
-  }
-  
-  public PathArgumentType verifyIsDirectory() {
-    verifyIsDirectory = true;
-    return this;
-  }
-  
-  public PathArgumentType verifyCanRead() {
-    verifyCanRead = true;
-    return this;
-  }
-  
-  public PathArgumentType verifyCanWrite() {
-    verifyCanWrite = true;
-    return this;
-  }
-  
-  public PathArgumentType verifyCanWriteParent() {
-    verifyCanWriteParent = true;
-    return this;
-  }
-  
-  public PathArgumentType verifyCanExecute() {
-    verifyCanExecute = true;
-    return this;
-  }
-  
-  public PathArgumentType verifyIsAbsolute() {
-    verifyIsAbsolute = true;
-    return this;
-  }
-  
-  public PathArgumentType verifyHasScheme() {
-    verifyHasScheme = true;
-    return this;
-  }
-  
-  public PathArgumentType verifyScheme(String scheme) {
-    verifyScheme = scheme;
-    return this;
-  }
-  
-  @Override
-  public Path convert(ArgumentParser parser, Argument arg, String value) throws ArgumentParserException {
-    Path file = new Path(value);
-    try {
-      fs = file.getFileSystem(conf);
-      if (verifyHasScheme && !isSystemIn(file)) {
-        verifyHasScheme(parser, file);
-      }        
-      if (verifyScheme != null && !isSystemIn(file)) {
-        verifyScheme(parser, file);
-      }        
-      if (verifyIsAbsolute && !isSystemIn(file)) {
-        verifyIsAbsolute(parser, file);
-      }
-      if (verifyExists && !isSystemIn(file)) {
-        verifyExists(parser, file);
-      }
-      if (verifyNotExists && !isSystemIn(file)) {
-        verifyNotExists(parser, file);
-      }
-      if (verifyIsFile && !isSystemIn(file)) {
-        verifyIsFile(parser, file);
-      }
-      if (verifyIsDirectory && !isSystemIn(file)) {
-        verifyIsDirectory(parser, file);
-      }
-      if (verifyCanRead && !isSystemIn(file)) {
-        verifyCanRead(parser, file);
-      }
-      if (verifyCanWrite && !isSystemIn(file)) {
-        verifyCanWrite(parser, file);
-      }
-      if (verifyCanWriteParent && !isSystemIn(file)) {
-        verifyCanWriteParent(parser, file);
-      }
-      if (verifyCanExecute && !isSystemIn(file)) {
-        verifyCanExecute(parser, file);
-      }
-    } catch (IOException e) {
-      throw new ArgumentParserException(e, parser);
-    }
-    return file;
-  }
-  
-  private void verifyExists(ArgumentParser parser, Path file) throws ArgumentParserException, IOException {
-    if (!fs.exists(file)) {
-      throw new ArgumentParserException("File not found: " + file, parser);
-    }
-  }    
-  
-  private void verifyNotExists(ArgumentParser parser, Path file) throws ArgumentParserException, IOException {
-    if (fs.exists(file)) {
-      throw new ArgumentParserException("File found: " + file, parser);
-    }
-  }    
-  
-  private void verifyIsFile(ArgumentParser parser, Path file) throws ArgumentParserException, IOException {
-    if (!fs.isFile(file)) {
-      throw new ArgumentParserException("Not a file: " + file, parser);
-    }
-  }    
-  
-  private void verifyIsDirectory(ArgumentParser parser, Path file) throws ArgumentParserException, IOException {
-    if (!fs.isDirectory(file)) {
-      throw new ArgumentParserException("Not a directory: " + file, parser);
-    }
-  }    
-  
-  private void verifyCanRead(ArgumentParser parser, Path file) throws ArgumentParserException, IOException {
-    verifyExists(parser, file);
-    if (!fs.getFileStatus(file).getPermission().getUserAction().implies(FsAction.READ)) {
-      throw new ArgumentParserException("Insufficient permissions to read file: " + file, parser);
-    }
-  }    
-  
-  private void verifyCanWrite(ArgumentParser parser, Path file) throws ArgumentParserException, IOException {
-    verifyExists(parser, file);
-    if (!fs.getFileStatus(file).getPermission().getUserAction().implies(FsAction.WRITE)) {
-      throw new ArgumentParserException("Insufficient permissions to write file: " + file, parser);
-    }
-  }    
-  
-  private void verifyCanWriteParent(ArgumentParser parser, Path file) throws ArgumentParserException, IOException {
-    Path parent = file.getParent();
-    if (parent == null || !fs.exists(parent) || !fs.getFileStatus(parent).getPermission().getUserAction().implies(FsAction.WRITE)) {
-      throw new ArgumentParserException("Cannot write parent of file: " + file, parser);
-    }
-  }    
-  
-  private void verifyCanExecute(ArgumentParser parser, Path file) throws ArgumentParserException, IOException {
-    verifyExists(parser, file);
-    if (!fs.getFileStatus(file).getPermission().getUserAction().implies(FsAction.EXECUTE)) {
-      throw new ArgumentParserException("Insufficient permissions to execute file: " + file, parser);
-    }
-  }    
-  
-  private void verifyIsAbsolute(ArgumentParser parser, Path file) throws ArgumentParserException {
-    if (!file.isAbsolute()) {
-      throw new ArgumentParserException("Not an absolute file: " + file, parser);
-    }
-  }    
-  
-  private void verifyHasScheme(ArgumentParser parser, Path file) throws ArgumentParserException {
-    if (file.toUri().getScheme() == null) {
-      throw new ArgumentParserException("URI scheme is missing in path: " + file, parser);
-    }
-  }
-
-  private void verifyScheme(ArgumentParser parser, Path file) throws ArgumentParserException {
-    if (!verifyScheme.equals(file.toUri().getScheme())) {
-      throw new ArgumentParserException("Scheme of path: " + file + " must be: " + verifyScheme, parser);
-    }
-  }
-
-  private boolean isSystemIn(Path file) {
-    return acceptSystemIn && file.toString().equals("-");
-  }
-  
-}
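
A hedged usage sketch of the fluent API above, wired into argparse4j; the program name, option name, and validation chain are illustrative rather than copied from MapReduceIndexerTool:

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class PathArgumentTypeUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    ArgumentParser parser = ArgumentParsers.newArgumentParser("example");
    parser.addArgument("--output-dir")
        .type(new PathArgumentType(conf).verifyHasScheme().verifyIsAbsolute().verifyCanWriteParent())
        .required(true)
        .help("HDFS directory to write results to");
    Namespace ns = parser.parseArgs(args);
    // argparse4j converts dashes in option names to underscores for the dest key
    Path outputDir = (Path) ns.get("output_dir");
    System.out.println("Output dir: " + outputDir);
  }
}
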

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/PathParts.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/PathParts.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/PathParts.java
deleted file mode 100644
index 690901b..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/PathParts.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-
-/**
- * Extracts various components of an HDFS Path
- */
-public final class PathParts {
-
-  private final String uploadURL;
-  private final Configuration conf;
-  private final FileSystem fs;
-  private final Path normalizedPath;
-  private FileStatus stats;
-
-  public PathParts(String uploadURL, Configuration conf) throws IOException {
-    if (uploadURL == null) {
-      throw new IllegalArgumentException("Path must not be null: " + uploadURL);    
-    }
-    this.uploadURL = uploadURL;
-    if (conf == null) {
-      throw new IllegalArgumentException("Configuration must not be null: " + uploadURL);    
-    }
-    this.conf = conf;
-    URI uri = stringToUri(uploadURL);
-    this.fs = FileSystem.get(uri, conf);
-    if (fs == null) {
-      throw new IllegalArgumentException("File system must not be null: " + uploadURL);    
-    }
-    this.normalizedPath = fs.makeQualified(new Path(uri));
-    if (!normalizedPath.isAbsolute()) {
-      throw new IllegalArgumentException("Path must be absolute: " + uploadURL);    
-    }
-    if (getScheme() == null) {
-      throw new IllegalArgumentException("Scheme must not be null: " + uploadURL);    
-    }
-    if (getHost() == null) {
-      throw new IllegalArgumentException("Host must not be null: " + uploadURL);    
-    }
-    if (getPort() < 0) {
-      throw new IllegalArgumentException("Port must not be negative: " + uploadURL);    
-    }
-  }
-  
-  public String getUploadURL() {
-    return uploadURL;
-  }
-
-  public Path getUploadPath() {
-    return new Path(getUploadURL());
-  }
-  
-  public String getURIPath() {
-    return normalizedPath.toUri().getPath();
-  }
-
-  public String getName() {
-    return normalizedPath.getName();
-  }
-
-  public String getScheme() {
-    return normalizedPath.toUri().getScheme();
-  }
-
-  public String getHost() {
-    return normalizedPath.toUri().getHost();
-  }
-
-  public int getPort() {
-    int port = normalizedPath.toUri().getPort();
-    if (port == -1) {
-      port = fs.getWorkingDirectory().toUri().getPort();
-      if (port == -1) {
-        port = NameNode.DEFAULT_PORT;
-      }
-    }
-    return port;
-  }
-
-  public String getId() {
-    return getScheme() + "://" + getHost() + ":" + getPort() + getURIPath();
-  }
-  
-  public String getDownloadURL() {
-    return getId();
-  }
-
-  public Configuration getConfiguration() {
-    return conf;
-  }
-
-  public FileSystem getFileSystem() {
-    return fs;
-  }
-
-  public FileStatus getFileStatus() throws IOException {
-    if (stats == null) {
-      stats = getFileSystem().getFileStatus(getUploadPath());
-    }
-    return stats;
-  }
-  
-  private URI stringToUri(String pathString) {
-    //return new Path(pathString).toUri().normalize();
-    return URI.create(pathString).normalize();
-  }
-}
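
A small usage sketch (the namenode host, port, and file path are invented; an actual namenode is only contacted once getFileStatus() is called, and the HDFS client libraries must be on the classpath):

import org.apache.hadoop.conf.Configuration;

public class PathPartsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    PathParts parts = new PathParts("hdfs://namenode.example.com:8020/user/foo/data/file1.avro", conf);
    System.out.println(parts.getScheme());  // hdfs
    System.out.println(parts.getHost());    // namenode.example.com
    System.out.println(parts.getPort());    // 8020
    System.out.println(parts.getURIPath()); // /user/foo/data/file1.avro
    System.out.println(parts.getName());    // file1.avro
    System.out.println(parts.getId());      // hdfs://namenode.example.com:8020/user/foo/data/file1.avro
  }
}
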

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrCloudPartitioner.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrCloudPartitioner.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrCloudPartitioner.java
deleted file mode 100644
index c8ad1a7..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrCloudPartitioner.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.params.MapSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.Hash;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * MapReduce partitioner that partitions the Mapper output such that each
- * SolrInputDocument gets sent to the SolrCloud shard that it would have been
- * sent to if the document were ingested via the standard SolrCloud Near Real
- * Time (NRT) API.
- * 
- * In other words, this class implements the same partitioning semantics as the
- * standard SolrCloud NRT API. This makes it possible to mix batch updates from MapReduce
- * ingestion with updates from standard NRT ingestion on the same SolrCloud
- * cluster, using identical unique document keys.
- */
-public class SolrCloudPartitioner extends Partitioner<Text, SolrInputDocumentWritable> implements Configurable {
-  
-  private Configuration conf;
-  private DocCollection docCollection;
-  private Map<String, Integer> shardNumbers;
-  private int shards = 0;
-  private final SolrParams emptySolrParams = new MapSolrParams(Collections.emptyMap());
-  
-  public static final String SHARDS = SolrCloudPartitioner.class.getName() + ".shards";
-  public static final String ZKHOST = SolrCloudPartitioner.class.getName() + ".zkHost";
-  public static final String COLLECTION = SolrCloudPartitioner.class.getName() + ".collection";
-  
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  
-  public SolrCloudPartitioner() {}
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;    
-    this.shards = conf.getInt(SHARDS, -1);
-    if (shards <= 0) {
-      throw new IllegalArgumentException("Illegal shards: " + shards);
-    }
-    String zkHost = conf.get(ZKHOST);
-    if (zkHost == null) {
-      throw new IllegalArgumentException("zkHost must not be null");
-    }
-    String collection = conf.get(COLLECTION);
-    if (collection == null) {
-      throw new IllegalArgumentException("collection must not be null");
-    }    
-    LOG.info("Using SolrCloud zkHost: {}, collection: {}", zkHost, collection);
-    docCollection = new ZooKeeperInspector().extractDocCollection(zkHost, collection);
-    if (docCollection == null) {
-      throw new IllegalArgumentException("docCollection must not be null");
-    }
-    if (docCollection.getSlicesMap().size() != shards) {
-      throw new IllegalArgumentException("Incompatible shards: + " + shards + " for docCollection: " + docCollection);
-    }    
-    List<Slice> slices = new ZooKeeperInspector().getSortedSlices(docCollection.getSlices());
-    if (slices.size() != shards) {
-      throw new IllegalStateException("Incompatible sorted shards: + " + shards + " for docCollection: " + docCollection);
-    }    
-    shardNumbers = new HashMap<>(10 * slices.size()); // oversized capacity to keep the load factor low
-    for (int i = 0; i < slices.size(); i++) {
-      shardNumbers.put(slices.get(i).getName(), i);
-    }
-    LOG.debug("Using SolrCloud docCollection: {}", docCollection);
-    DocRouter docRouter = docCollection.getRouter();
-    if (docRouter == null) {
-      throw new IllegalArgumentException("docRouter must not be null");
-    }
-    LOG.info("Using SolrCloud docRouterClass: {}", docRouter.getClass());    
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-  
-  @Override
-  public int getPartition(Text key, SolrInputDocumentWritable value, int numPartitions) {
-    DocRouter docRouter = docCollection.getRouter();
-    SolrInputDocument doc = value.getSolrInputDocument();
-    String keyStr = key.toString();
-    
-    // TODO: scalability: replace linear search in HashBasedRouter.hashToSlice() with binary search on sorted hash ranges
-    Slice slice = docRouter.getTargetSlice(keyStr, doc, null, emptySolrParams, docCollection); 
-    
-//    LOG.info("slice: {}", slice);
-    if (slice == null) {
-      throw new IllegalStateException("No matching slice found! The slice seems unavailable. docRouterClass: "
-          + docRouter.getClass().getName());
-    }
-    int rootShard = shardNumbers.get(slice.getName());
-    if (rootShard < 0 || rootShard >= shards) {
-      throw new IllegalStateException("Illegal shard number " + rootShard + " for slice: " + slice + ", docCollection: "
-          + docCollection);
-    }      
-
-    // map doc to micro shard aka leaf shard, akin to HashBasedRouter.sliceHash()
-    // taking into account mtree merge algorithm
-    assert numPartitions % shards == 0; // Also note that numPartitions is equal to the number of reducers
-    int hashCode = Hash.murmurhash3_x86_32(keyStr, 0, keyStr.length(), 0); 
-    int offset = (hashCode & Integer.MAX_VALUE) % (numPartitions / shards);
-    int microShard = (rootShard * (numPartitions / shards)) + offset;
-//    LOG.info("Subpartitions rootShard: {}, offset: {}", rootShard, offset);
-//    LOG.info("Partitioned to p: {} for numPartitions: {}, shards: {}, key: {}, value: {}", microShard, numPartitions, shards, key, value);
-    
-    assert microShard >= 0 && microShard < numPartitions;
-    return microShard;
-  }
-
-}
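
To make the micro-shard arithmetic concrete, here is a worked sketch with invented inputs: with shards = 2 and numPartitions = 8 each root shard owns 8 / 2 = 4 reducers, so root shard 1 maps to partitions 4..7.

// Illustrative only; mirrors the arithmetic in getPartition() with made-up inputs.
public class MicroShardExample {
  public static void main(String[] args) {
    int shards = 2;
    int numPartitions = 8;     // == number of reducers, a multiple of shards
    int rootShard = 1;         // slice index resolved from the doc router
    int hashCode = 0x9e3779b9; // stands in for Hash.murmurhash3_x86_32(key, ...)
    int offset = (hashCode & Integer.MAX_VALUE) % (numPartitions / shards);
    int microShard = rootShard * (numPartitions / shards) + offset;
    System.out.println(microShard); // 5, i.e. one of the reducers [4..7] owned by root shard 1
  }
}
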