Posted to commits@lucene.apache.org by ma...@apache.org on 2014/01/15 20:33:22 UTC
svn commit: r1558541 - in /lucene/dev/branches/branch_4x: ./ solr/
solr/contrib/ solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/
solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/
Author: markrmiller
Date: Wed Jan 15 19:33:21 2014
New Revision: 1558541
URL: http://svn.apache.org/r1558541
Log:
SOLR-1301: Merge in latest solr-map-reduce updates.
Added:
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/AlphaNumericComparator.java
- copied unchanged from r1547871, lucene/dev/trunk/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/AlphaNumericComparator.java
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/AlphaNumericComparatorTest.java
- copied unchanged from r1547871, lucene/dev/trunk/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/AlphaNumericComparatorTest.java
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/solr/ (props changed)
lucene/dev/branches/branch_4x/solr/contrib/ (props changed)
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/BatchWriter.java
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MRUnitBase.java
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineBasicMiniMRTest.java
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java
lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java
Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/BatchWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/BatchWriter.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/BatchWriter.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/BatchWriter.java Wed Jan 15 19:33:21 2014
@@ -196,8 +196,8 @@ class BatchWriter {
batchPool.awaitTermination(5, TimeUnit.SECONDS);
}
}
- //reporter.setStatus("Committing Solr");
- //solr.commit(true, false);
+ context.setStatus("Committing Solr Phase 1");
+ solr.commit(true, false);
context.setStatus("Optimizing Solr");
int maxSegments = context.getConfiguration().getInt(SolrOutputFormat.SOLR_RECORD_WRITER_MAX_SEGMENTS, 1);
LOG.info("Optimizing Solr: forcing merge down to {} segments", maxSegments);
@@ -206,9 +206,9 @@ class BatchWriter {
context.getCounter(SolrCounters.class.getName(), SolrCounters.PHYSICAL_REDUCER_MERGE_TIME.toString()).increment(System.currentTimeMillis() - start);
float secs = (System.currentTimeMillis() - start) / 1000.0f;
LOG.info("Optimizing Solr: done forcing merge down to {} segments in {} secs", maxSegments, secs);
+ context.setStatus("Committing Solr Phase 2");
+ solr.commit(true, false);
context.setStatus("Shutting down Solr");
- // TODO is core close needed? - according to TestEmbeddedSolrServer it's not...
- //core.close();
solr.shutdown();
}
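
Note: this hunk restores the previously commented-out commit and splits it into two
phases around the force merge: phase 1 flushes the documents buffered by the reducer
before the expensive merge, and phase 2 commits the merged segments before shutdown.
A minimal sketch of the resulting close sequence, assuming the SolrJ 4.x SolrServer
API (the optimize call itself is elided from this hunk's context):

    // sketch only; "solr" is the embedded server held by BatchWriter
    solr.commit(true, false);                // phase 1: waitFlush=true, waitSearcher=false
    solr.optimize(true, false, maxSegments); // force merge down to maxSegments segments
    solr.commit(true, false);                // phase 2: make the merged segments durable
    solr.shutdown();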
Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/MapReduceIndexerTool.java Wed Jan 15 19:33:21 2014
@@ -33,10 +33,13 @@ import java.io.Writer;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Random;
@@ -80,6 +83,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.cdk.morphline.base.Fields;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
/**
@@ -107,6 +113,10 @@ public class MapReduceIndexerTool extend
*/
static final class MyArgumentParser {
+ private static final String SHOW_NON_SOLR_CLOUD = "--show-non-solr-cloud";
+
+ private boolean showNonSolrCloud = false;
+
/**
* Parses the given command line arguments.
*
@@ -123,6 +133,8 @@ public class MapReduceIndexerTool extend
args = new String[] { "--help" };
}
+ showNonSolrCloud = Arrays.asList(args).contains(SHOW_NON_SOLR_CLOUD); // intercept it first
+
ArgumentParser parser = ArgumentParsers
.newArgumentParser("hadoop [GenericOptions]... jar search-mr-*-job.jar " + MapReduceIndexerTool.class.getName(), false)
.defaultHelp(true)
@@ -297,7 +309,7 @@ public class MapReduceIndexerTool extend
"specified by --morphline-file. If the --morphline-id option is ommitted the first (i.e. " +
"top-most) morphline within the config file is used. Example: morphline1");
- Argument solrHomeDirArg = parser.addArgument("--solr-home-dir")
+ Argument solrHomeDirArg = nonSolrCloud(parser.addArgument("--solr-home-dir")
.metavar("DIR")
.type(new FileArgumentType() {
@Override
@@ -312,7 +324,7 @@ public class MapReduceIndexerTool extend
.required(false)
.help("Relative or absolute path to a local dir containing Solr conf/ dir and in particular " +
"conf/solrconfig.xml and optionally also lib/ dir. This directory will be uploaded to each MR task. " +
- "Example: src/test/resources/solr/minimr");
+ "Example: src/test/resources/solr/minimr"));
Argument updateConflictResolverArg = parser.addArgument("--update-conflict-resolver")
.metavar("FQCN")
@@ -404,25 +416,20 @@ public class MapReduceIndexerTool extend
.action(Arguments.storeTrue())
.help("Turn on verbose output.");
+ parser.addArgument(SHOW_NON_SOLR_CLOUD)
+ .action(Arguments.storeTrue())
+ .help("Also show options for Non-SolrCloud mode as part of --help.");
+
ArgumentGroup clusterInfoGroup = parser
.addArgumentGroup("Cluster arguments")
.description(
"Arguments that provide information about your Solr cluster. "
- + "If you are not using --go-live, pass the --shards argument. If you are building shards for "
- + "a Non-SolrCloud cluster, pass the --shard-url argument one or more times. To build indexes for"
- + " a replicated cluster with --shard-url, pass replica urls consecutively and also pass --shards. "
- + "If you are building shards for a SolrCloud cluster, pass the --zk-host argument. "
- + "Using --go-live requires either --shard-url or --zk-host.");
+ + nonSolrCloud("If you are building shards for a SolrCloud cluster, pass the --zk-host argument. "
+ + "If you are building shards for "
+ + "a Non-SolrCloud cluster, pass the --shard-url argument one or more times. To build indexes for "
+ + "a replicated Non-SolrCloud cluster with --shard-url, pass replica urls consecutively and also pass --shards. "
+ + "Using --go-live requires either --zk-host or --shard-url."));
- Argument shardUrlsArg = clusterInfoGroup.addArgument("--shard-url")
- .metavar("URL")
- .type(String.class)
- .action(Arguments.append())
- .help("Solr URL to merge resulting shard into if using --go-live. " +
- "Example: http://solr001.mycompany.com:8983/solr/collection1. " +
- "Multiple --shard-url arguments can be specified, one for each desired shard. " +
- "If you are merging shards into a SolrCloud cluster, use --zk-host instead.");
-
Argument zkHostArg = clusterInfoGroup.addArgument("--zk-host")
.metavar("STRING")
.type(String.class)
@@ -444,15 +451,24 @@ public class MapReduceIndexerTool extend
+ "would be relative to this root - i.e. getting/setting/etc... "
+ "'/foo/bar' would result in operations being run on "
+ "'/solr/foo/bar' (from the server perspective).\n"
- + "\n"
+ + nonSolrCloud("\n"
+ "If --solr-home-dir is not specified, the Solr home directory for the collection "
- + "will be downloaded from this ZooKeeper ensemble.");
+ + "will be downloaded from this ZooKeeper ensemble."));
- Argument shardsArg = clusterInfoGroup.addArgument("--shards")
+ Argument shardUrlsArg = nonSolrCloud(clusterInfoGroup.addArgument("--shard-url")
+ .metavar("URL")
+ .type(String.class)
+ .action(Arguments.append())
+ .help("Solr URL to merge resulting shard into if using --go-live. " +
+ "Example: http://solr001.mycompany.com:8983/solr/collection1. " +
+ "Multiple --shard-url arguments can be specified, one for each desired shard. " +
+ "If you are merging shards into a SolrCloud cluster, use --zk-host instead."));
+
+ Argument shardsArg = nonSolrCloud(clusterInfoGroup.addArgument("--shards")
.metavar("INTEGER")
.type(Integer.class)
.choices(new RangeArgumentChoice(1, Integer.MAX_VALUE))
- .help("Number of output shards to generate.");
+ .help("Number of output shards to generate."));
ArgumentGroup goLiveGroup = parser.addArgumentGroup("Go live arguments")
.description("Arguments for merging the shards that are built into a live Solr cluster. " +
@@ -462,8 +478,8 @@ public class MapReduceIndexerTool extend
.action(Arguments.storeTrue())
.help("Allows you to optionally merge the final index shards into a live Solr cluster after they are built. " +
"You can pass the ZooKeeper address with --zk-host and the relevant cluster information will be auto detected. " +
- "If you are not using a SolrCloud cluster, --shard-url arguments can be used to specify each SolrCore to merge " +
- "each shard into.");
+ nonSolrCloud("If you are not using a SolrCloud cluster, --shard-url arguments can be used to specify each SolrCore to merge " +
+ "each shard into."));
Argument collectionArg = goLiveGroup.addArgument("--collection")
.metavar("STRING")
@@ -538,6 +554,15 @@ public class MapReduceIndexerTool extend
return null;
}
+ // make it a "hidden" option, i.e. the option is functional and enabled but not shown in --help output
+ private Argument nonSolrCloud(Argument arg) {
+ return showNonSolrCloud ? arg : arg.help(FeatureControl.SUPPRESS);
+ }
+
+ private String nonSolrCloud(String msg) {
+ return showNonSolrCloud ? msg : "";
+ }
+
/** Marker trick to prevent processing of any remaining arguments once --help option has been parsed */
private static final class FoundHelpArgument extends RuntimeException {
}
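
Note: the nonSolrCloud() helpers added at the end of this hunk implement the "hidden
option" pattern described in the comment: argparse4j's FeatureControl.SUPPRESS keeps
an argument fully functional while omitting it from --help output, and the flag is
intercepted before parsing (see the Arrays.asList(args).contains(...) line above).
A hypothetical invocation to reveal the suppressed Non-SolrCloud options, following
the usage string the parser is built with:

    hadoop [GenericOptions]... jar search-mr-*-job.jar \
      org.apache.solr.hadoop.MapReduceIndexerTool --show-non-solr-cloud --help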
@@ -785,7 +810,7 @@ public class MapReduceIndexerTool extend
job.setOutputValueClass(SolrInputDocumentWritable.class);
LOG.info("Indexing {} files using {} real mappers into {} reducers", new Object[] {numFiles, realMappers, reducers});
startTime = System.currentTimeMillis();
- if (!waitForCompletion(job, true)) {
+ if (!waitForCompletion(job, options.isVerbose)) {
return -1; // job failed
}
@@ -826,6 +851,9 @@ public class MapReduceIndexerTool extend
if (!waitForCompletion(job, options.isVerbose)) {
return -1; // job failed
}
+ if (!renameTreeMergeShardDirs(outputTreeMergeStep, job, fs)) {
+ return -1;
+ }
secs = (System.currentTimeMillis() - startTime) / 1000.0f;
LOG.info("MTree merge iteration {}/{}: Done. Merging {} shards into {} shards using fanout {} took {} secs",
new Object[] {mtreeMergeIteration, mtreeMergeIterations, reducers, (reducers / options.fanout), options.fanout, secs});
@@ -1182,10 +1210,102 @@ public class MapReduceIndexerTool extend
throw new IllegalStateException("Not a directory: " + dir.getPath());
}
}
- Arrays.sort(dirs); // FIXME: handle more than 99999 shards (need numeric sort rather than lexicographical sort)
+
+ // use alphanumeric sort (rather than lexicographical sort) to properly handle more than 99999 shards
+ Arrays.sort(dirs, new Comparator<FileStatus>() {
+ @Override
+ public int compare(FileStatus f1, FileStatus f2) {
+ return new AlphaNumericComparator().compare(f1.getPath().getName(), f2.getPath().getName());
+ }
+ });
+
return dirs;
}
+ /*
+ * You can run MapReduceIndexerTool in SolrCloud mode, and once the MR job completes, you can use
+ * the standard SolrJ SolrCloud API to send doc updates and deletes to SolrCloud, and those updates
+ * and deletes will go to the right Solr shards, and it will work just fine.
+ *
+ * The MapReduce framework doesn't guarantee that input split N goes to the map task with the
+ * taskId = N. The job tracker and YARN schedule and assign tasks, considering data locality
+ * aspects, but without regard to the input split# within the overall list of input splits. In
+ * other words, split# != taskId can be true.
+ *
+ * To deal with this issue, our mapper tasks write a little auxiliary metadata file (per task)
+ * that tells the job driver which taskId processed which split#. Once the mapper-only job is
+ * completed, the job driver renames the output dirs such that the dir name contains the true solr
+ * shard id, based on these auxiliary files.
+ *
+ * This way each doc gets assigned to the right Solr shard, even with #reducers > #solrshards.
+ *
+ * Example for a merge with two shards:
+ *
+ * part-m-00000 and part-m-00001 go to outputShardNum = 0 and will end up in merged part-m-00000
+ * part-m-00002 and part-m-00003 go to outputShardNum = 1 and will end up in merged part-m-00001
+ * part-m-00004 and part-m-00005 go to outputShardNum = 2 and will end up in merged part-m-00002
+ * ... and so on
+ *
+ * Also see the run() method above, where it uses NLineInputFormat.setNumLinesPerSplit(job,
+ * options.fanout)
+ *
+ * Also see TreeMergeOutputFormat.TreeMergeRecordWriter.writeShardNumberFile()
+ */
+ private boolean renameTreeMergeShardDirs(Path outputTreeMergeStep, Job job, FileSystem fs) throws IOException {
+ final String dirPrefix = SolrOutputFormat.getOutputName(job);
+ FileStatus[] dirs = fs.listStatus(outputTreeMergeStep, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(dirPrefix);
+ }
+ });
+
+ for (FileStatus dir : dirs) {
+ if (!dir.isDirectory()) {
+ throw new IllegalStateException("Not a directory: " + dir.getPath());
+ }
+ }
+
+ // Example: rename part-m-00004 to _part-m-00004
+ for (FileStatus dir : dirs) {
+ Path path = dir.getPath();
+ Path renamedPath = new Path(path.getParent(), "_" + path.getName());
+ if (!rename(path, renamedPath, fs)) {
+ return false;
+ }
+ }
+
+ // Example: rename _part-m-00004 to part-m-00002
+ for (FileStatus dir : dirs) {
+ Path path = dir.getPath();
+ Path renamedPath = new Path(path.getParent(), "_" + path.getName());
+
+ // read auxiliary metadata file (per task) that tells which taskId
+ // processed which split# aka solrShard
+ Path solrShardNumberFile = new Path(renamedPath, TreeMergeMapper.SOLR_SHARD_NUMBER);
+ InputStream in = fs.open(solrShardNumberFile);
+ byte[] bytes = ByteStreams.toByteArray(in);
+ in.close();
+ Preconditions.checkArgument(bytes.length > 0);
+ int solrShard = Integer.parseInt(new String(bytes, Charsets.UTF_8));
+ if (!delete(solrShardNumberFile, false, fs)) {
+ return false;
+ }
+
+ // same as FileOutputFormat.NUMBER_FORMAT
+ NumberFormat numberFormat = NumberFormat.getInstance(Locale.ENGLISH);
+ numberFormat.setMinimumIntegerDigits(5);
+ numberFormat.setGroupingUsed(false);
+ Path finalPath = new Path(renamedPath.getParent(), dirPrefix + "-m-" + numberFormat.format(solrShard));
+
+ LOG.info("MTree merge renaming solr shard: " + solrShard + " from dir: " + dir.getPath() + " to dir: " + finalPath);
+ if (!rename(renamedPath, finalPath, fs)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private static void verifyGoLiveArgs(Options opts, ArgumentParser parser) throws ArgumentParserException {
if (opts.zkHost == null && opts.solrHomeDir == null) {
throw new ArgumentParserException("At least one of --zk-host or --solr-home-dir is required", parser);
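
Note: renameTreeMergeShardDirs() above renames in two passes, first to a "_"-prefixed
temporary name and then to the final shard number, so a final target name can never
collide with a source directory that has not been renamed yet (e.g. part-m-00004
becoming part-m-00002 while the original part-m-00002 still exists). A small sketch
of the taskId-to-shard arithmetic, assuming each merge task receives options.fanout
input shards via NLineInputFormat.setNumLinesPerSplit():

    // sketch only: how task ids map onto output shard numbers for fanout = 2
    int fanout = 2;
    for (int taskId = 0; taskId < 6; taskId++) {
      int outputShardNum = taskId / fanout; // same formula as writeShardNumberFile()
      System.out.printf("part-m-%05d -> part-m-%05d%n", taskId, outputShardNum);
    }
    // prints the mapping given in the comment block above:
    // 00000,00001 -> 00000; 00002,00003 -> 00001; 00004,00005 -> 00002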
Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java Wed Jan 15 19:33:21 2014
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import com.cloudera.cdk.morphline.api.ExceptionHandler;
import com.cloudera.cdk.morphline.base.FaultTolerance;
+import com.google.common.base.Preconditions;
/**
* This class loads the mapper's SolrInputDocuments into one EmbeddedSolrServer
@@ -54,6 +55,7 @@ public class SolrReducer extends Reducer
@Override
protected void setup(Context context) throws IOException, InterruptedException {
+ verifyPartitionAssignment(context);
SolrRecordWriter.addReducerContext(context);
Class<? extends UpdateConflictResolver> resolverClass = context.getConfiguration().getClass(
UPDATE_CONFLICT_RESOLVER, RetainMostRecentUpdateConflictResolver.class, UpdateConflictResolver.class);
@@ -107,6 +109,24 @@ public class SolrReducer extends Reducer
super.cleanup(context);
}
+ /*
+ * Verify that if a mapper's partitioner sends an item to partition X, that item
+ * is sent to the reducer with taskID == X. This invariant is currently required for Solr
+ * documents to end up in the right Solr shard.
+ */
+ private void verifyPartitionAssignment(Context context) {
+ if ("true".equals(System.getProperty("verifyPartitionAssignment", "true"))) {
+ String partitionStr = context.getConfiguration().get("mapred.task.partition");
+ if (partitionStr == null) {
+ partitionStr = context.getConfiguration().get("mapreduce.task.partition");
+ }
+ int partition = Integer.parseInt(partitionStr);
+ int taskId = context.getTaskAttemptID().getTaskID().getId();
+ Preconditions.checkArgument(partition == taskId,
+ "mapred.task.partition: " + partition + " not equal to reducer taskId: " + taskId);
+ }
+ }
+
///////////////////////////////////////////////////////////////////////////////
// Nested classes:
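
Note: verifyPartitionAssignment() enforces the partition == taskId invariant at
reducer setup time. Test harnesses that do not run real reduce tasks can switch the
check off via the system property it reads, as the MRUnit-based MorphlineReducerTest
further down does:

    // disable the check for drivers that don't set a real task partition:
    System.setProperty("verifyPartitionAssignment", "false");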
Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java Wed Jan 15 19:33:21 2014
@@ -34,6 +34,8 @@ public class TreeMergeMapper extends Map
public static final String MAX_SEGMENTS_ON_TREE_MERGE = "maxSegmentsOnTreeMerge";
+ public static final String SOLR_SHARD_NUMBER = "_solrShardNumber";
+
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
LOGGER.trace("map key: {}, value: {}", key, value);
Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java Wed Jan 15 19:33:21 2014
@@ -17,6 +17,9 @@
package org.apache.solr.hadoop;
import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
@@ -39,6 +42,9 @@ import org.apache.solr.store.hdfs.HdfsDi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+
/**
* See {@link IndexMergeTool}.
*/
@@ -84,13 +90,15 @@ public class TreeMergeOutputFormat exten
@Override
public void close(TaskAttemptContext context) throws IOException {
- LOG.debug("Merging into dstDir: " + workDir + ", srcDirs: {}", shards);
+ LOG.debug("Task " + context.getTaskAttemptID() + " merging into dstDir: " + workDir + ", srcDirs: " + shards);
+ writeShardNumberFile(context);
heartBeater.needHeartBeat();
try {
Directory mergedIndex = new HdfsDirectory(workDir, context.getConfiguration());
+ // TODO: shouldn't we pull the Version from the solrconfig.xml?
IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LUCENE_CURRENT, null)
- .setOpenMode(OpenMode.CREATE)
+ .setOpenMode(OpenMode.CREATE).setUseCompoundFile(false)
//.setMergePolicy(mergePolicy) // TODO: grab tuned MergePolicy from solrconfig.xml?
//.setMergeScheduler(...) // TODO: grab tuned MergeScheduler from solrconfig.xml?
;
@@ -162,6 +170,27 @@ public class TreeMergeOutputFormat exten
heartBeater.cancelHeartBeat();
heartBeater.close();
}
+ }
+
+ /*
+ * For background see MapReduceIndexerTool.renameTreeMergeShardDirs()
+ *
+ * Also see the MapReduceIndexerTool.run() method, where it uses
+ * NLineInputFormat.setNumLinesPerSplit(job, options.fanout)
+ */
+ private void writeShardNumberFile(TaskAttemptContext context) throws IOException {
+ Preconditions.checkArgument(shards.size() > 0);
+ String shard = shards.get(0).getParent().getParent().getName(); // move up from "data/index"
+ String taskId = shard.substring("part-m-".length(), shard.length()); // e.g. part-m-00001
+ int taskNum = Integer.parseInt(taskId);
+ int outputShardNum = taskNum / shards.size();
+ LOG.debug("Merging into outputShardNum: " + outputShardNum + " from taskId: " + taskId);
+ Path shardNumberFile = new Path(workDir.getParent().getParent(), TreeMergeMapper.SOLR_SHARD_NUMBER);
+ OutputStream out = shardNumberFile.getFileSystem(context.getConfiguration()).create(shardNumberFile);
+ Writer writer = new OutputStreamWriter(out, Charsets.UTF_8);
+ writer.write(String.valueOf(outputShardNum));
+ writer.flush();
+ writer.close();
}
}
}
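
Note: writeShardNumberFile() and its consumer renameTreeMergeShardDirs() form a
write/read pair over a one-line UTF-8 metadata file. A sketch of that round-trip,
substituting hypothetical local java.io streams (File, FileOutputStream,
FileInputStream) for the HDFS streams used above:

    // sketch only; the real code opens the file on the task's FileSystem
    File f = new File("/tmp/_solrShardNumber");                          // hypothetical path
    Writer w = new OutputStreamWriter(new FileOutputStream(f), Charsets.UTF_8);
    w.write(String.valueOf(2));                                          // outputShardNum
    w.close();                                                           // close() also flushes

    InputStream in = new FileInputStream(f);
    byte[] bytes = ByteStreams.toByteArray(in);                          // Guava
    in.close();
    int solrShard = Integer.parseInt(new String(bytes, Charsets.UTF_8)); // -> 2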
Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java Wed Jan 15 19:33:21 2014
@@ -117,9 +117,11 @@ final class ZooKeeperInspector {
Collections.sort(sorted, new Comparator<Slice>() {
@Override
public int compare(Slice slice1, Slice slice2) {
- return slice1.getName().compareTo(slice2.getName());
+ Comparator c = new AlphaNumericComparator();
+ return c.compare(slice1.getName(), slice2.getName());
}
});
+ LOG.trace("Sorted slices: {}", sorted);
return sorted;
}
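
Note: both this slice sort and the shard-dir sort in MapReduceIndexerTool now
delegate to the new AlphaNumericComparator, whose source is only referenced in this
commit (copied unchanged from r1547871), not shown. An illustrative sketch, not
necessarily the committed implementation, of the comparison semantics required, so
that e.g. "part-m-100000" sorts after "part-m-99999" rather than before it as under
plain lexicographic order:

    import java.util.Comparator;

    // Illustrative only: split strings into digit and non-digit runs and
    // compare digit runs by numeric value instead of character by character.
    final class AlphaNumericSketch implements Comparator<String> {
      @Override
      public int compare(String a, String b) {
        int i = 0, j = 0;
        while (i < a.length() && j < b.length()) {
          char ca = a.charAt(i), cb = b.charAt(j);
          if (Character.isDigit(ca) && Character.isDigit(cb)) {
            int si = i, sj = j;
            while (i < a.length() && Character.isDigit(a.charAt(i))) i++;
            while (j < b.length() && Character.isDigit(b.charAt(j))) j++;
            String na = stripZeros(a.substring(si, i));
            String nb = stripZeros(b.substring(sj, j));
            if (na.length() != nb.length()) return na.length() - nb.length();
            int cmp = na.compareTo(nb); // equal length: digit-by-digit compare
            if (cmp != 0) return cmp;
          } else {
            if (ca != cb) return ca - cb;
            i++; j++;
          }
        }
        return (a.length() - i) - (b.length() - j);
      }

      private static String stripZeros(String s) {
        int k = 0;
        while (k < s.length() - 1 && s.charAt(k) == '0') k++;
        return s.substring(k);
      }
    }
    // e.g. new AlphaNumericSketch().compare("part-m-99999", "part-m-100000") < 0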
Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MRUnitBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MRUnitBase.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MRUnitBase.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MRUnitBase.java Wed Jan 15 19:33:21 2014
@@ -54,6 +54,7 @@ public abstract class MRUnitBase extends
setupMorphline(tempDir, "test-morphlines/solrCellDocumentTypes");
config.set(MorphlineMapRunner.MORPHLINE_FILE_PARAM, tempDir + "/test-morphlines/solrCellDocumentTypes.conf");
+ config.set(SolrOutputFormat.ZIP_NAME, solrHomeZip.getName());
}
public static void setupMorphline(String tempDir, String file) throws IOException {
Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineBasicMiniMRTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineBasicMiniMRTest.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineBasicMiniMRTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineBasicMiniMRTest.java Wed Jan 15 19:33:21 2014
@@ -43,6 +43,7 @@ import org.apache.lucene.util.LuceneTest
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.hadoop.hack.MiniMRCluster;
+import org.apache.solr.handler.extraction.ExtractingParams;
import org.apache.solr.util.ExternalPaths;
import org.junit.After;
import org.junit.AfterClass;
@@ -323,7 +324,7 @@ public class MorphlineBasicMiniMRTest ex
jobConf.setMaxMapAttempts(1);
jobConf.setMaxReduceAttempts(1);
jobConf.setJar(SEARCH_ARCHIVES_JAR);
- jobConf.setBoolean("ignoreTikaException", false);
+ jobConf.setBoolean(ExtractingParams.IGNORE_TIKA_EXCEPTION, false);
int shards = 2;
int maxReducers = Integer.MAX_VALUE;
Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java Wed Jan 15 19:33:21 2014
@@ -25,9 +25,11 @@ import java.io.Writer;
import java.lang.reflect.Array;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
@@ -36,16 +38,16 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.lucene.util.Constants;
-import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrQuery.ORDER;
+import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
@@ -53,6 +55,9 @@ import org.apache.solr.client.solrj.requ
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -62,12 +67,12 @@ import org.apache.solr.common.params.Mod
import org.apache.solr.common.util.NamedList;
import org.apache.solr.hadoop.hack.MiniMRClientCluster;
import org.apache.solr.hadoop.hack.MiniMRClientClusterFactory;
+import org.apache.solr.handler.extraction.ExtractingParams;
import org.apache.solr.util.ExternalPaths;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
@@ -86,16 +91,16 @@ import com.carrotsearch.randomizedtestin
@Slow
public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
+ private static final int RECORD_COUNT = 2104;
private static final String RESOURCES_DIR = ExternalPaths.SOURCE_HOME + "/contrib/map-reduce/src/test-files";
private static final String DOCUMENTS_DIR = RESOURCES_DIR + "/test-documents";
private static final File MINIMR_INSTANCE_DIR = new File(RESOURCES_DIR + "/solr/minimr");
private static final File MINIMR_CONF_DIR = new File(RESOURCES_DIR + "/solr/minimr");
-
+
private static final String SEARCH_ARCHIVES_JAR = JarFinder.getJar(MapReduceIndexerTool.class);
private static MiniDFSCluster dfsCluster = null;
private static MiniMRClientCluster mrCluster = null;
- private static int numRuns = 0;
private static String tempDir;
private final String inputAvroFile1;
@@ -115,17 +120,8 @@ public class MorphlineGoLiveMiniMRTest e
this.inputAvroFile3 = "sample-statuses-20120906-141433-medium.avro";
fixShardCount = true;
- sliceCount = TEST_NIGHTLY ? 3 : 3;
- shardCount = TEST_NIGHTLY ? 3 : 3;
- }
-
- private static boolean isYarn() {
- try {
- Job.class.getMethod("getCluster");
- return true;
- } catch (NoSuchMethodException e) {
- return false;
- }
+ sliceCount = TEST_NIGHTLY ? 7 : 3;
+ shardCount = TEST_NIGHTLY ? 7 : 3;
}
@BeforeClass
@@ -371,7 +367,7 @@ public class MorphlineGoLiveMiniMRTest e
jobConf.setMaxMapAttempts(1);
jobConf.setMaxReduceAttempts(1);
jobConf.setJar(SEARCH_ARCHIVES_JAR);
- jobConf.setBoolean("ignoreTikaException", false);
+ jobConf.setBoolean(ExtractingParams.IGNORE_TIKA_EXCEPTION, false);
MapReduceIndexerTool tool;
int res;
@@ -384,7 +380,7 @@ public class MorphlineGoLiveMiniMRTest e
"--output-dir=" + outDir.toString(),
"--log4j=" + ExternalPaths.SOURCE_HOME + "/core/src/test-files/log4j.properties",
"--mappers=3",
- ++numRuns % 2 == 0 ? "--input-list=" + INPATH.toString() : dataDir.toString(),
+ random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(),
"--go-live-threads", Integer.toString(random().nextInt(15) + 1),
"--verbose",
"--go-live"
@@ -396,9 +392,7 @@ public class MorphlineGoLiveMiniMRTest e
if (true) {
tool = new MapReduceIndexerTool();
-
res = ToolRunner.run(jobConf, tool, args);
-
assertEquals(0, res);
assertTrue(tool.job.isComplete());
assertTrue(tool.job.isSuccessful());
@@ -418,7 +412,7 @@ public class MorphlineGoLiveMiniMRTest e
"--mappers=3",
"--verbose",
"--go-live",
- ++numRuns % 2 == 0 ? "--input-list=" + INPATH.toString() : dataDir.toString(),
+ random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(),
"--go-live-threads", Integer.toString(random().nextInt(15) + 1)
};
args = prependInitialArgs(args);
@@ -449,14 +443,19 @@ public class MorphlineGoLiveMiniMRTest e
fs.delete(outDir, true);
fs.delete(dataDir, true);
INPATH = upAvroFile(fs, inDir, DATADIR, dataDir, inputAvroFile3);
-
+
+ cloudClient.deleteByQuery("*:*");
+ cloudClient.commit();
+ assertEquals(0, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+
args = new String[] {
"--output-dir=" + outDir.toString(),
"--mappers=3",
- "--reducers=6",
+ "--reducers=12",
+ "--fanout=2",
"--verbose",
"--go-live",
- ++numRuns % 2 == 0 ? "--input-list=" + INPATH.toString() : dataDir.toString(),
+ random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(),
"--zk-host", zkServer.getZkAddress(),
"--collection", collection
};
@@ -469,15 +468,55 @@ public class MorphlineGoLiveMiniMRTest e
assertTrue(tool.job.isComplete());
assertTrue(tool.job.isSuccessful());
- results = server.query(new SolrQuery("*:*"));
- assertEquals(2126, results.getResults().getNumFound());
+ SolrDocumentList resultDocs = executeSolrQuery(cloudClient, "*:*");
+ assertEquals(RECORD_COUNT, resultDocs.getNumFound());
+ assertEquals(RECORD_COUNT, resultDocs.size());
+
+ // perform updates
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ SolrDocument doc = resultDocs.get(i);
+ SolrInputDocument update = new SolrInputDocument();
+ for (Map.Entry<String, Object> entry : doc.entrySet()) {
+ update.setField(entry.getKey(), entry.getValue());
+ }
+ update.setField("user_screen_name", "Nadja" + i);
+ update.removeField("_version_");
+ cloudClient.add(update);
+ }
+ cloudClient.commit();
+
+ // verify updates
+ SolrDocumentList resultDocs2 = executeSolrQuery(cloudClient, "*:*");
+ assertEquals(RECORD_COUNT, resultDocs2.getNumFound());
+ assertEquals(RECORD_COUNT, resultDocs2.size());
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ SolrDocument doc = resultDocs.get(i);
+ SolrDocument doc2 = resultDocs2.get(i);
+ assertEquals(doc.getFirstValue("id"), doc2.getFirstValue("id"));
+ assertEquals("Nadja" + i, doc2.getFirstValue("user_screen_name"));
+ assertEquals(doc.getFirstValue("text"), doc2.getFirstValue("text"));
+
+ // perform delete
+ cloudClient.deleteById((String)doc.getFirstValue("id"));
+ }
+ cloudClient.commit();
+
+ // verify deletes
+ assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
}
+ cloudClient.deleteByQuery("*:*");
+ cloudClient.commit();
+ assertEquals(0, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound());
server.shutdown();
// try using zookeeper with replication
String replicatedCollection = "replicated_collection";
- createCollection(replicatedCollection, 2, 3, 2);
+ if (TEST_NIGHTLY) {
+ createCollection(replicatedCollection, 11, 3, 11);
+ } else {
+ createCollection(replicatedCollection, 2, 3, 2);
+ }
waitForRecoveriesToFinish(false);
cloudClient.setDefaultCollection(replicatedCollection);
fs.delete(inDir, true);
@@ -490,7 +529,8 @@ public class MorphlineGoLiveMiniMRTest e
"--solr-home-dir=" + MINIMR_CONF_DIR.getAbsolutePath(),
"--output-dir=" + outDir.toString(),
"--mappers=3",
- "--reducers=6",
+ "--reducers=22",
+ "--fanout=2",
"--verbose",
"--go-live",
"--zk-host", zkServer.getZkAddress(),
@@ -505,15 +545,51 @@ public class MorphlineGoLiveMiniMRTest e
assertTrue(tool.job.isComplete());
assertTrue(tool.job.isSuccessful());
- results = cloudClient.query(new SolrQuery("*:*"));
- assertEquals(2104, results.getResults().getNumFound());
+ SolrDocumentList resultDocs = executeSolrQuery(cloudClient, "*:*");
+ assertEquals(RECORD_COUNT, resultDocs.getNumFound());
+ assertEquals(RECORD_COUNT, resultDocs.size());
checkConsistency(replicatedCollection);
- }
+
+ // perform updates
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ SolrDocument doc = resultDocs.get(i);
+ SolrInputDocument update = new SolrInputDocument();
+ for (Map.Entry<String, Object> entry : doc.entrySet()) {
+ update.setField(entry.getKey(), entry.getValue());
+ }
+ update.setField("user_screen_name", "@Nadja" + i);
+ update.removeField("_version_");
+ cloudClient.add(update);
+ }
+ cloudClient.commit();
+
+ // verify updates
+ SolrDocumentList resultDocs2 = executeSolrQuery(cloudClient, "*:*");
+ assertEquals(RECORD_COUNT, resultDocs2.getNumFound());
+ assertEquals(RECORD_COUNT, resultDocs2.size());
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ SolrDocument doc = resultDocs.get(i);
+ SolrDocument doc2 = resultDocs2.get(i);
+ assertEquals(doc.getFieldValues("id"), doc2.getFieldValues("id"));
+ assertEquals(1, doc.getFieldValues("id").size());
+ assertEquals(Arrays.asList("@Nadja" + i), doc2.getFieldValues("user_screen_name"));
+ assertEquals(doc.getFieldValues("text"), doc2.getFieldValues("text"));
+
+ // perform delete
+ cloudClient.deleteById((String)doc.getFirstValue("id"));
+ }
+ cloudClient.commit();
+
+ // verify deletes
+ assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
+ }
// try using solr_url with replication
cloudClient.deleteByQuery("*:*");
cloudClient.commit();
+ assertEquals(0, executeSolrQuery(cloudClient, "*:*").getNumFound());
+ assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
fs.delete(inDir, true);
fs.delete(dataDir, true);
assertTrue(fs.mkdirs(dataDir));
@@ -543,8 +619,7 @@ public class MorphlineGoLiveMiniMRTest e
checkConsistency(replicatedCollection);
- results = cloudClient.query(new SolrQuery("*:*"));
- assertEquals(2104, results.getResults().getNumFound());
+ assertEquals(RECORD_COUNT, executeSolrQuery(cloudClient, "*:*").size());
}
}
@@ -555,6 +630,12 @@ public class MorphlineGoLiveMiniMRTest e
args.add(cloudJettys.get(i).url);
}
}
+
+ private SolrDocumentList executeSolrQuery(SolrServer collection, String queryString) throws SolrServerException {
+ SolrQuery query = new SolrQuery(queryString).setRows(2 * RECORD_COUNT).addSort("id", ORDER.asc);
+ QueryResponse response = collection.query(query);
+ return response.getResults();
+ }
private void checkConsistency(String replicatedCollection)
throws SolrServerException {
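
Note: the new executeSolrQuery() helper makes the doc-by-doc comparisons in the
update/delete round-trips above deterministic: it sorts by id and requests
2 * RECORD_COUNT rows so every document is fetched in a stable order. Typical use,
as in the assertions above:

    SolrDocumentList docs = executeSolrQuery(cloudClient, "*:*");
    assertEquals(RECORD_COUNT, docs.getNumFound());
    assertEquals(RECORD_COUNT, docs.size());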
Modified: lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java?rev=1558541&r1=1558540&r2=1558541&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java Wed Jan 15 19:33:21 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.lucene.util.Constants;
import org.apache.solr.common.SolrInputDocument;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -44,10 +45,17 @@ import com.google.common.collect.Lists;
public class MorphlineReducerTest extends MRUnitBase {
@BeforeClass
- public static void beforeClass() {
+ public static void beforeClass2() {
assumeFalse("Does not work on Windows, because it uses UNIX shell commands or POSIX paths", Constants.WINDOWS);
assumeFalse("FIXME: This test fails under Java 8 due to the Saxon dependency - see SOLR-1301", Constants.JRE_IS_MINIMUM_JAVA8);
assumeFalse("FIXME: This test fails under J9 due to the Saxon dependency - see SOLR-1301", System.getProperty("java.vm.info", "<?>").contains("IBM J9"));
+
+ System.setProperty("verifyPartitionAssignment", "false");
+ }
+
+ @AfterClass
+ public static void afterClass2() {
+ System.clearProperty("verifyPartitionAssignment");
}
public static class MySolrReducer extends SolrReducer {
@@ -89,28 +97,35 @@ public class MorphlineReducerTest extend
@Test
public void testReducer() throws Exception {
MySolrReducer myReducer = new MySolrReducer();
- ReduceDriver<Text, SolrInputDocumentWritable, Text, SolrInputDocumentWritable> reduceDriver = ReduceDriver.newReduceDriver(myReducer);
-
- Configuration config = reduceDriver.getConfiguration();
- setupHadoopConfig(config);
-
- List<SolrInputDocumentWritable> values = new ArrayList<SolrInputDocumentWritable>();
- SolrInputDocument sid = new SolrInputDocument();
- String id = "myid1";
- sid.addField("id", id);
- sid.addField("text", "some unique text");
- SolrInputDocumentWritable sidw = new SolrInputDocumentWritable(sid);
- values.add(sidw);
- reduceDriver.withInput(new Text(id), values);
-
- reduceDriver.withCacheArchive(solrHomeZip.getAbsolutePath());
-
- reduceDriver.withOutputFormat(SolrOutputFormat.class, NullInputFormat.class);
-
- reduceDriver.run();
-
- assertEquals("Expected 1 counter increment", 1, reduceDriver.getCounters()
- .findCounter(SolrCounters.class.getName(), SolrCounters.DOCUMENTS_WRITTEN.toString()).getValue());
+ try {
+ ReduceDriver<Text,SolrInputDocumentWritable,Text,SolrInputDocumentWritable> reduceDriver = ReduceDriver
+ .newReduceDriver(myReducer);
+
+ Configuration config = reduceDriver.getConfiguration();
+ setupHadoopConfig(config);
+
+ List<SolrInputDocumentWritable> values = new ArrayList<SolrInputDocumentWritable>();
+ SolrInputDocument sid = new SolrInputDocument();
+ String id = "myid1";
+ sid.addField("id", id);
+ sid.addField("text", "some unique text");
+ SolrInputDocumentWritable sidw = new SolrInputDocumentWritable(sid);
+ values.add(sidw);
+ reduceDriver.withInput(new Text(id), values);
+
+ reduceDriver.withCacheArchive(solrHomeZip.getAbsolutePath());
+
+ reduceDriver.withOutputFormat(SolrOutputFormat.class,
+ NullInputFormat.class);
+
+ reduceDriver.run();
+
+ assertEquals("Expected 1 counter increment", 1,
+ reduceDriver.getCounters().findCounter(SolrCounters.class.getName(),
+ SolrCounters.DOCUMENTS_WRITTEN.toString()).getValue());
+ } finally {
+ myReducer.cleanup(myReducer.context);
+ }
}
}