You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sa...@apache.org on 2017/03/24 16:37:07 UTC
[26/62] lucene-solr:master: SOLR-9221: Remove Solr contribs:
map-reduce, morphlines-core and morphlines-cell
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java b/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java
deleted file mode 100644
index adc8d88..0000000
--- a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineGoLiveMiniMRTest.java
+++ /dev/null
@@ -1,881 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.UnsupportedEncodingException;
-import java.io.Writer;
-import java.lang.reflect.Array;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.util.JarFinder;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.lucene.util.Constants;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
-import org.apache.lucene.util.LuceneTestCase.Slow;
-import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrQuery.ORDER;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
-import org.apache.solr.cloud.AbstractZkTestCase;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.params.CollectionParams.CollectionAction;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.hadoop.hack.MiniMRClientCluster;
-import org.apache.solr.hadoop.hack.MiniMRClientClusterFactory;
-import org.apache.solr.morphlines.solr.AbstractSolrMorphlineTestBase;
-import org.apache.solr.util.BadHdfsThreadsFilter;
-import org.apache.solr.util.BadMrClusterThreadsFilter;
-import org.apache.solr.util.TimeOut;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.carrotsearch.randomizedtesting.annotations.Nightly;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction.Action;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies.Consequence;
-
-@ThreadLeakAction({Action.WARN})
-@ThreadLeakLingering(linger = 0)
-@ThreadLeakZombies(Consequence.CONTINUE)
-@ThreadLeakFilters(defaultFilters = true, filters = {
- BadHdfsThreadsFilter.class, BadMrClusterThreadsFilter.class // hdfs currently leaks thread(s)
-})
-@SuppressSSL // SSL does not work with this test for currently unknown reasons
-@Slow
-@Nightly
-@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-9076")
-public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
-
- private static final int RECORD_COUNT = 2104;
- private static final String RESOURCES_DIR = getFile("morphlines-core.marker").getParent();
- private static final String DOCUMENTS_DIR = RESOURCES_DIR + "/test-documents";
- private static final File MINIMR_INSTANCE_DIR = new File(RESOURCES_DIR + "/solr/minimr");
- private static final File MINIMR_CONF_DIR = new File(RESOURCES_DIR + "/solr/minimr");
-
- private static String SEARCH_ARCHIVES_JAR;
-
- private static MiniDFSCluster dfsCluster = null;
- private static MiniMRClientCluster mrCluster = null;
- private static String tempDir;
-
- private final String inputAvroFile1;
- private final String inputAvroFile2;
- private final String inputAvroFile3;
-
- private static File solrHomeDirectory;
-
- @Override
- public String getSolrHome() {
- return solrHomeDirectory.getPath();
- }
-
- public MorphlineGoLiveMiniMRTest() {
- this.inputAvroFile1 = "sample-statuses-20120521-100919.avro";
- this.inputAvroFile2 = "sample-statuses-20120906-141433.avro";
- this.inputAvroFile3 = "sample-statuses-20120906-141433-medium.avro";
-
- sliceCount = TEST_NIGHTLY ? 5 : 3;
- fixShardCount(TEST_NIGHTLY ? 5 : 3);
- }
-
- @BeforeClass
- public static void setupClass() throws Exception {
- System.setProperty("solr.hdfs.blockcache.global", Boolean.toString(LuceneTestCase.random().nextBoolean()));
- System.setProperty("solr.hdfs.blockcache.enabled", Boolean.toString(LuceneTestCase.random().nextBoolean()));
- System.setProperty("solr.hdfs.blockcache.blocksperbank", "2048");
-
- solrHomeDirectory = createTempDir().toFile();
-
- assumeFalse("HDFS tests were disabled by -Dtests.disableHdfs",
- Boolean.parseBoolean(System.getProperty("tests.disableHdfs", "false")));
-
- assumeFalse("FIXME: This test does not work with Windows because of native library requirements", Constants.WINDOWS);
-
- AbstractZkTestCase.SOLRHOME = solrHomeDirectory;
- FileUtils.copyDirectory(MINIMR_INSTANCE_DIR, AbstractZkTestCase.SOLRHOME);
- tempDir = createTempDir().toFile().getAbsolutePath();
-
- new File(tempDir).mkdirs();
-
- FileUtils.copyFile(new File(RESOURCES_DIR + "/custom-mimetypes.xml"), new File(tempDir + "/custom-mimetypes.xml"));
-
- AbstractSolrMorphlineTestBase.setupMorphline(tempDir, "test-morphlines/solrCellDocumentTypes", true);
-
-
- System.setProperty("hadoop.log.dir", new File(tempDir, "logs").getAbsolutePath());
-
- int dataNodes = 2;
-
- JobConf conf = new JobConf();
- conf.set("dfs.block.access.token.enable", "false");
- conf.set("dfs.permissions", "true");
- conf.set("hadoop.security.authentication", "simple");
- conf.set("mapreduce.jobhistory.minicluster.fixed.ports", "false");
- conf.set("mapreduce.jobhistory.admin.address", "0.0.0.0:0");
-
- conf.set(YarnConfiguration.NM_LOCAL_DIRS, tempDir + File.separator + "nm-local-dirs");
- conf.set(YarnConfiguration.DEFAULT_NM_LOG_DIRS, tempDir + File.separator + "nm-logs");
-
-
- new File(tempDir + File.separator + "nm-local-dirs").mkdirs();
-
- System.setProperty("test.build.dir", tempDir + File.separator + "hdfs" + File.separator + "test-build-dir");
- System.setProperty("test.build.data", tempDir + File.separator + "hdfs" + File.separator + "build");
- System.setProperty("test.cache.data", tempDir + File.separator + "hdfs" + File.separator + "cache");
-
- // Initialize AFTER test.build.dir is set, JarFinder uses it.
- SEARCH_ARCHIVES_JAR = JarFinder.getJar(MapReduceIndexerTool.class);
-
- dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null);
- FileSystem fileSystem = dfsCluster.getFileSystem();
- fileSystem.mkdirs(new Path("/tmp"));
- fileSystem.mkdirs(new Path("/user"));
- fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
- fileSystem.setPermission(new Path("/tmp"),
- FsPermission.valueOf("-rwxrwxrwx"));
- fileSystem.setPermission(new Path("/user"),
- FsPermission.valueOf("-rwxrwxrwx"));
- fileSystem.setPermission(new Path("/hadoop/mapred/system"),
- FsPermission.valueOf("-rwx------"));
-
- mrCluster = MiniMRClientClusterFactory.create(MorphlineGoLiveMiniMRTest.class, 1, conf, new File(tempDir, "mrCluster"));
-
- //new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks,
- //hosts, null, conf);
-
- ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
- }
-
- @Override
- public void distribSetUp() throws Exception {
- super.distribSetUp();
- System.setProperty("host", "127.0.0.1");
- System.setProperty("numShards", Integer.toString(sliceCount));
- URI uri = dfsCluster.getFileSystem().getUri();
- System.setProperty("solr.hdfs.home", uri.toString() + "/" + this.getClass().getName());
- uploadConfFiles();
- }
-
- @Override
- public void distribTearDown() throws Exception {
- super.distribTearDown();
- System.clearProperty("host");
- System.clearProperty("numShards");
- System.clearProperty("solr.hdfs.home");
- }
-
- @AfterClass
- public static void teardownClass() throws Exception {
- System.clearProperty("solr.hdfs.blockcache.global");
- System.clearProperty("solr.hdfs.blockcache.blocksperbank");
- System.clearProperty("solr.hdfs.blockcache.enabled");
- System.clearProperty("hadoop.log.dir");
- System.clearProperty("test.build.dir");
- System.clearProperty("test.build.data");
- System.clearProperty("test.cache.data");
-
- if (mrCluster != null) {
- mrCluster.stop();
- mrCluster = null;
- }
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
- FileSystem.closeAll();
- }
-
- private JobConf getJobConf() throws IOException {
- JobConf jobConf = new JobConf(mrCluster.getConfig());
- return jobConf;
- }
-
- @Test
- public void testBuildShardUrls() throws Exception {
- // 2x3
- Integer numShards = 2;
- List<Object> urls = new ArrayList<>();
- urls.add("shard1");
- urls.add("shard2");
- urls.add("shard3");
- urls.add("shard4");
- urls.add("shard5");
- urls.add("shard6");
- List<List<String>> shardUrls = MapReduceIndexerTool.buildShardUrls(urls , numShards);
-
- assertEquals(shardUrls.toString(), 2, shardUrls.size());
-
- for (List<String> u : shardUrls) {
- assertEquals(3, u.size());
- }
-
- // 1x6
- numShards = 1;
- shardUrls = MapReduceIndexerTool.buildShardUrls(urls , numShards);
-
- assertEquals(shardUrls.toString(), 1, shardUrls.size());
-
- for (List<String> u : shardUrls) {
- assertEquals(6, u.size());
- }
-
- // 6x1
- numShards = 6;
- shardUrls = MapReduceIndexerTool.buildShardUrls(urls , numShards);
-
- assertEquals(shardUrls.toString(), 6, shardUrls.size());
-
- for (List<String> u : shardUrls) {
- assertEquals(1, u.size());
- }
-
- // 3x2
- numShards = 3;
- shardUrls = MapReduceIndexerTool.buildShardUrls(urls , numShards);
-
- assertEquals(shardUrls.toString(), 3, shardUrls.size());
-
- for (List<String> u : shardUrls) {
- assertEquals(2, u.size());
- }
-
- // null shards, 6x1
- numShards = null;
- shardUrls = MapReduceIndexerTool.buildShardUrls(urls , numShards);
-
- assertEquals(shardUrls.toString(), 6, shardUrls.size());
-
- for (List<String> u : shardUrls) {
- assertEquals(1, u.size());
- }
-
- // null shards 3x1
- numShards = null;
-
- urls = new ArrayList<>();
- urls.add("shard1");
- urls.add("shard2");
- urls.add("shard3");
-
- shardUrls = MapReduceIndexerTool.buildShardUrls(urls , numShards);
-
- assertEquals(shardUrls.toString(), 3, shardUrls.size());
-
- for (List<String> u : shardUrls) {
- assertEquals(1, u.size());
- }
-
- // 2x(2,3) off balance
- numShards = 2;
- urls = new ArrayList<>();
- urls.add("shard1");
- urls.add("shard2");
- urls.add("shard3");
- urls.add("shard4");
- urls.add("shard5");
- shardUrls = MapReduceIndexerTool.buildShardUrls(urls , numShards);
-
- assertEquals(shardUrls.toString(), 2, shardUrls.size());
-
- Set<Integer> counts = new HashSet<>();
- counts.add(shardUrls.get(0).size());
- counts.add(shardUrls.get(1).size());
-
- assertTrue(counts.contains(2));
- assertTrue(counts.contains(3));
- }
-
- private String[] prependInitialArgs(String[] args) {
- String[] head = new String[] {
- "--morphline-file=" + tempDir + "/test-morphlines/solrCellDocumentTypes.conf",
- "--morphline-id=morphline1",
- };
- return concat(head, args);
- }
-
- @Nightly
- @Test
- public void test() throws Exception {
-
- waitForRecoveriesToFinish(false);
-
- FileSystem fs = dfsCluster.getFileSystem();
- Path inDir = fs.makeQualified(new Path(
- "/user/testing/testMapperReducer/input"));
- fs.delete(inDir, true);
- String DATADIR = "/user/testing/testMapperReducer/data";
- Path dataDir = fs.makeQualified(new Path(DATADIR));
- fs.delete(dataDir, true);
- Path outDir = fs.makeQualified(new Path(
- "/user/testing/testMapperReducer/output"));
- fs.delete(outDir, true);
-
- assertTrue(fs.mkdirs(inDir));
- Path INPATH = upAvroFile(fs, inDir, DATADIR, dataDir, inputAvroFile1);
-
- JobConf jobConf = getJobConf();
- jobConf.set("jobclient.output.filter", "ALL");
- // enable mapred.job.tracker = local to run in debugger and set breakpoints
- // jobConf.set("mapred.job.tracker", "local");
- jobConf.setMaxMapAttempts(1);
- jobConf.setMaxReduceAttempts(1);
- jobConf.setJar(SEARCH_ARCHIVES_JAR);
-
- MapReduceIndexerTool tool;
- int res;
- QueryResponse results;
- String[] args = new String[]{};
- List<String> argList = new ArrayList<>();
-
- try (HttpSolrClient server = getHttpSolrClient(cloudJettys.get(0).url)) {
-
- args = new String[]{
- "--solr-home-dir=" + MINIMR_CONF_DIR.getAbsolutePath(),
- "--output-dir=" + outDir.toString(),
- "--log4j=" + getFile("log4j.properties").getAbsolutePath(),
- "--mappers=3",
- random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(),
- "--go-live-threads", Integer.toString(random().nextInt(15) + 1),
- "--verbose",
- "--go-live"
- };
- args = prependInitialArgs(args);
- getShardUrlArgs(argList);
- args = concat(args, argList.toArray(new String[0]));
-
- if (true) {
- tool = new MapReduceIndexerTool();
- res = ToolRunner.run(jobConf, tool, args);
- assertEquals(0, res);
- assertTrue(tool.job.isComplete());
- assertTrue(tool.job.isSuccessful());
- results = server.query(new SolrQuery("*:*"));
- assertEquals(20, results.getResults().getNumFound());
- }
-
- fs.delete(inDir, true);
- fs.delete(outDir, true);
- fs.delete(dataDir, true);
- assertTrue(fs.mkdirs(inDir));
- INPATH = upAvroFile(fs, inDir, DATADIR, dataDir, inputAvroFile2);
-
- args = new String[]{
- "--solr-home-dir=" + MINIMR_CONF_DIR.getAbsolutePath(),
- "--output-dir=" + outDir.toString(),
- "--mappers=3",
- "--verbose",
- "--go-live",
- random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(),
- "--go-live-threads", Integer.toString(random().nextInt(15) + 1)
- };
- args = prependInitialArgs(args);
-
- getShardUrlArgs(argList);
- args = concat(args, argList.toArray(new String[0]));
-
- if (true) {
- tool = new MapReduceIndexerTool();
- res = ToolRunner.run(jobConf, tool, args);
- assertEquals(0, res);
- assertTrue(tool.job.isComplete());
- assertTrue(tool.job.isSuccessful());
- results = server.query(new SolrQuery("*:*"));
-
- assertEquals(22, results.getResults().getNumFound());
- }
-
- // try using zookeeper
- String collection = "collection1";
- if (random().nextBoolean()) {
- // sometimes, use an alias
- createAlias("updatealias", "collection1");
- collection = "updatealias";
- }
-
- fs.delete(inDir, true);
- fs.delete(outDir, true);
- fs.delete(dataDir, true);
- INPATH = upAvroFile(fs, inDir, DATADIR, dataDir, inputAvroFile3);
-
- cloudClient.deleteByQuery("*:*");
- cloudClient.commit();
- assertEquals(0, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound());
-
- args = new String[]{
- "--output-dir=" + outDir.toString(),
- "--mappers=3",
- "--reducers=6",
- "--fanout=2",
- "--verbose",
- "--go-live",
- random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(),
- "--zk-host", zkServer.getZkAddress(),
- "--collection", collection
- };
- args = prependInitialArgs(args);
-
- if (true) {
- tool = new MapReduceIndexerTool();
- res = ToolRunner.run(jobConf, tool, args);
- assertEquals(0, res);
- assertTrue(tool.job.isComplete());
- assertTrue(tool.job.isSuccessful());
-
- SolrDocumentList resultDocs = executeSolrQuery(cloudClient, "*:*");
- assertEquals(RECORD_COUNT, resultDocs.getNumFound());
- assertEquals(RECORD_COUNT, resultDocs.size());
-
- // perform updates
- for (int i = 0; i < RECORD_COUNT; i++) {
- SolrDocument doc = resultDocs.get(i);
- SolrInputDocument update = new SolrInputDocument();
- for (Map.Entry<String, Object> entry : doc.entrySet()) {
- update.setField(entry.getKey(), entry.getValue());
- }
- update.setField("user_screen_name", "Nadja" + i);
- update.removeField("_version_");
- cloudClient.add(update);
- }
- cloudClient.commit();
-
- // verify updates
- SolrDocumentList resultDocs2 = executeSolrQuery(cloudClient, "*:*");
- assertEquals(RECORD_COUNT, resultDocs2.getNumFound());
- assertEquals(RECORD_COUNT, resultDocs2.size());
- for (int i = 0; i < RECORD_COUNT; i++) {
- SolrDocument doc = resultDocs.get(i);
- SolrDocument doc2 = resultDocs2.get(i);
- assertEquals(doc.getFirstValue("id"), doc2.getFirstValue("id"));
- assertEquals("Nadja" + i, doc2.getFirstValue("user_screen_name"));
- assertEquals(doc.getFirstValue("text"), doc2.getFirstValue("text"));
-
- // perform delete
- cloudClient.deleteById((String) doc.getFirstValue("id"));
- }
- cloudClient.commit();
-
- // verify deletes
- assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
- }
-
- cloudClient.deleteByQuery("*:*");
- cloudClient.commit();
- assertEquals(0, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound());
- }
-
- // try using zookeeper with replication
- String replicatedCollection = "replicated_collection";
- if (TEST_NIGHTLY) {
- createCollection(replicatedCollection, 3, 3, 3);
- } else {
- createCollection(replicatedCollection, 2, 3, 2);
- }
- waitForRecoveriesToFinish(false);
- cloudClient.setDefaultCollection(replicatedCollection);
- fs.delete(inDir, true);
- fs.delete(outDir, true);
- fs.delete(dataDir, true);
- assertTrue(fs.mkdirs(dataDir));
- INPATH = upAvroFile(fs, inDir, DATADIR, dataDir, inputAvroFile3);
-
- args = new String[] {
- "--solr-home-dir=" + MINIMR_CONF_DIR.getAbsolutePath(),
- "--output-dir=" + outDir.toString(),
- "--mappers=3",
- "--reducers=12",
- "--fanout=2",
- "--verbose",
- "--go-live",
- "--zk-host", zkServer.getZkAddress(),
- "--collection", replicatedCollection, dataDir.toString()
- };
- args = prependInitialArgs(args);
-
- if (true) {
- tool = new MapReduceIndexerTool();
- res = ToolRunner.run(jobConf, tool, args);
- assertEquals(0, res);
- assertTrue(tool.job.isComplete());
- assertTrue(tool.job.isSuccessful());
-
- SolrDocumentList resultDocs = executeSolrQuery(cloudClient, "*:*");
- assertEquals(RECORD_COUNT, resultDocs.getNumFound());
- assertEquals(RECORD_COUNT, resultDocs.size());
-
- checkConsistency(replicatedCollection);
-
- // perform updates
- for (int i = 0; i < RECORD_COUNT; i++) {
- SolrDocument doc = resultDocs.get(i);
- SolrInputDocument update = new SolrInputDocument();
- for (Map.Entry<String, Object> entry : doc.entrySet()) {
- update.setField(entry.getKey(), entry.getValue());
- }
- update.setField("user_screen_name", "@Nadja" + i);
- update.removeField("_version_");
- cloudClient.add(update);
- }
- cloudClient.commit();
-
- // verify updates
- SolrDocumentList resultDocs2 = executeSolrQuery(cloudClient, "*:*");
- assertEquals(RECORD_COUNT, resultDocs2.getNumFound());
- assertEquals(RECORD_COUNT, resultDocs2.size());
- for (int i = 0; i < RECORD_COUNT; i++) {
- SolrDocument doc = resultDocs.get(i);
- SolrDocument doc2 = resultDocs2.get(i);
- assertEquals(doc.getFieldValues("id"), doc2.getFieldValues("id"));
- assertEquals(1, doc.getFieldValues("id").size());
- assertEquals(Arrays.asList("@Nadja" + i), doc2.getFieldValues("user_screen_name"));
- assertEquals(doc.getFieldValues("text"), doc2.getFieldValues("text"));
-
- // perform delete
- cloudClient.deleteById((String)doc.getFirstValue("id"));
- }
- cloudClient.commit();
-
- // verify deletes
- assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
- }
-
- // try using solr_url with replication
- cloudClient.deleteByQuery("*:*");
- cloudClient.commit();
- assertEquals(0, executeSolrQuery(cloudClient, "*:*").getNumFound());
- assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
- fs.delete(inDir, true);
- fs.delete(dataDir, true);
- assertTrue(fs.mkdirs(dataDir));
- INPATH = upAvroFile(fs, inDir, DATADIR, dataDir, inputAvroFile3);
-
- args = new String[] {
- "--solr-home-dir=" + MINIMR_CONF_DIR.getAbsolutePath(),
- "--output-dir=" + outDir.toString(),
- "--shards", "2",
- "--mappers=3",
- "--verbose",
- "--go-live",
- "--go-live-threads", Integer.toString(random().nextInt(15) + 1), dataDir.toString()
- };
- args = prependInitialArgs(args);
-
- argList = new ArrayList<>();
- getShardUrlArgs(argList, replicatedCollection);
- args = concat(args, argList.toArray(new String[0]));
-
- if (true) {
- tool = new MapReduceIndexerTool();
- res = ToolRunner.run(jobConf, tool, args);
- assertEquals(0, res);
- assertTrue(tool.job.isComplete());
- assertTrue(tool.job.isSuccessful());
-
- checkConsistency(replicatedCollection);
-
- assertEquals(RECORD_COUNT, executeSolrQuery(cloudClient, "*:*").size());
- }
-
- // delete collection
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("action", CollectionAction.DELETE.toString());
- params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
- params.set(CoreAdminParams.DELETE_DATA_DIR, true);
- params.set(CoreAdminParams.DELETE_INDEX, true);
- params.set("name", replicatedCollection);
- QueryRequest request = new QueryRequest(params);
- request.setPath("/admin/collections");
- cloudClient.request(request);
-
- final TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS);
- while (cloudClient.getZkStateReader().getClusterState().hasCollection(replicatedCollection)) {
- if (timeout.hasTimedOut()) {
- throw new AssertionError("Timeout waiting to see removed collection leave clusterstate");
- }
-
- Thread.sleep(200);
- }
-
- if (TEST_NIGHTLY) {
- createCollection(replicatedCollection, 3, 3, 3);
- } else {
- createCollection(replicatedCollection, 2, 3, 2);
- }
-
- waitForRecoveriesToFinish(replicatedCollection, false);
- printLayout();
- assertEquals(0, executeSolrQuery(cloudClient, "*:*").getNumFound());
-
-
- args = new String[] {
- "--solr-home-dir=" + MINIMR_CONF_DIR.getAbsolutePath(),
- "--output-dir=" + outDir.toString(),
- "--shards", "2",
- "--mappers=3",
- "--verbose",
- "--go-live",
- "--go-live-threads", Integer.toString(random().nextInt(15) + 1), dataDir.toString()
- };
- args = prependInitialArgs(args);
-
- argList = new ArrayList<>();
- getShardUrlArgs(argList, replicatedCollection);
- args = concat(args, argList.toArray(new String[0]));
-
- tool = new MapReduceIndexerTool();
- res = ToolRunner.run(jobConf, tool, args);
- assertEquals(0, res);
- assertTrue(tool.job.isComplete());
- assertTrue(tool.job.isSuccessful());
-
- checkConsistency(replicatedCollection);
-
- assertEquals(RECORD_COUNT, executeSolrQuery(cloudClient, "*:*").size());
- }
-
- private void getShardUrlArgs(List<String> args) {
- for (int i = 0; i < getShardCount(); i++) {
- args.add("--shard-url");
- args.add(cloudJettys.get(i).url);
- }
- }
-
- private SolrDocumentList executeSolrQuery(SolrClient collection, String queryString) throws SolrServerException, IOException {
- SolrQuery query = new SolrQuery(queryString).setRows(2 * RECORD_COUNT).addSort("id", ORDER.asc);
- QueryResponse response = collection.query(query);
- return response.getResults();
- }
-
- private void checkConsistency(String replicatedCollection)
- throws Exception {
- Collection<Slice> slices = cloudClient.getZkStateReader().getClusterState()
- .getSlices(replicatedCollection);
- for (Slice slice : slices) {
- Collection<Replica> replicas = slice.getReplicas();
- long found = -1;
- for (Replica replica : replicas) {
- try (HttpSolrClient client = getHttpSolrClient(new ZkCoreNodeProps(replica).getCoreUrl())) {
- SolrQuery query = new SolrQuery("*:*");
- query.set("distrib", false);
- QueryResponse replicaResults = client.query(query);
- long count = replicaResults.getResults().getNumFound();
- if (found != -1) {
- assertEquals(slice.getName() + " is inconsistent "
- + new ZkCoreNodeProps(replica).getCoreUrl(), found, count);
- }
- found = count;
- }
- }
- }
- }
-
- private void getShardUrlArgs(List<String> args, String replicatedCollection) {
- Collection<Slice> slices = cloudClient.getZkStateReader().getClusterState().getSlices(replicatedCollection);
- for (Slice slice : slices) {
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica replica : replicas) {
- args.add("--shard-url");
- args.add(new ZkCoreNodeProps(replica).getCoreUrl());
- }
- }
- }
-
- private Path upAvroFile(FileSystem fs, Path inDir, String DATADIR,
- Path dataDir, String localFile) throws IOException, UnsupportedEncodingException {
- Path INPATH = new Path(inDir, "input.txt");
- OutputStream os = fs.create(INPATH);
- Writer wr = new OutputStreamWriter(os, StandardCharsets.UTF_8);
- wr.write(DATADIR + File.separator + localFile);
- wr.close();
-
- assertTrue(fs.mkdirs(dataDir));
- fs.copyFromLocalFile(new Path(DOCUMENTS_DIR, localFile), dataDir);
- return INPATH;
- }
-
- @Override
- public JettySolrRunner createJetty(File solrHome, String dataDir,
- String shardList, String solrConfigOverride, String schemaOverride)
- throws Exception {
-
- Properties props = new Properties();
- if (solrConfigOverride != null)
- props.setProperty("solrconfig", solrConfigOverride);
- if (schemaOverride != null)
- props.setProperty("schema", schemaOverride);
- if (shardList != null)
- props.setProperty("shards", shardList);
-
- String collection = System.getProperty("collection");
- if (collection == null)
- collection = "collection1";
- props.setProperty("collection", collection);
-
- JettySolrRunner jetty = new JettySolrRunner(solrHome.getAbsolutePath(), props, buildJettyConfig(context));
- jetty.start();
-
- return jetty;
- }
-
- private static void putConfig(SolrZkClient zkClient, File solrhome, String name) throws Exception {
- putConfig(zkClient, solrhome, name, name);
- }
-
- private static void putConfig(SolrZkClient zkClient, File solrhome, String srcName, String destName)
- throws Exception {
-
- File file = new File(solrhome, "conf" + File.separator + srcName);
- if (!file.exists()) {
- // LOG.info("skipping " + file.getAbsolutePath() +
- // " because it doesn't exist");
- return;
- }
-
- String destPath = "/configs/conf1/" + destName;
- // LOG.info("put " + file.getAbsolutePath() + " to " + destPath);
- zkClient.makePath(destPath, file, false, true);
- }
-
- private void uploadConfFiles() throws Exception {
- // upload our own config files
- SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), 10000);
- putConfig(zkClient, new File(RESOURCES_DIR + "/solr/solrcloud"),
- "solrconfig.xml");
- putConfig(zkClient, MINIMR_CONF_DIR, "schema.xml");
- putConfig(zkClient, MINIMR_CONF_DIR, "elevate.xml");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_en.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_ar.txt");
-
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_bg.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_ca.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_cz.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_da.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_el.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_es.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_eu.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_de.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_fa.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_fi.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_fr.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_ga.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_gl.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_hi.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_hu.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_hy.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_id.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_it.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_ja.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_lv.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_nl.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_no.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_pt.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_ro.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_ru.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_sv.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_th.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stopwords_tr.txt");
-
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/contractions_ca.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/contractions_fr.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/contractions_ga.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/contractions_it.txt");
-
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/stemdict_nl.txt");
-
- putConfig(zkClient, MINIMR_CONF_DIR, "lang/hyphenations_ga.txt");
-
- putConfig(zkClient, MINIMR_CONF_DIR, "stopwords.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "protwords.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "currency.xml");
- putConfig(zkClient, MINIMR_CONF_DIR, "open-exchange-rates.json");
- putConfig(zkClient, MINIMR_CONF_DIR, "mapping-ISOLatin1Accent.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "old_synonyms.txt");
- putConfig(zkClient, MINIMR_CONF_DIR, "synonyms.txt");
- zkClient.close();
- }
-
- protected static <T> T[] concat(T[]... arrays) {
- if (arrays.length <= 0) {
- throw new IllegalArgumentException();
- }
- Class clazz = null;
- int length = 0;
- for (T[] array : arrays) {
- clazz = array.getClass();
- length += array.length;
- }
- T[] result = (T[]) Array.newInstance(clazz.getComponentType(), length);
- int pos = 0;
- for (T[] array : arrays) {
- System.arraycopy(array, 0, result, pos, array.length);
- pos += array.length;
- }
- return result;
- }
-
- private NamedList<Object> createAlias(String alias, String collections) throws SolrServerException, IOException {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("collections", collections);
- params.set("name", alias);
- params.set("action", CollectionAction.CREATEALIAS.toString());
- QueryRequest request = new QueryRequest(params);
- request.setPath("/admin/collections");
- return cloudClient.request(request);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineMapperTest.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineMapperTest.java b/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineMapperTest.java
deleted file mode 100644
index 4f93a66..0000000
--- a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineMapperTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.net.URLEncoder;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.lucene.util.Constants;
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.hadoop.morphline.MorphlineMapper;
-import org.apache.solr.util.BadHdfsThreadsFilter;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-
-@ThreadLeakFilters(defaultFilters = true, filters = {
- BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
-})
-@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-9220")
-public class MorphlineMapperTest extends MRUnitBase {
-
- @BeforeClass
- public static void beforeClass() {
- assumeFalse("Does not work on Windows, because it uses UNIX shell commands or POSIX paths", Constants.WINDOWS);
- }
-
- @Test
- public void testMapper() throws Exception {
- MorphlineMapper mapper = new MorphlineMapper();
- MapDriver<LongWritable, Text, Text, SolrInputDocumentWritable> mapDriver = MapDriver.newMapDriver(mapper);;
-
- Configuration config = mapDriver.getConfiguration();
- setupHadoopConfig(config);
-
- mapDriver.withInput(new LongWritable(0L), new Text("hdfs://localhost/" +
- URLEncoder.encode(DOCUMENTS_DIR, "UTF-8").replace("+", "%20") +
- "/sample-statuses-20120906-141433.avro"));
-
- SolrInputDocument sid = new SolrInputDocument();
- sid.addField("id", "uniqueid1");
- sid.addField("user_name", "user1");
- sid.addField("text", "content of record one");
- SolrInputDocumentWritable sidw = new SolrInputDocumentWritable(sid);
-
- mapDriver
- .withCacheArchive(solrHomeZip.getAbsolutePath())
- .withOutput(new Text("0"), sidw);
- //mapDriver.runTest();
- List<Pair<Text, SolrInputDocumentWritable>> result = mapDriver.run();
- for (Pair<Text, SolrInputDocumentWritable> p: result) {
- System.out.println(p.getFirst());
- System.out.println(p.getSecond());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java b/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java
deleted file mode 100644
index 31616d8..0000000
--- a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/MorphlineReducerTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.apache.lucene.util.Constants;
-import org.apache.solr.common.SolrInputDocument;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import com.google.common.collect.Lists;
-
-@Ignore("This test cannot currently work because it uses a local filesystem output path for the indexes and Solr requires hdfs output paths")
-public class MorphlineReducerTest extends MRUnitBase {
-
- @BeforeClass
- public static void beforeClass2() {
- assumeFalse("Does not work on Windows, because it uses UNIX shell commands or POSIX paths", Constants.WINDOWS);
-
- System.setProperty("verifyPartitionAssignment", "false");
- }
-
- @AfterClass
- public static void afterClass2() {
- System.clearProperty("verifyPartitionAssignment");
- }
-
- public static class MySolrReducer extends SolrReducer {
- Context context;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- this.context = context;
-
- // handle a bug in MRUnit - should be fixed in MRUnit 1.0.0
- when(context.getTaskAttemptID()).thenAnswer(new Answer<TaskAttemptID>() {
- @Override
- public TaskAttemptID answer(final InvocationOnMock invocation) {
- // FIXME MRUNIT seems to pass taskid to the reduce task as mapred.TaskID rather than mapreduce.TaskID
- return new TaskAttemptID(new TaskID("000000000000", 0, true, 0), 0);
- }
- });
-
- super.setup(context);
- }
-
- }
-
- public static class NullInputFormat<K, V> extends InputFormat<K, V> {
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException,
- InterruptedException {
- return Lists.newArrayList();
- }
-
- @Override
- public RecordReader<K, V> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
- return null;
- }
-
- }
-
- @Test
- public void testReducer() throws Exception {
- MySolrReducer myReducer = new MySolrReducer();
- try {
- ReduceDriver<Text,SolrInputDocumentWritable,Text,SolrInputDocumentWritable> reduceDriver = ReduceDriver
- .newReduceDriver(myReducer);
-
- Configuration config = reduceDriver.getConfiguration();
- setupHadoopConfig(config);
-
- List<SolrInputDocumentWritable> values = new ArrayList<>();
- SolrInputDocument sid = new SolrInputDocument();
- String id = "myid1";
- sid.addField("id", id);
- sid.addField("text", "some unique text");
- SolrInputDocumentWritable sidw = new SolrInputDocumentWritable(sid);
- values.add(sidw);
- reduceDriver.withInput(new Text(id), values);
-
- reduceDriver.withCacheArchive(solrHomeZip.getAbsolutePath());
-
- reduceDriver.withOutputFormat(SolrOutputFormat.class,
- NullInputFormat.class);
-
- reduceDriver.run();
-
- assertEquals("Expected 1 counter increment", 1,
- reduceDriver.getCounters().findCounter(SolrCounters.class.getName(),
- SolrCounters.DOCUMENTS_WRITTEN.toString()).getValue());
- } finally {
- myReducer.cleanup(myReducer.context);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/UtilsForTests.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/UtilsForTests.java b/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/UtilsForTests.java
deleted file mode 100644
index bc5148f..0000000
--- a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/UtilsForTests.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
-import org.apache.solr.client.solrj.response.QueryResponse;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class UtilsForTests {
-
- public static void validateSolrServerDocumentCount(File solrHomeDir, FileSystem fs, Path outDir, int expectedDocs, int expectedShards)
- throws IOException, SolrServerException {
-
- long actualDocs = 0;
- int actualShards = 0;
- for (FileStatus dir : fs.listStatus(outDir)) { // for each shard
- if (dir.getPath().getName().startsWith("part") && dir.isDirectory()) {
- actualShards++;
- try (EmbeddedSolrServer solr
- = SolrRecordWriter.createEmbeddedSolrServer(new Path(solrHomeDir.getAbsolutePath()), fs, dir.getPath())) {
- SolrQuery query = new SolrQuery();
- query.setQuery("*:*");
- QueryResponse resp = solr.query(query);
- long numDocs = resp.getResults().getNumFound();
- actualDocs += numDocs;
- }
- }
- }
- assertEquals(expectedShards, actualShards);
- assertEquals(expectedDocs, actualDocs);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRClientCluster.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRClientCluster.java b/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRClientCluster.java
deleted file mode 100644
index be5ea01..0000000
--- a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRClientCluster.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop.hack;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-/*
- * A simple interface for a client MR cluster used for testing. This interface
- * provides basic methods which are independent of the underlying Mini Cluster (
- * either through MR1 or MR2).
- */
-public interface MiniMRClientCluster {
-
- public void start() throws IOException;
-
- /**
- * Stop and start back the cluster using the same configuration.
- */
- public void restart() throws IOException;
-
- public void stop() throws IOException;
-
- public Configuration getConfig() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRClientClusterFactory.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRClientClusterFactory.java b/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRClientClusterFactory.java
deleted file mode 100644
index 2bf721b..0000000
--- a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRClientClusterFactory.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop.hack;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.JarFinder;
-
-/**
- * A MiniMRCluster factory. In MR2, it provides a wrapper MiniMRClientCluster
- * interface around the MiniMRYarnCluster. While in MR1, it provides such
- * wrapper around MiniMRCluster. This factory should be used in tests to provide
- * an easy migration of tests across MR1 and MR2.
- */
-public class MiniMRClientClusterFactory {
-
- public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
- Configuration conf, File testWorkDir) throws IOException {
- return create(caller, caller.getSimpleName(), noOfNMs, conf, testWorkDir);
- }
-
- public static MiniMRClientCluster create(Class<?> caller, String identifier,
- int noOfNMs, Configuration conf, File testWorkDir) throws IOException {
-
- if (conf == null) {
- conf = new Configuration();
- }
-
- FileSystem fs = FileSystem.get(conf);
-
- Path testRootDir = new Path(testWorkDir.getPath(), identifier + "-tmpDir")
- .makeQualified(fs);
- Path appJar = new Path(testRootDir, "MRAppJar.jar");
-
- // Copy MRAppJar and make it private.
- Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR);
-
- fs.copyFromLocalFile(appMasterJar, appJar);
- fs.setPermission(appJar, new FsPermission("744"));
-
- Job job = Job.getInstance(conf);
-
- job.addFileToClassPath(appJar);
-
- Path callerJar = new Path(JarFinder.getJar(caller));
- Path remoteCallerJar = new Path(testRootDir, callerJar.getName());
- fs.copyFromLocalFile(callerJar, remoteCallerJar);
- fs.setPermission(remoteCallerJar, new FsPermission("744"));
- job.addFileToClassPath(remoteCallerJar);
-
- MiniMRYarnCluster miniMRYarnCluster;
- try {
- miniMRYarnCluster = new MiniMRYarnCluster(identifier,
- noOfNMs, testWorkDir);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- job.getConfiguration().set("minimrclientcluster.caller.name",
- identifier);
- job.getConfiguration().setInt("minimrclientcluster.nodemanagers.number",
- noOfNMs);
- miniMRYarnCluster.init(job.getConfiguration());
- miniMRYarnCluster.start();
-
- return new MiniMRYarnClusterAdapter(miniMRYarnCluster, testWorkDir);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRCluster.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRCluster.java b/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRCluster.java
deleted file mode 100644
index cf872abd..0000000
--- a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRCluster.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop.hack;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.JobPriority;
-import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.lucene.util.LuceneTestCase;
-
-
-/**
- * This class is an MR2 replacement for older MR1 MiniMRCluster, that was used
- * by tests prior to MR2. This replacement class uses the new MiniMRYarnCluster
- * in MR2 but provides the same old MR1 interface, so tests can be migrated from
- * MR1 to MR2 with minimal changes.
- *
- * Due to major differences between MR1 and MR2, a number of methods are either
- * unimplemented/unsupported or were re-implemented to provide wrappers around
- * MR2 functionality.
- *
- * @deprecated Use {@link org.apache.hadoop.mapred.MiniMRClientClusterFactory}
- * instead
- */
-@Deprecated
-public class MiniMRCluster {
- private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
-
- private MiniMRClientCluster mrClientCluster;
-
- public String getTaskTrackerLocalDir(int taskTracker) {
- throw new UnsupportedOperationException();
- }
-
- public String[] getTaskTrackerLocalDirs(int taskTracker) {
- throw new UnsupportedOperationException();
- }
-
- class JobTrackerRunner {
- // Mock class
- }
-
- class TaskTrackerRunner {
- // Mock class
- }
-
- public JobTrackerRunner getJobTrackerRunner() {
- throw new UnsupportedOperationException();
- }
-
- TaskTrackerRunner getTaskTrackerRunner(int id) {
- throw new UnsupportedOperationException();
- }
-
- public int getNumTaskTrackers() {
- throw new UnsupportedOperationException();
- }
-
- public void setInlineCleanupThreads() {
- throw new UnsupportedOperationException();
- }
-
- public void waitUntilIdle() {
- throw new UnsupportedOperationException();
- }
-
- private void waitTaskTrackers() {
- throw new UnsupportedOperationException();
- }
-
- public int getJobTrackerPort() {
- throw new UnsupportedOperationException();
- }
-
- public JobConf createJobConf() {
- JobConf jobConf = null;
- try {
- jobConf = new JobConf(mrClientCluster.getConfig());
- } catch (IOException e) {
- LOG.error(e);
- }
- return jobConf;
- }
-
- public JobConf createJobConf(JobConf conf) {
- JobConf jobConf = null;
- try {
- jobConf = new JobConf(mrClientCluster.getConfig());
- } catch (IOException e) {
- LOG.error(e);
- }
- return jobConf;
- }
-
- static JobConf configureJobConf(JobConf conf, String namenode,
- int jobTrackerPort, int jobTrackerInfoPort, UserGroupInformation ugi) {
- throw new UnsupportedOperationException();
- }
-
- public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
- String[] racks, String[] hosts) throws Exception {
- this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts);
- }
-
- public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
- String[] racks, String[] hosts, JobConf conf) throws Exception {
- this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf);
- }
-
- public MiniMRCluster(int numTaskTrackers, String namenode, int numDir)
- throws Exception {
- this(0, 0, numTaskTrackers, namenode, numDir);
- }
-
- public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
- int numTaskTrackers, String namenode, int numDir) throws Exception {
- this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
- null);
- }
-
- public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
- int numTaskTrackers, String namenode, int numDir, String[] racks)
- throws Exception {
- this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
- racks, null);
- }
-
- public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
- int numTaskTrackers, String namenode, int numDir, String[] racks,
- String[] hosts) throws Exception {
- this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
- racks, hosts, null);
- }
-
- public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
- int numTaskTrackers, String namenode, int numDir, String[] racks,
- String[] hosts, UserGroupInformation ugi) throws Exception {
- this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
- racks, hosts, ugi, null);
- }
-
- public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
- int numTaskTrackers, String namenode, int numDir, String[] racks,
- String[] hosts, UserGroupInformation ugi, JobConf conf)
- throws Exception {
- this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir,
- racks, hosts, ugi, conf, 0);
- }
-
- public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
- int numTaskTrackers, String namenode, int numDir, String[] racks,
- String[] hosts, UserGroupInformation ugi, JobConf conf,
- int numTrackerToExclude) throws Exception {
- if (conf == null) conf = new JobConf();
- FileSystem.setDefaultUri(conf, namenode);
- String identifier = this.getClass().getSimpleName() + "_"
- + Integer.toString(LuceneTestCase.random().nextInt(Integer.MAX_VALUE));
- mrClientCluster = MiniMRClientClusterFactory.create(this.getClass(),
- identifier, numTaskTrackers, conf, new File(conf.get("testWorkDir")));
- }
-
- public UserGroupInformation getUgi() {
- throw new UnsupportedOperationException();
- }
-
- public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from,
- int max) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- public void setJobPriority(JobID jobId, JobPriority priority)
- throws AccessControlException, IOException {
- throw new UnsupportedOperationException();
- }
-
- public JobPriority getJobPriority(JobID jobId) {
- throw new UnsupportedOperationException();
- }
-
- public long getJobFinishTime(JobID jobId) {
- throw new UnsupportedOperationException();
- }
-
- public void initializeJob(JobID jobId) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- public MapTaskCompletionEventsUpdate getMapTaskCompletionEventsUpdates(
- int index, JobID jobId, int max) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- public JobConf getJobTrackerConf() {
- JobConf jobConf = null;
- try {
- jobConf = new JobConf(mrClientCluster.getConfig());
- } catch (IOException e) {
- LOG.error(e);
- }
- return jobConf;
- }
-
- public int getFaultCount(String hostName) {
- throw new UnsupportedOperationException();
- }
-
- public void startJobTracker() {
- // Do nothing
- }
-
- public void startJobTracker(boolean wait) {
- // Do nothing
- }
-
- public void stopJobTracker() {
- // Do nothing
- }
-
- public void stopTaskTracker(int id) {
- // Do nothing
- }
-
- public void startTaskTracker(String host, String rack, int idx, int numDir)
- throws IOException {
- // Do nothing
- }
-
- void addTaskTracker(TaskTrackerRunner taskTracker) {
- throw new UnsupportedOperationException();
- }
-
- int getTaskTrackerID(String trackerName) {
- throw new UnsupportedOperationException();
- }
-
- public void shutdown() {
- try {
- mrClientCluster.stop();
- } catch (IOException e) {
- LOG.error(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRYarnCluster.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRYarnCluster.java b/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRYarnCluster.java
deleted file mode 100644
index 8fa1b31..0000000
--- a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRYarnCluster.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop.hack;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Locale;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.LocalContainerLauncher;
-import org.apache.hadoop.mapred.ShuffleHandler;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.util.JarFinder;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-
-/**
- * Configures and starts the MR-specific components in the YARN cluster.
- *
- */
-public class MiniMRYarnCluster extends MiniYARNCluster {
-
- public static final String APPJAR = JarFinder.getJar(LocalContainerLauncher.class);
-
- private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class);
- private JobHistoryServer historyServer;
- private JobHistoryServerWrapper historyServerWrapper;
-
- public MiniMRYarnCluster(String testName, File testWorkDir) {
- this(testName, 1, testWorkDir);
- }
-
- public MiniMRYarnCluster(String testName, int noOfNMs, File testWorkDir) {
- super(testName, noOfNMs, 4, 4, testWorkDir);
- //TODO: add the history server
- historyServerWrapper = new JobHistoryServerWrapper();
- addService(historyServerWrapper);
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
- if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
- conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
- "apps_staging_dir/").getAbsolutePath());
- }
-
- // By default, VMEM monitoring disabled, PMEM monitoring enabled.
- if (!conf.getBoolean(
- MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
- MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
- conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
- conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
- }
-
- conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
-
- try {
- Path stagingPath = FileContext.getFileContext(conf).makeQualified(
- new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
- /*
- * Re-configure the staging path on Windows if the file system is localFs.
- * We need to use a absolute path that contains the drive letter. The unit
- * test could run on a different drive than the AM. We can run into the
- * issue that job files are localized to the drive where the test runs on,
- * while the AM starts on a different drive and fails to find the job
- * metafiles. Using absolute path can avoid this ambiguity.
- */
- if (Path.WINDOWS) {
- if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
- conf.set(MRJobConfig.MR_AM_STAGING_DIR,
- new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
- .getAbsolutePath());
- }
- }
- FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
- if (fc.util().exists(stagingPath)) {
- LOG.info(stagingPath + " exists! deleting...");
- fc.delete(stagingPath, true);
- }
- LOG.info("mkdir: " + stagingPath);
- //mkdir the staging directory so that right permissions are set while running as proxy user
- fc.mkdir(stagingPath, null, true);
- //mkdir done directory as well
- String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
- Path doneDirPath = fc.makeQualified(new Path(doneDir));
- fc.mkdir(doneDirPath, null, true);
- } catch (IOException e) {
- throw new YarnRuntimeException("Could not create staging directory. ", e);
- }
- conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
- // which shuffle doesn't happen
- //configure the shuffle service in NM
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
- new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
- conf.setClass(String.format(Locale.ENGLISH, YarnConfiguration.NM_AUX_SERVICE_FMT,
- ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
- Service.class);
-
- // Non-standard shuffle port
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
-
- conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
- DefaultContainerExecutor.class, ContainerExecutor.class);
-
- // TestMRJobs is for testing non-uberized operation only; see TestUberAM
- // for corresponding uberized tests.
- conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-
- super.serviceInit(conf);
- }
-
- private class JobHistoryServerWrapper extends AbstractService {
- public JobHistoryServerWrapper() {
- super(JobHistoryServerWrapper.class.getName());
- }
-
- @Override
- public synchronized void serviceStart() throws Exception {
- try {
- if (!getConfig().getBoolean(
- JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS,
- JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS)) {
- // pick free random ports.
- getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
- MiniYARNCluster.getHostname() + ":0");
- getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
- MiniYARNCluster.getHostname() + ":0");
- }
- historyServer = new JobHistoryServer();
- historyServer.init(getConfig());
- new Thread() {
- public void run() {
- historyServer.start();
- };
- }.start();
- while (historyServer.getServiceState() == STATE.INITED) {
- LOG.info("Waiting for HistoryServer to start...");
- Thread.sleep(1500);
- }
- //TODO Add a timeout. State.STOPPED check ?
- if (historyServer.getServiceState() != STATE.STARTED) {
- throw new IOException("HistoryServer failed to start");
- }
- super.serviceStart();
- } catch (Throwable t) {
- throw new YarnRuntimeException(t);
- }
- //need to do this because historyServer.init creates a new Configuration
- getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
- historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
- getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
- historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
-
- LOG.info("MiniMRYARN ResourceManager address: " +
- getConfig().get(YarnConfiguration.RM_ADDRESS));
- LOG.info("MiniMRYARN ResourceManager web address: " +
- getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
- LOG.info("MiniMRYARN HistoryServer address: " +
- getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
- LOG.info("MiniMRYARN HistoryServer web address: " +
- getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
- }
-
- @Override
- public synchronized void serviceStop() throws Exception {
- if (historyServer != null) {
- historyServer.stop();
- }
- super.serviceStop();
- }
- }
-
- public JobHistoryServer getHistoryServer() {
- return this.historyServer;
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53e5f34f/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRYarnClusterAdapter.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRYarnClusterAdapter.java b/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRYarnClusterAdapter.java
deleted file mode 100644
index 08ab881..0000000
--- a/solr/contrib/map-reduce/src/test/org/apache/solr/hadoop/hack/MiniMRYarnClusterAdapter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop.hack;
-
-import java.io.File;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.service.Service.STATE;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * An adapter for MiniMRYarnCluster providing a MiniMRClientCluster interface.
- * This interface could be used by tests across both MR1 and MR2.
- */
-public class MiniMRYarnClusterAdapter implements MiniMRClientCluster {
-
- private MiniMRYarnCluster miniMRYarnCluster;
-
- private File testWorkDir;
-
- private static final Log LOG = LogFactory.getLog(MiniMRYarnClusterAdapter.class);
-
- public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster, File testWorkDir) {
- this.miniMRYarnCluster = miniMRYarnCluster;
- this.testWorkDir = testWorkDir;
- }
-
- @Override
- public Configuration getConfig() {
- return miniMRYarnCluster.getConfig();
- }
-
- @Override
- public void start() {
- miniMRYarnCluster.start();
- }
-
- @Override
- public void stop() {
- miniMRYarnCluster.stop();
- }
-
- @Override
- public void restart() {
- if (!miniMRYarnCluster.getServiceState().equals(STATE.STARTED)){
- LOG.warn("Cannot restart the mini cluster, start it first");
- return;
- }
- Configuration oldConf = new Configuration(getConfig());
- String callerName = oldConf.get("minimrclientcluster.caller.name",
- this.getClass().getName());
- int noOfNMs = oldConf.getInt("minimrclientcluster.nodemanagers.number", 1);
- oldConf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
- oldConf.setBoolean(JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS, true);
- stop();
- miniMRYarnCluster = new MiniMRYarnCluster(callerName, noOfNMs, testWorkDir);
- miniMRYarnCluster.init(oldConf);
- miniMRYarnCluster.start();
- }
-
-}