You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2016/12/13 13:20:10 UTC
[39/48] oozie git commit: Revert "OOZIE-2729 OYA: Use MiniYARNCluster in tests. TODO: refactor XTestCase."
Revert "OOZIE-2729 OYA: Use MiniYARNCluster in tests. TODO: refactor XTestCase."
This reverts commit d5dcc5cec2e080413e2540f43d3877b4d56f99ad.
Change-Id: Iefe037a8477591a7554b31fe81a399d7e1f86f00
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/67dca9c3
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/67dca9c3
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/67dca9c3
Branch: refs/heads/oya
Commit: 67dca9c31016a3bf7ad00037f1750fce988f1e76
Parents: e5070b1
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Mon Nov 28 14:04:13 2016 +0100
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Mon Nov 28 14:04:13 2016 +0100
----------------------------------------------------------------------
.../java/org/apache/oozie/test/XTestCase.java | 440 +++++++++----------
1 file changed, 217 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/67dca9c3/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index 784c578..711d41d 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -27,9 +27,14 @@ import java.io.OutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,12 +55,12 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.AppenderSkeleton;
@@ -142,24 +147,24 @@ public abstract class XTestCase extends TestCase {
OOZIE_SRC_DIR = OOZIE_SRC_DIR.getParentFile();
}
- final String testPropsFile = System.getProperty(OOZIE_TEST_PROPERTIES, "test.properties");
- final File file = new File(testPropsFile).isAbsolute()
- ? new File(testPropsFile) : new File(OOZIE_SRC_DIR, testPropsFile);
+ String testPropsFile = System.getProperty(OOZIE_TEST_PROPERTIES, "test.properties");
+ File file = new File(testPropsFile).isAbsolute()
+ ? new File(testPropsFile) : new File(OOZIE_SRC_DIR, testPropsFile);
if (file.exists()) {
System.out.println();
System.out.println("*********************************************************************************");
System.out.println("Loading test system properties from: " + file.getAbsolutePath());
System.out.println();
- final Properties props = new Properties();
+ Properties props = new Properties();
props.load(new FileReader(file));
- for (final Map.Entry entry : props.entrySet()) {
+ for (Map.Entry entry : props.entrySet()) {
if (!System.getProperties().containsKey(entry.getKey())) {
System.setProperty((String) entry.getKey(), (String) entry.getValue());
System.out.println(entry.getKey() + " = " + entry.getValue());
}
else {
System.out.println(entry.getKey() + " IGNORED, using command line value = " +
- System.getProperty((String) entry.getKey()));
+ System.getProperty((String) entry.getKey()));
}
}
System.out.println("*********************************************************************************");
@@ -168,13 +173,14 @@ public abstract class XTestCase extends TestCase {
else {
if (System.getProperty(OOZIE_TEST_PROPERTIES) != null) {
System.err.println();
- System.err.println("ERROR: Specified test file does not exist: " +
- System.getProperty(OOZIE_TEST_PROPERTIES));
+ System.err.println("ERROR: Specified test file does not exist: " +
+ System.getProperty(OOZIE_TEST_PROPERTIES));
System.err.println();
System.exit(-1);
}
}
- } catch (final IOException ex) {
+ }
+ catch (IOException ex) {
throw new RuntimeException(ex);
}
@@ -255,12 +261,12 @@ public abstract class XTestCase extends TestCase {
/**
* Name of the shell command
*/
- protected static final String SHELL_COMMAND_NAME = (Shell.WINDOWS) ? "cmd" : "bash";
+ protected static final String SHELL_COMMAND_NAME = (Shell.WINDOWS)? "cmd": "bash";
/**
* Extension for shell script files
*/
- protected static final String SHELL_COMMAND_SCRIPTFILE_EXTENSION = (Shell.WINDOWS) ? "cmd" : "sh";
+ protected static final String SHELL_COMMAND_SCRIPTFILE_EXTENSION = (Shell.WINDOWS)? "cmd": "sh";
/**
* Option for shell command to pass script files
@@ -291,12 +297,12 @@ public abstract class XTestCase extends TestCase {
* @param cleanUpDBTables true if should cleanup the database tables, false if not
* @throws Exception if the test workflow working directory could not be created or there was a problem cleaning the database
*/
- protected void setUp(final boolean cleanUpDBTables) throws Exception {
+ protected void setUp(boolean cleanUpDBTables) throws Exception {
RUNNING_TESTCASES.incrementAndGet();
super.setUp();
- final String baseDir = System.getProperty(OOZIE_TEST_DIR, new File("target/test-data").getAbsolutePath());
+ String baseDir = System.getProperty(OOZIE_TEST_DIR, new File("target/test-data").getAbsolutePath());
String msg = null;
- final File f = new File(baseDir);
+ File f = new File(baseDir);
if (!f.isAbsolute()) {
msg = XLog.format("System property [{0}]=[{1}] must be set to an absolute path", OOZIE_TEST_DIR, baseDir);
}
@@ -313,7 +319,7 @@ public abstract class XTestCase extends TestCase {
f.mkdirs();
if (!f.exists() || !f.isDirectory()) {
System.err.println();
- System.err.println(XLog.format("Could not create test dir [{0}]", baseDir));
+ System.err.println(XLog.format("Could not create test dir [{0}]", baseDir));
System.exit(-1);
}
hadoopVersion = System.getProperty(HADOOP_VERSION, "0.20.0");
@@ -325,12 +331,12 @@ public abstract class XTestCase extends TestCase {
testCaseConfDir = createTestCaseSubDir("conf");
// load test Oozie site
- final String oozieTestDB = System.getProperty("oozie.test.db", "hsqldb");
- final String defaultOozieSize =
- new File(OOZIE_SRC_DIR, "core/src/test/resources/" + oozieTestDB + "-oozie-site.xml").getAbsolutePath();
- final String customOozieSite = System.getProperty("oozie.test.config.file", defaultOozieSize);
+ String oozieTestDB = System.getProperty("oozie.test.db", "hsqldb");
+ String defaultOozieSize =
+ new File(OOZIE_SRC_DIR, "core/src/test/resources/" + oozieTestDB + "-oozie-site.xml").getAbsolutePath();
+ String customOozieSite = System.getProperty("oozie.test.config.file", defaultOozieSize);
File source = new File(customOozieSite);
- if (!source.isAbsolute()) {
+ if(!source.isAbsolute()) {
source = new File(OOZIE_SRC_DIR, customOozieSite);
}
source = source.getAbsoluteFile();
@@ -340,7 +346,7 @@ public abstract class XTestCase extends TestCase {
}
else {
// If we can't find it, try using the class loader (useful if we're using XTestCase from outside core)
- final URL sourceURL = getClass().getClassLoader().getResource(oozieTestDB + "-oozie-site.xml");
+ URL sourceURL = getClass().getClassLoader().getResource(oozieTestDB + "-oozie-site.xml");
if (sourceURL != null) {
oozieSiteSourceStream = sourceURL.openStream();
}
@@ -348,35 +354,35 @@ public abstract class XTestCase extends TestCase {
// If we still can't find it, then exit
System.err.println();
System.err.println(XLog.format("Custom configuration file for testing does not exist [{0}]",
- source.getAbsolutePath()));
+ source.getAbsolutePath()));
System.err.println();
System.exit(-1);
}
}
// Copy the specified oozie-site file from oozieSiteSourceStream to the test case dir as oozie-site.xml
- final Configuration oozieSiteConf = new Configuration(false);
+ Configuration oozieSiteConf = new Configuration(false);
oozieSiteConf.addResource(oozieSiteSourceStream);
- final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- final InputStream inputStream = classLoader.getResourceAsStream(ConfigurationService.DEFAULT_CONFIG_FILE);
- final XConfiguration configuration = new XConfiguration(inputStream);
- final String classes = configuration.get(Services.CONF_SERVICE_CLASSES);
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ InputStream inputStream = classLoader.getResourceAsStream(ConfigurationService.DEFAULT_CONFIG_FILE);
+ XConfiguration configuration = new XConfiguration(inputStream);
+ String classes = configuration.get(Services.CONF_SERVICE_CLASSES);
// Disable sharelib service as it cannot find the sharelib jars
// as maven has target/classes in classpath and not the jar because test phase is before package phase
- oozieSiteConf.set(Services.CONF_SERVICE_CLASSES, classes.replaceAll("org.apache.oozie.service.ShareLibService,", ""));
+ oozieSiteConf.set(Services.CONF_SERVICE_CLASSES, classes.replaceAll("org.apache.oozie.service.ShareLibService,",""));
// Make sure to create the Oozie DB during unit tests
oozieSiteConf.set(JPAService.CONF_CREATE_DB_SCHEMA, "true");
File target = new File(testCaseConfDir, "oozie-site.xml");
oozieSiteConf.writeXml(new FileOutputStream(target));
- final File hadoopConfDir = new File(testCaseConfDir, "hadoop-conf");
+ File hadoopConfDir = new File(testCaseConfDir, "hadoop-conf");
hadoopConfDir.mkdir();
- final File actionConfDir = new File(testCaseConfDir, "action-conf");
+ File actionConfDir = new File(testCaseConfDir, "action-conf");
actionConfDir.mkdir();
source = new File(OOZIE_SRC_DIR, "core/src/test/resources/hadoop-config.xml");
InputStream hadoopConfigResourceStream = null;
if (!source.exists()) {
// If we can't find it, try using the class loader (useful if we're using XTestCase from outside core)
- final URL sourceURL = getClass().getClassLoader().getResource("hadoop-config.xml");
+ URL sourceURL = getClass().getClassLoader().getResource("hadoop-config.xml");
if (sourceURL != null) {
hadoopConfigResourceStream = sourceURL.openStream();
}
@@ -409,17 +415,17 @@ public abstract class XTestCase extends TestCase {
}
if (System.getProperty("oozie.test.db.host") == null) {
- System.setProperty("oozie.test.db.host", "localhost");
+ System.setProperty("oozie.test.db.host", "localhost");
}
setSystemProperty(ConfigurationService.OOZIE_DATA_DIR, testCaseDir);
- setSystemProperty(HadoopAccessorService.SUPPORTED_FILESYSTEMS, "*");
+ setSystemProperty(HadoopAccessorService.SUPPORTED_FILESYSTEMS,"*");
- if (yarnCluster != null) {
- try (final OutputStream os = new FileOutputStream(new File(hadoopConfDir, "core-site.xml"))) {
- final Configuration conf = createJobConfFromYarnCluster();
- conf.writeXml(os);
- }
+ if (mrCluster != null) {
+ OutputStream os = new FileOutputStream(new File(hadoopConfDir, "core-site.xml"));
+ Configuration conf = createJobConfFromMRCluster();
+ conf.writeXml(os);
+ os.close();
}
if (System.getProperty("oozie.test.metastore.server", "false").equals("true")) {
@@ -468,12 +474,12 @@ public abstract class XTestCase extends TestCase {
* reason for the manual parsing instead of an actual File.toURI is because Oozie tests use tokens ${}
* frequently. Something like URI("c:/temp/${HOUR}").toString() will generate escaped values that will break tests
*/
- protected String getTestCaseFileUri(final String relativeUri) {
+ protected String getTestCaseFileUri(String relativeUri) {
String uri = new File(testCaseDir).toURI().toString();
// truncates '/' if the testCaseDir was provided with a fullpath ended with separator
- if (uri.endsWith("/")) {
- uri = uri.substring(0, uri.length() - 1);
+ if (uri.endsWith("/")){
+ uri = uri.substring(0, uri.length() -1);
}
return uri + "/" + relativeUri;
@@ -512,7 +518,7 @@ public abstract class XTestCase extends TestCase {
/**
* Return an alternate test user Id that belongs
- to the test group.
+ to the test group.
*
* @return the user Id.
*/
@@ -556,7 +562,7 @@ public abstract class XTestCase extends TestCase {
* @param testCase testcase instance to obtain the working directory.
* @return the test working directory.
*/
- private String getTestCaseDirInternal(final TestCase testCase) {
+ private String getTestCaseDirInternal(TestCase testCase) {
ParamChecker.notNull(testCase, "testCase");
File dir = new File(System.getProperty(OOZIE_TEST_DIR, "target/test-data"));
dir = new File(dir, "oozietests").getAbsoluteFile();
@@ -565,16 +571,16 @@ public abstract class XTestCase extends TestCase {
return dir.getAbsolutePath();
}
- protected void delete(final File file) throws IOException {
+ protected void delete(File file) throws IOException {
ParamChecker.notNull(file, "file");
if (file.getAbsolutePath().length() < 5) {
throw new RuntimeException(XLog.format("path [{0}] is too short, not deleting", file.getAbsolutePath()));
}
if (file.exists()) {
if (file.isDirectory()) {
- final File[] children = file.listFiles();
+ File[] children = file.listFiles();
if (children != null) {
- for (final File child : children) {
+ for (File child : children) {
delete(child);
}
}
@@ -598,14 +604,14 @@ public abstract class XTestCase extends TestCase {
* @return return the path of the test working directory, it is always an absolute path.
* @throws Exception if the test working directory could not be created or cleaned up.
*/
- private String createTestCaseDir(final TestCase testCase, final boolean cleanup) throws Exception {
- final String testCaseDir = getTestCaseDirInternal(testCase);
+ private String createTestCaseDir(TestCase testCase, boolean cleanup) throws Exception {
+ String testCaseDir = getTestCaseDirInternal(testCase);
System.out.println();
System.out.println(XLog.format("Setting testcase work dir[{0}]", testCaseDir));
if (cleanup) {
delete(new File(testCaseDir));
}
- final File dir = new File(testCaseDir);
+ File dir = new File(testCaseDir);
if (!dir.mkdirs()) {
throw new RuntimeException(XLog.format("Could not create testcase dir[{0}]", testCaseDir));
}
@@ -618,7 +624,7 @@ public abstract class XTestCase extends TestCase {
* @param subDirNames a list of progressively deeper directory names
* @return the absolute path to the created directory.
*/
- protected String createTestCaseSubDir(final String... subDirNames) {
+ protected String createTestCaseSubDir(String... subDirNames) {
ParamChecker.notNull(subDirNames, "subDirName");
if (subDirNames.length == 0) {
throw new RuntimeException(XLog.format("Could not create testcase subdir ''; it already exists"));
@@ -644,12 +650,12 @@ public abstract class XTestCase extends TestCase {
* @param name system property name.
* @param value value to set.
*/
- protected void setSystemProperty(final String name, final String value) {
+ protected void setSystemProperty(String name, String value) {
if (sysProps == null) {
sysProps = new HashMap<String, String>();
}
if (!sysProps.containsKey(name)) {
- final String currentValue = System.getProperty(name);
+ String currentValue = System.getProperty(name);
sysProps.put(name, currentValue);
}
if (value != null) {
@@ -665,7 +671,7 @@ public abstract class XTestCase extends TestCase {
*/
private void resetSystemProperties() {
if (sysProps != null) {
- for (final Map.Entry<String, String> entry : sysProps.entrySet()) {
+ for (Map.Entry<String, String> entry : sysProps.entrySet()) {
if (entry.getValue() != null) {
System.setProperty(entry.getKey(), entry.getValue());
}
@@ -698,11 +704,11 @@ public abstract class XTestCase extends TestCase {
* @param predicate predicate waiting on.
* @return the waited time.
*/
- protected long waitFor(final int timeout, final Predicate predicate) {
+ protected long waitFor(int timeout, Predicate predicate) {
ParamChecker.notNull(predicate, "predicate");
- final XLog log = new XLog(LogFactory.getLog(getClass()));
- final long started = System.currentTimeMillis();
- final long mustEnd = System.currentTimeMillis() + (long) (WAITFOR_RATIO * timeout);
+ XLog log = new XLog(LogFactory.getLog(getClass()));
+ long started = System.currentTimeMillis();
+ long mustEnd = System.currentTimeMillis() + (long)(WAITFOR_RATIO * timeout);
long lastEcho = 0;
try {
long waiting = mustEnd - System.currentTimeMillis();
@@ -720,7 +726,8 @@ public abstract class XTestCase extends TestCase {
log.info("Waiting timed out after [{0}] msec", timeout);
}
return System.currentTimeMillis() - started;
- } catch (final Exception ex) {
+ }
+ catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -730,7 +737,7 @@ public abstract class XTestCase extends TestCase {
*
* @param sleepTime time in milliseconds to wait
*/
- protected void sleep(final int sleepTime) {
+ protected void sleep(int sleepTime) {
waitFor(sleepTime, new Predicate() {
@Override
public boolean evaluate() throws Exception {
@@ -770,7 +777,7 @@ public abstract class XTestCase extends TestCase {
}
public String getKeytabFile() {
- final String defaultFile = new File(System.getProperty("user.home"), "oozie.keytab").getAbsolutePath();
+ String defaultFile = new File(System.getProperty("user.home"), "oozie.keytab").getAbsolutePath();
return System.getProperty("oozie.test.kerberos.keytab.file", defaultFile);
}
@@ -780,7 +787,7 @@ public abstract class XTestCase extends TestCase {
public String getOoziePrincipal() {
return System.getProperty("oozie.test.kerberos.oozie.principal",
- getOozieUser() + "/localhost") + "@" + getRealm();
+ getOozieUser() + "/localhost") + "@" + getRealm();
}
protected MiniHCatServer getHCatalogServer() {
@@ -804,11 +811,12 @@ public abstract class XTestCase extends TestCase {
// needed to cleanup the database and shut them down when done; the test will likely start its own Services later and
// we don't want to interfere
try {
- final Services services = new Services();
+ Services services = new Services();
services.getConf().set(Services.CONF_SERVICE_CLASSES, MINIMAL_SERVICES_FOR_DB_CLEANUP);
services.init();
cleanUpDBTablesInternal();
- } finally {
+ }
+ finally {
if (Services.get() != null) {
Services.get().destroy();
}
@@ -817,70 +825,70 @@ public abstract class XTestCase extends TestCase {
}
private void cleanUpDBTablesInternal() throws StoreException {
- final EntityManager entityManager = Services.get().get(JPAService.class).getEntityManager();
+ EntityManager entityManager = Services.get().get(JPAService.class).getEntityManager();
entityManager.setFlushMode(FlushModeType.COMMIT);
entityManager.getTransaction().begin();
Query q = entityManager.createNamedQuery("GET_WORKFLOWS");
- final List<WorkflowJobBean> wfjBeans = q.getResultList();
- final int wfjSize = wfjBeans.size();
- for (final WorkflowJobBean w : wfjBeans) {
+ List<WorkflowJobBean> wfjBeans = q.getResultList();
+ int wfjSize = wfjBeans.size();
+ for (WorkflowJobBean w : wfjBeans) {
entityManager.remove(w);
}
q = entityManager.createNamedQuery("GET_ACTIONS");
- final List<WorkflowActionBean> wfaBeans = q.getResultList();
- final int wfaSize = wfaBeans.size();
- for (final WorkflowActionBean w : wfaBeans) {
+ List<WorkflowActionBean> wfaBeans = q.getResultList();
+ int wfaSize = wfaBeans.size();
+ for (WorkflowActionBean w : wfaBeans) {
entityManager.remove(w);
}
q = entityManager.createNamedQuery("GET_COORD_JOBS");
- final List<CoordinatorJobBean> cojBeans = q.getResultList();
- final int cojSize = cojBeans.size();
- for (final CoordinatorJobBean w : cojBeans) {
+ List<CoordinatorJobBean> cojBeans = q.getResultList();
+ int cojSize = cojBeans.size();
+ for (CoordinatorJobBean w : cojBeans) {
entityManager.remove(w);
}
q = entityManager.createNamedQuery("GET_COORD_ACTIONS");
- final List<CoordinatorActionBean> coaBeans = q.getResultList();
- final int coaSize = coaBeans.size();
- for (final CoordinatorActionBean w : coaBeans) {
+ List<CoordinatorActionBean> coaBeans = q.getResultList();
+ int coaSize = coaBeans.size();
+ for (CoordinatorActionBean w : coaBeans) {
entityManager.remove(w);
}
q = entityManager.createNamedQuery("GET_BUNDLE_JOBS");
- final List<BundleJobBean> bjBeans = q.getResultList();
- final int bjSize = bjBeans.size();
- for (final BundleJobBean w : bjBeans) {
+ List<BundleJobBean> bjBeans = q.getResultList();
+ int bjSize = bjBeans.size();
+ for (BundleJobBean w : bjBeans) {
entityManager.remove(w);
}
q = entityManager.createNamedQuery("GET_BUNDLE_ACTIONS");
- final List<BundleActionBean> baBeans = q.getResultList();
- final int baSize = baBeans.size();
- for (final BundleActionBean w : baBeans) {
+ List<BundleActionBean> baBeans = q.getResultList();
+ int baSize = baBeans.size();
+ for (BundleActionBean w : baBeans) {
entityManager.remove(w);
}
q = entityManager.createNamedQuery("GET_SLA_EVENTS");
- final List<SLAEventBean> slaBeans = q.getResultList();
- final int slaSize = slaBeans.size();
- for (final SLAEventBean w : slaBeans) {
+ List<SLAEventBean> slaBeans = q.getResultList();
+ int slaSize = slaBeans.size();
+ for (SLAEventBean w : slaBeans) {
entityManager.remove(w);
}
q = entityManager.createQuery("select OBJECT(w) from SLARegistrationBean w");
- final List<SLARegistrationBean> slaRegBeans = q.getResultList();
- final int slaRegSize = slaRegBeans.size();
- for (final SLARegistrationBean w : slaRegBeans) {
+ List<SLARegistrationBean> slaRegBeans = q.getResultList();
+ int slaRegSize = slaRegBeans.size();
+ for (SLARegistrationBean w : slaRegBeans) {
entityManager.remove(w);
}
q = entityManager.createQuery("select OBJECT(w) from SLASummaryBean w");
- final List<SLASummaryBean> sdBeans = q.getResultList();
- final int ssSize = sdBeans.size();
- for (final SLASummaryBean w : sdBeans) {
+ List<SLASummaryBean> sdBeans = q.getResultList();
+ int ssSize = sdBeans.size();
+ for (SLASummaryBean w : sdBeans) {
entityManager.remove(w);
}
@@ -900,49 +908,58 @@ public abstract class XTestCase extends TestCase {
private static MiniDFSCluster dfsCluster = null;
private static MiniDFSCluster dfsCluster2 = null;
- private static MiniYARNCluster yarnCluster = null;
+ // TODO: OYA: replace with MiniYarnCluster or MiniMRYarnCluster
+ private static MiniMRCluster mrCluster = null;
private static MiniHCatServer hcatServer = null;
private static MiniHS2 hiveserver2 = null;
private static HiveConf hs2Config = null;
- private void setUpEmbeddedHadoop(final String testCaseDir) throws Exception {
- if (dfsCluster == null && yarnCluster == null) {
- if (System.getProperty("hadoop.log.dir") == null) {
- System.setProperty("hadoop.log.dir", testCaseDir);
- }
+ private void setUpEmbeddedHadoop(String testCaseDir) throws Exception {
+ if (dfsCluster == null && mrCluster == null) {
+ if (System.getProperty("hadoop.log.dir") == null) {
+ System.setProperty("hadoop.log.dir", testCaseDir);
+ }
// Tell the ClasspathUtils that we're using a mini cluster
ClasspathUtils.setUsingMiniYarnCluster(true);
- final int dataNodes = 2;
- final String oozieUser = getOozieUser();
- final JobConf dfsConfig = createDFSConfig();
- final String[] userGroups = new String[]{getTestGroup(), getTestGroup2()};
+ int taskTrackers = 2;
+ int dataNodes = 2;
+ String oozieUser = getOozieUser();
+ JobConf conf = createDFSConfig();
+ String[] userGroups = new String[] { getTestGroup(), getTestGroup2() };
UserGroupInformation.createUserForTesting(oozieUser, userGroups);
UserGroupInformation.createUserForTesting(getTestUser(), userGroups);
UserGroupInformation.createUserForTesting(getTestUser2(), userGroups);
- UserGroupInformation.createUserForTesting(getTestUser3(), new String[]{"users"});
+ UserGroupInformation.createUserForTesting(getTestUser3(), new String[] { "users" } );
try {
- dfsCluster = new MiniDFSCluster.Builder(dfsConfig)
- .numDataNodes(dataNodes)
- .format(true)
- .racks(null)
- .build();
-
- createHdfsPathsAndSetupPermissions();
-
- final Configuration yarnConfig = createYarnConfig(dfsConfig);
- yarnCluster = new MiniYARNCluster(this.getClass().getName(), 1, 1, 1, 1);
- yarnCluster.init(yarnConfig);
- yarnCluster.start();
- final JobConf jobConf = new JobConf(yarnCluster.getConfig());
+ dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null);
+ FileSystem fileSystem = dfsCluster.getFileSystem();
+ fileSystem.mkdirs(new Path("target/test-data"));
+ fileSystem.mkdirs(new Path("target/test-data"+"/minicluster/mapred"));
+ fileSystem.mkdirs(new Path("/user"));
+ fileSystem.mkdirs(new Path("/tmp"));
+ fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
+ fileSystem.setPermission(new Path("target/test-data"), FsPermission.valueOf("-rwxrwxrwx"));
+ fileSystem.setPermission(new Path("target/test-data"+"/minicluster"), FsPermission.valueOf("-rwxrwxrwx"));
+ fileSystem.setPermission(new Path("target/test-data"+"/minicluster/mapred"), FsPermission.valueOf("-rwxrwxrwx"));
+ fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
+ fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
+ fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
+ String nnURI = fileSystem.getUri().toString();
+ int numDirs = 1;
+ String[] racks = null;
+ String[] hosts = null;
+ mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf);
+ JobConf jobConf = mrCluster.createJobConf();
System.setProperty(OOZIE_TEST_JOB_TRACKER, jobConf.get("mapred.job.tracker"));
- final String rmAddress = jobConf.get("yarn.resourcemanager.address");
+ String rmAddress = jobConf.get("yarn.resourcemanager.address");
if (rmAddress != null) {
System.setProperty(OOZIE_TEST_JOB_TRACKER, rmAddress);
}
- System.setProperty(OOZIE_TEST_NAME_NODE, dfsCluster.getFileSystem().getUri().toString());
- ProxyUsers.refreshSuperUserGroupsConfiguration(dfsConfig);
- } catch (final Exception ex) {
+ System.setProperty(OOZIE_TEST_NAME_NODE, jobConf.get("fs.default.name"));
+ ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
+ }
+ catch (Exception ex) {
shutdownMiniCluster();
throw ex;
}
@@ -950,32 +967,15 @@ public abstract class XTestCase extends TestCase {
}
}
- private void createHdfsPathsAndSetupPermissions() throws IOException {
- final FileSystem fileSystem = dfsCluster.getFileSystem();
-
- fileSystem.mkdirs(new Path("target/test-data"));
- fileSystem.mkdirs(new Path("target/test-data" + "/minicluster/mapred"));
- fileSystem.mkdirs(new Path("/user"));
- fileSystem.mkdirs(new Path("/tmp"));
- fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
-
- fileSystem.setPermission(new Path("target/test-data"), FsPermission.valueOf("-rwxrwxrwx"));
- fileSystem.setPermission(new Path("target/test-data" + "/minicluster"), FsPermission.valueOf("-rwxrwxrwx"));
- fileSystem.setPermission(new Path("target/test-data" + "/minicluster/mapred"), FsPermission.valueOf("-rwxrwxrwx"));
- fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
- fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
- fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
- }
-
private void setUpEmbeddedHadoop2() throws Exception {
if (dfsCluster != null && dfsCluster2 == null) {
// Trick dfs location for MiniDFSCluster since it doesn't accept location as input)
- final String testBuildDataSaved = System.getProperty("test.build.data", "build/test/data");
+ String testBuildDataSaved = System.getProperty("test.build.data", "build/test/data");
try {
System.setProperty("test.build.data", FilenameUtils.concat(testBuildDataSaved, "2"));
// Only DFS cluster is created based upon current need
dfsCluster2 = new MiniDFSCluster(createDFSConfig(), 2, true, null);
- final FileSystem fileSystem = dfsCluster2.getFileSystem();
+ FileSystem fileSystem = dfsCluster2.getFileSystem();
fileSystem.mkdirs(new Path("target/test-data"));
fileSystem.mkdirs(new Path("/user"));
fileSystem.mkdirs(new Path("/tmp"));
@@ -983,10 +983,12 @@ public abstract class XTestCase extends TestCase {
fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
System.setProperty(OOZIE_TEST_NAME_NODE2, fileSystem.getConf().get("fs.default.name"));
- } catch (final Exception ex) {
+ }
+ catch (Exception ex) {
shutdownMiniCluster2();
throw ex;
- } finally {
+ }
+ finally {
// Restore previus value
System.setProperty("test.build.data", testBuildDataSaved);
}
@@ -994,41 +996,31 @@ public abstract class XTestCase extends TestCase {
}
private JobConf createDFSConfig() throws UnknownHostException {
- final JobConf conf = new JobConf();
- conf.set("dfs.block.access.token.enable", "false");
- conf.set("dfs.permissions", "true");
- conf.set("hadoop.security.authentication", "simple");
-
- //Doing this because Hadoop 1.x does not support '*' if the value is '*,127.0.0.1'
- final StringBuilder sb = new StringBuilder();
- sb.append("127.0.0.1,localhost");
- for (final InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
- sb.append(",").append(i.getCanonicalHostName());
- }
- conf.set("hadoop.proxyuser." + getOozieUser() + ".hosts", sb.toString());
-
- conf.set("hadoop.proxyuser." + getOozieUser() + ".groups", getTestGroup());
- conf.set("mapred.tasktracker.map.tasks.maximum", "4");
- conf.set("mapred.tasktracker.reduce.tasks.maximum", "4");
-
- conf.set("hadoop.tmp.dir", "target/test-data" + "/minicluster");
-
- // Scheduler properties required for YARN CapacityScheduler to work
- conf.set("yarn.scheduler.capacity.root.queues", "default");
- conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
- // Required to prevent deadlocks with YARN CapacityScheduler
- conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", "0.5");
-
- return conf;
- }
-
- private Configuration createYarnConfig(final Configuration parentConfig) {
- final Configuration yarnConfig = new YarnConfiguration(parentConfig);
-
- yarnConfig.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
- yarnConfig.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
-
- return yarnConfig;
+ JobConf conf = new JobConf();
+ conf.set("dfs.block.access.token.enable", "false");
+ conf.set("dfs.permissions", "true");
+ conf.set("hadoop.security.authentication", "simple");
+
+ //Doing this because Hadoop 1.x does not support '*' if the value is '*,127.0.0.1'
+ StringBuilder sb = new StringBuilder();
+ sb.append("127.0.0.1,localhost");
+ for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
+ sb.append(",").append(i.getCanonicalHostName());
+ }
+ conf.set("hadoop.proxyuser." + getOozieUser() + ".hosts", sb.toString());
+
+ conf.set("hadoop.proxyuser." + getOozieUser() + ".groups", getTestGroup());
+ conf.set("mapred.tasktracker.map.tasks.maximum", "4");
+ conf.set("mapred.tasktracker.reduce.tasks.maximum", "4");
+
+ conf.set("hadoop.tmp.dir", "target/test-data"+"/minicluster");
+
+ // Scheduler properties required for YARN CapacityScheduler to work
+ conf.set("yarn.scheduler.capacity.root.queues", "default");
+ conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+ // Required to prevent deadlocks with YARN CapacityScheduler
+ conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", "0.5");
+ return conf;
}
protected void setupHCatalogServer() throws Exception {
@@ -1055,8 +1047,8 @@ public abstract class XTestCase extends TestCase {
if (hs2Config == null) {
// Make HS2 use our Mini cluster by copying all configs to HiveConf; also had to hack MiniHS2
hs2Config = new HiveConf();
- final Configuration jobConf = createJobConf();
- for (final Map.Entry<String, String> pair : jobConf) {
+ Configuration jobConf = createJobConf();
+ for (Map.Entry<String, String> pair : jobConf) {
hs2Config.set(pair.getKey(), pair.getValue());
}
}
@@ -1078,32 +1070,27 @@ public abstract class XTestCase extends TestCase {
return hiveserver2.getJdbcURL();
}
- protected String getHiveServer2JdbcURL(final String dbName) {
+ protected String getHiveServer2JdbcURL(String dbName) {
return hiveserver2.getJdbcURL(dbName);
}
private static void shutdownMiniCluster() {
try {
- if (yarnCluster != null) {
- final YarnJobActions yarnJobActions =
- new YarnJobActions.Builder(yarnCluster.getConfig(), ApplicationsRequestScope.ALL)
- .build();
- final Set<ApplicationId> allYarnJobs = yarnJobActions.getYarnJobs();
-
- yarnJobActions.killSelectedYarnJobs(allYarnJobs);
-
- yarnCluster.stop();
+ if (mrCluster != null) {
+ mrCluster.shutdown();
}
- } catch (final Exception ex) {
- System.out.println(ex.getMessage());
+ }
+ catch (Exception ex) {
+ System.out.println(ex);
}
try {
if (dfsCluster != null) {
dfsCluster.shutdown();
}
- } catch (final Exception ex) {
- System.out.println(ex.getMessage());
+ }
+ catch (Exception ex) {
+ System.out.println(ex);
}
// This is tied to the MiniCluster because it inherits configs from there
hs2Config = null;
@@ -1114,11 +1101,11 @@ public abstract class XTestCase extends TestCase {
if (dfsCluster2 != null) {
dfsCluster2.shutdown();
}
- } catch (final Exception ex) {
+ }
+ catch (Exception ex) {
System.out.println(ex);
}
}
-
private static final AtomicLong LAST_TESTCASE_FINISHED = new AtomicLong();
private static final AtomicInteger RUNNING_TESTCASES = new AtomicInteger();
@@ -1129,7 +1116,7 @@ public abstract class XTestCase extends TestCase {
}
public void run() {
- final long shutdownWait = Long.parseLong(System.getProperty(TEST_MINICLUSTER_MONITOR_SHUTDOWN_WAIT, "10")) * 1000;
+ long shutdownWait = Long.parseLong(System.getProperty(TEST_MINICLUSTER_MONITOR_SHUTDOWN_WAIT, "10")) * 1000;
LAST_TESTCASE_FINISHED.set(System.currentTimeMillis());
while (true) {
if (RUNNING_TESTCASES.get() == 0) {
@@ -1139,7 +1126,8 @@ public abstract class XTestCase extends TestCase {
}
try {
Thread.sleep(1000);
- } catch (final InterruptedException ex) {
+ }
+ catch (InterruptedException ex) {
break;
}
}
@@ -1149,10 +1137,10 @@ public abstract class XTestCase extends TestCase {
}
@SuppressWarnings("deprecation")
- private JobConf createJobConfFromYarnCluster() {
- final JobConf jobConf = new JobConf();
- final JobConf jobConfYarn = new JobConf(yarnCluster.getConfig());
- for (final Entry<String, String> entry : jobConfYarn) {
+ private JobConf createJobConfFromMRCluster() {
+ JobConf jobConf = new JobConf();
+ JobConf jobConfMR = mrCluster.createJobConf();
+ for ( Entry<String, String> entry : jobConfMR) {
// MiniMRClientClusterFactory sets the job jar in Hadoop 2.0 causing tests to fail
// TODO call conf.unset after moving completely to Hadoop 2.x
if (!(entry.getKey().equals("mapreduce.job.jar") || entry.getKey().equals("mapred.jar"))) {
@@ -1167,16 +1155,15 @@ public abstract class XTestCase extends TestCase {
* @return a jobconf preconfigured to talk with the test cluster/minicluster.
*/
protected JobConf createJobConf() throws IOException {
- final JobConf jobConf;
-
- if (yarnCluster != null) {
- jobConf = createJobConfFromYarnCluster();
- } else {
+ JobConf jobConf;
+ if (mrCluster != null) {
+ jobConf = createJobConfFromMRCluster();
+ }
+ else {
jobConf = new JobConf();
jobConf.set("mapred.job.tracker", getJobTrackerUri());
jobConf.set("fs.default.name", getNameNodeUri());
}
-
return jobConf;
}
@@ -1199,22 +1186,29 @@ public abstract class XTestCase extends TestCase {
*
* @param executable The ShutdownJobTrackerExecutable to execute while the JobTracker is shutdown
*/
- protected void executeWhileJobTrackerIsShutdown(final ShutdownJobTrackerExecutable executable) {
+ protected void executeWhileJobTrackerIsShutdown(ShutdownJobTrackerExecutable executable) {
+ mrCluster.stopJobTracker();
+ Exception ex = null;
try {
executable.execute();
- } catch (final Exception e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ ex = e;
+ } finally {
+ mrCluster.startJobTracker();
+ }
+ if (ex != null) {
+ throw new RuntimeException(ex);
}
}
protected Services setupServicesForHCatalog() throws ServiceException {
- final Services services = new Services();
+ Services services = new Services();
setupServicesForHCataLogImpl(services);
return services;
}
- private void setupServicesForHCataLogImpl(final Services services) {
- final Configuration conf = services.getConf();
+ private void setupServicesForHCataLogImpl(Services services) {
+ Configuration conf = services.getConf();
conf.set(Services.CONF_SERVICE_EXT_CLASSES,
JMSAccessorService.class.getName() + "," +
PartitionDependencyManagerService.class.getName() + "," +
@@ -1222,31 +1216,31 @@ public abstract class XTestCase extends TestCase {
conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES,
"default=java.naming.factory.initial#" + ActiveMQConnFactory + ";" +
"java.naming.provider.url#" + localActiveMQBroker +
- "connectionFactoryNames#" + "ConnectionFactory");
+ "connectionFactoryNames#"+ "ConnectionFactory");
conf.set(URIHandlerService.URI_HANDLERS,
FSURIHandler.class.getName() + "," + HCatURIHandler.class.getName());
setSystemProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
setSystemProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false");
}
- protected Services setupServicesForHCatalog(final Services services) throws ServiceException {
+ protected Services setupServicesForHCatalog(Services services) throws ServiceException {
setupServicesForHCataLogImpl(services);
return services;
}
- protected YarnApplicationState waitUntilYarnAppState(final String externalId, final EnumSet<YarnApplicationState> acceptedStates)
+ protected YarnApplicationState waitUntilYarnAppState(String externalId, final EnumSet<YarnApplicationState> acceptedStates)
throws HadoopAccessorException, IOException, YarnException {
final ApplicationId appId = ConverterUtils.toApplicationId(externalId);
final MutableObject<YarnApplicationState> finalState = new MutableObject<YarnApplicationState>();
- final JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri());
+ JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri());
final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), jobConf);
try {
waitFor(60 * 1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
- final YarnApplicationState state = yarnClient.getApplicationReport(appId).getYarnApplicationState();
+ YarnApplicationState state = yarnClient.getApplicationReport(appId).getYarnApplicationState();
finalState.setValue(state);
return acceptedStates.contains(state);
@@ -1262,20 +1256,20 @@ public abstract class XTestCase extends TestCase {
return finalState.getValue();
}
- protected void waitUntilYarnAppDoneAndAssertSuccess(final String externalId) throws HadoopAccessorException, IOException, YarnException {
- final YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES);
+ protected void waitUntilYarnAppDoneAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException {
+ YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES);
assertEquals("YARN App state", YarnApplicationState.FINISHED, state);
}
- protected void waitUntilYarnAppKilledAndAssertSuccess(final String externalId) throws HadoopAccessorException, IOException, YarnException {
- final YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES);
+ protected void waitUntilYarnAppKilledAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException {
+ YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES);
assertEquals("YARN App state", YarnApplicationState.KILLED, state);
}
protected YarnApplicationState getYarnApplicationState(String externalId) throws HadoopAccessorException, IOException, YarnException {
final ApplicationId appId = ConverterUtils.toApplicationId(externalId);
YarnApplicationState state = null;
- final JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri());
+ JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri());
// This is needed here because we need a mutable final YarnClient
final MutableObject<YarnClient> yarnClientMO = new MutableObject<YarnClient>(null);
try {