You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2013/06/10 22:15:13 UTC
svn commit: r1491589 [1/2] - in /oozie/trunk: ./ core/
core/src/main/java/org/apache/oozie/action/hadoop/
core/src/main/java/org/apache/oozie/util/ core/src/main/resources/
core/src/test/java/org/apache/oozie/action/hadoop/
core/src/test/java/org/apach...
Author: rkanter
Date: Mon Jun 10 20:15:11 2013
New Revision: 1491589
URL: http://svn.apache.org/r1491589
Log:
OOZIE-1315 Refactor classes from launcher jar into Oozie sharelib (rkanter)
Added:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
oozie/trunk/sharelib/hcatalog/src/
oozie/trunk/sharelib/hcatalog/src/main/
oozie/trunk/sharelib/hcatalog/src/main/java/
oozie/trunk/sharelib/hcatalog/src/main/java/org/
oozie/trunk/sharelib/hcatalog/src/main/java/org/apache/
oozie/trunk/sharelib/hcatalog/src/main/java/org/apache/oozie/
oozie/trunk/sharelib/hcatalog/src/main/java/org/apache/oozie/action/
oozie/trunk/sharelib/hcatalog/src/main/java/org/apache/oozie/action/hadoop/
oozie/trunk/sharelib/hcatalog/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
oozie/trunk/sharelib/hcatalog/src/main/java/org/apache/oozie/util/
oozie/trunk/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/util/HCatURI.java
oozie/trunk/sharelib/hcatalog/src/test/
oozie/trunk/sharelib/hcatalog/src/test/java/
oozie/trunk/sharelib/hcatalog/src/test/java/org/
oozie/trunk/sharelib/hcatalog/src/test/java/org/apache/
oozie/trunk/sharelib/hcatalog/src/test/java/org/apache/oozie/
oozie/trunk/sharelib/hcatalog/src/test/java/org/apache/oozie/util/
oozie/trunk/sharelib/hcatalog/src/test/java/org/apache/oozie/util/TestHCatURI.java
- copied unchanged from r1490336, oozie/trunk/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
oozie/trunk/sharelib/oozie/src/
oozie/trunk/sharelib/oozie/src/main/
oozie/trunk/sharelib/oozie/src/main/java/
oozie/trunk/sharelib/oozie/src/main/java/org/
oozie/trunk/sharelib/oozie/src/main/java/org/apache/
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionStats.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ActionStats.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionType.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ActionType.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
- copied, changed from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MRStats.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MRStats.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
- copied, changed from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
- copied unchanged from r1490336, oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
- copied, changed from r1490336, oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java
- copied, changed from r1490336, oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java
Removed:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ActionStats.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ActionType.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MRStats.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
oozie/trunk/core/src/main/java/org/apache/oozie/util/HCatURI.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java
oozie/trunk/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorStreaming.java
Modified:
oozie/trunk/core/pom.xml
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
oozie/trunk/core/src/main/resources/oozie-default.xml
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncher.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
oozie/trunk/core/src/test/java/org/apache/oozie/test/XTestCase.java
oozie/trunk/docs/src/site/twiki/AG_Install.twiki
oozie/trunk/examples/pom.xml
oozie/trunk/pom.xml
oozie/trunk/release-log.txt
oozie/trunk/sharelib/distcp/pom.xml
oozie/trunk/sharelib/hcatalog/pom.xml
oozie/trunk/sharelib/hive/pom.xml
oozie/trunk/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
oozie/trunk/sharelib/oozie/pom.xml
oozie/trunk/sharelib/pig/pom.xml
oozie/trunk/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
oozie/trunk/sharelib/sqoop/pom.xml
oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
oozie/trunk/sharelib/streaming/pom.xml
oozie/trunk/webapp/pom.xml
Modified: oozie/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/pom.xml?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/pom.xml (original)
+++ oozie/trunk/core/pom.xml Mon Jun 10 20:15:11 2013
@@ -245,7 +245,13 @@
<dependency>
<groupId>org.apache.oozie</groupId>
<artifactId>oozie-sharelib-oozie</artifactId>
- <scope>provided</scope>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.oozie</groupId>
+ <artifactId>oozie-sharelib-hcatalog</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java Mon Jun 10 20:15:11 2013
@@ -59,9 +59,7 @@ public class HiveActionExecutor extends
classes.add(Class.forName(HIVE_MAIN_CLASS_NAME));
}
catch (ClassNotFoundException e) {
- //TODO - A temporary fix as hive class is in hive sharelib
- // - Change this to RuntimeException when classes are refactored
- log.error("HiveMain class not found " +e);
+ throw new RuntimeException("Class not found", e);
}
return classes;
}
@@ -132,7 +130,7 @@ public class HiveActionExecutor extends
super.setActionCompletionData(context, actionFs);
// Load stored Hadoop jobs ids and promote them as external child ids on job failure
- Path externalChildIDs = LauncherMapper.getExternalChildIDsDataPath(context.getActionDir());
+ Path externalChildIDs = LauncherMapperHelper.getExternalChildIDsDataPath(context.getActionDir());
if (actionFs.exists(externalChildIDs)) {
InputStream is = actionFs.open(externalChildIDs);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java Mon Jun 10 20:15:11 2013
@@ -91,6 +91,8 @@ public class JavaActionExecutor extends
public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
private static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
+ public static final String OOZIE_ACTION_SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar";
+ private boolean useLauncherJar;
private static int maxActionOutputLen;
private static int maxExternalStatsSize;
@@ -111,12 +113,12 @@ public class JavaActionExecutor extends
public JavaActionExecutor() {
this("java");
- requiresNNJT = true;
}
protected JavaActionExecutor(String type) {
super(type);
requiresNNJT = true;
+ useLauncherJar = getOozieConf().getBoolean(OOZIE_ACTION_SHIP_LAUNCHER_JAR, true);
}
protected String getLauncherJarName() {
@@ -151,33 +153,40 @@ public class JavaActionExecutor extends
//Get the limit for the maximum allowed size of action stats
maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT);
maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
- try {
- List<Class> classes = getLauncherClasses();
- Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
- IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
-
- registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
- registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
- "JA002");
- registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
- ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
- registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
- ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
- registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
- ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
- registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, " JA006");
- registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
- registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
- registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
- }
- catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- catch (java.lang.NoClassDefFoundError err) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- err.printStackTrace(new PrintStream(baos));
- log.warn(baos.toString());
- }
+
+ createLauncherJar();
+
+ registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
+ registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
+ "JA002");
+ registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
+ ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
+ registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
+ ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
+ registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
+ ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
+ registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, " JA006");
+ registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
+ registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
+ registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
+ }
+
+ public void createLauncherJar() {
+ if (useLauncherJar) {
+ try {
+ List<Class> classes = getLauncherClasses();
+ Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
+ IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
+ }
+ catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ catch (java.lang.NoClassDefFoundError err) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ err.printStackTrace(new PrintStream(baos));
+ log.warn(baos.toString());
+ }
+ }
}
/**
@@ -370,8 +379,13 @@ public class JavaActionExecutor extends
Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
if (!actionFs.exists(actionDir)) {
try {
- actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
- tempActionDir, getLauncherJarName()));
+ if (useLauncherJar) {
+ actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
+ tempActionDir, getLauncherJarName()));
+ }
+ else {
+ actionFs.mkdirs(tempActionDir);
+ }
actionFs.rename(tempActionDir, actionDir);
}
catch (IOException ex) {
@@ -478,7 +492,9 @@ public class JavaActionExecutor extends
Configuration proto = context.getProtoActionConf();
// launcher JAR
- addToCache(conf, appPath, getOozieLauncherJar(context), false);
+ if (useLauncherJar) {
+ addToCache(conf, appPath, getOozieLauncherJar(context), false);
+ }
// Workflow lib/
String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
@@ -597,20 +613,20 @@ public class JavaActionExecutor extends
prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
}
}
- LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
+ LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
prepareXML);
- LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
- LauncherMapper.setupLauncherURIHandlerConf(launcherJobConf);
- LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
- LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
+ LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
+ LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
+ LauncherMapperHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
+ LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
List<Element> list = actionXml.getChildren("arg", ns);
String[] args = new String[list.size()];
for (int i = 0; i < list.size(); i++) {
args[i] = list.get(i).getTextTrim();
}
- LauncherMapper.setupMainArguments(launcherJobConf, args);
+ LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
List<Element> javaopts = actionXml.getChildren("java-opt", ns);
for (Element opt: javaopts) {
@@ -725,7 +741,7 @@ public class JavaActionExecutor extends
injectLauncherCallback(context, launcherJobConf);
XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
jobClient = createJobClient(context, launcherJobConf);
- String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context
+ String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
.getRecoveryId());
boolean alreadyRunning = launcherId != null;
RunningJob runningJob;
@@ -999,9 +1015,9 @@ public class JavaActionExecutor extends
String user = context.getWorkflow().getUser();
String group = context.getWorkflow().getGroup();
- if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) {
+ if (LauncherMapperHelper.hasIdSwap(runningJob, user, group, actionDir)) {
String launcherId = action.getExternalId();
- Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir());
+ Path idSwapPath = LauncherMapperHelper.getIdSwapPath(context.getActionDir());
InputStream is = actionFs.open(idSwapPath);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
@@ -1022,14 +1038,14 @@ public class JavaActionExecutor extends
if (runningJob.isComplete()) {
XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
action.getExternalId());
- if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) {
+ if (runningJob.isSuccessful() && LauncherMapperHelper.isMainSuccessful(runningJob)) {
getActionData(actionFs, runningJob, action, context);
XLog.getLog(getClass()).info(XLog.STD, "action produced output");
}
else {
XLog log = XLog.getLog(getClass());
String errorReason;
- Path actionError = LauncherMapper.getErrorPath(context.getActionDir());
+ Path actionError = LauncherMapperHelper.getErrorPath(context.getActionDir());
if (actionFs.exists(actionError)) {
InputStream is = actionFs.open(actionError);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -1110,8 +1126,8 @@ public class JavaActionExecutor extends
Properties props = null;
if (getCaptureOutput(action)) {
props = new Properties();
- if (LauncherMapper.hasOutputData(runningJob)) {
- Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir());
+ if (LauncherMapperHelper.hasOutputData(runningJob)) {
+ Path actionOutput = LauncherMapperHelper.getOutputDataPath(context.getActionDir());
InputStream is = actionFs.open(actionOutput);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
Added: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java?rev=1491589&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java Mon Jun 10 20:15:11 2013
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.util.XLog;
+
+public class LauncherMapperHelper {
+
+ public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
+ throws HadoopAccessorException, IOException {
+ String jobId = null;
+ Path recoveryFile = new Path(actionDir, recoveryId);
+ FileSystem fs = Services.get().get(HadoopAccessorService.class)
+ .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
+
+ if (fs.exists(recoveryFile)) {
+ InputStream is = fs.open(recoveryFile);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+ jobId = reader.readLine();
+ reader.close();
+ }
+ return jobId;
+
+ }
+
+ public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
+ // Only set the javaMainClass if its not null or empty string (should be the case except for java action), this way the user
+ // can override the action's main class via <configuration> property
+ if (javaMainClass != null && !javaMainClass.equals("")) {
+ launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
+ }
+ }
+
+ public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
+ for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
+ launcherConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public static void setupMainArguments(Configuration launcherConf, String[] args) {
+ launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
+ for (int i = 0; i < args.length; i++) {
+ launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
+ }
+ }
+
+ public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
+ launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
+ }
+
+ /**
+ * Set the maximum value of stats data
+ *
+ * @param launcherConf the oozie launcher configuration
+ * @param maxStatsData the maximum allowed size of stats data
+ */
+ public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
+ launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
+ }
+
+ public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
+ String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
+
+ launcherConf.setMapperClass(LauncherMapper.class);
+ launcherConf.setSpeculativeExecution(false);
+ launcherConf.setNumMapTasks(1);
+ launcherConf.setNumReduceTasks(0);
+
+ launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
+ launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
+ launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
+ launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
+ launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
+
+ actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
+ actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
+
+ if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
+ List<String> purgedEntries = new ArrayList<String>();
+ Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
+ for (String entry : entries) {
+ if (entry.contains("#")) {
+ purgedEntries.add(entry);
+ }
+ }
+ actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
+ launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
+ }
+
+ FileSystem fs =
+ Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
+ actionDir.toUri(), launcherConf);
+ fs.mkdirs(actionDir);
+
+ OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
+ actionConf.writeXml(os);
+ os.close();
+
+ Path inputDir = new Path(actionDir, "input");
+ fs.mkdirs(inputDir);
+ Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt")));
+ writer.write("dummy");
+ writer.close();
+
+ launcherConf.set("mapred.input.dir", inputDir.toString());
+ launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
+ }
+
+ public static boolean isMainDone(RunningJob runningJob) throws IOException {
+ return runningJob.isComplete();
+ }
+
+ public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
+ boolean succeeded = runningJob.isSuccessful();
+ if (succeeded) {
+ Counters counters = runningJob.getCounters();
+ if (counters != null) {
+ Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
+ if (group != null) {
+ succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
+ }
+ }
+ }
+ return succeeded;
+ }
+
+ public static boolean hasOutputData(RunningJob runningJob) throws IOException {
+ boolean output = false;
+ Counters counters = runningJob.getCounters();
+ if (counters != null) {
+ Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
+ if (group != null) {
+ output = group.getCounter(LauncherMapper.COUNTER_OUTPUT_DATA) == 1;
+ }
+ }
+ return output;
+ }
+
+ /**
+ * Check whether runningJob has stats data or not
+ *
+ * @param runningJob the runningJob
+ * @return returns whether the running Job has stats data or not
+ * @throws IOException
+ */
+ public static boolean hasStatsData(RunningJob runningJob) throws IOException{
+ boolean output = false;
+ Counters counters = runningJob.getCounters();
+ if (counters != null) {
+ Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
+ if (group != null) {
+ output = group.getCounter(LauncherMapper.COUNTER_STATS_DATA) == 1;
+ }
+ }
+ return output;
+ }
+
+ public static boolean hasIdSwap(RunningJob runningJob) throws IOException {
+ boolean swap = false;
+ Counters counters = runningJob.getCounters();
+ if (counters != null) {
+ Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
+ if (group != null) {
+ swap = group.getCounter(LauncherMapper.COUNTER_DO_ID_SWAP) == 1;
+ }
+ }
+ return swap;
+ }
+
+ public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir)
+ throws IOException, HadoopAccessorException {
+ boolean swap = false;
+
+ XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper");
+
+ Counters counters = runningJob.getCounters();
+ if (counters != null) {
+ Counters.Group counterGroup = counters.getGroup(LauncherMapper.COUNTER_GROUP);
+ if (counterGroup != null) {
+ swap = counterGroup.getCounter(LauncherMapper.COUNTER_DO_ID_SWAP) == 1;
+ }
+ }
+ // additional check for swapped hadoop ID
+ // Can't rely on hadoop counters existing
+ // we'll check for the newID file in hdfs if the hadoop counters is null
+ else {
+
+ Path p = getIdSwapPath(actionDir);
+ // log.debug("Checking for newId file in: [{0}]", p);
+
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+ Configuration conf = has.createJobConf(p.toUri().getAuthority());
+ FileSystem fs = has.createFileSystem(user, p.toUri(), conf);
+ if (fs.exists(p)) {
+ log.debug("Hadoop Counters is null, but found newID file.");
+
+ swap = true;
+ }
+ else {
+ log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p);
+ }
+ }
+ return swap;
+ }
+
+ public static Path getOutputDataPath(Path actionDir) {
+ return new Path(actionDir, LauncherMapper.ACTION_OUTPUT_PROPS);
+ }
+
+ /**
+ * Get the location of stats file
+ *
+ * @param actionDir the action directory
+ * @return the hdfs location of the file
+ */
+ public static Path getActionStatsDataPath(Path actionDir){
+ return new Path(actionDir, LauncherMapper.ACTION_STATS_PROPS);
+ }
+
+ /**
+ * Get the location of external Child IDs file
+ *
+ * @param actionDir the action directory
+ * @return the hdfs location of the file
+ */
+ public static Path getExternalChildIDsDataPath(Path actionDir){
+ return new Path(actionDir, LauncherMapper.ACTION_EXTERNAL_CHILD_IDS_PROPS);
+ }
+
+ public static Path getErrorPath(Path actionDir) {
+ return new Path(actionDir, LauncherMapper.ACTION_ERROR_PROPS);
+ }
+
+ public static Path getIdSwapPath(Path actionDir) {
+ return new Path(actionDir, LauncherMapper.ACTION_NEW_ID_PROPS);
+ }
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java Mon Jun 10 20:15:11 2013
@@ -44,7 +44,6 @@ public class MapReduceActionExecutor ext
public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
public static final String HADOOP_COUNTERS = "hadoop.counters";
- public static final String OOZIE_MAPREDUCE_UBER_JAR = "oozie.mapreduce.uber.jar";
public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
private XLog log = XLog.getLog(getClass());
@@ -63,9 +62,7 @@ public class MapReduceActionExecutor ext
classes.add(Class.forName(STREAMING_MAIN_CLASS_NAME));
}
catch (ClassNotFoundException e) {
- //TODO - A temporary fix as streaming class in streaming sharelib
- // - Change this to RuntimeException when classes are refactored
- log.error("Streaming class not found " +e);
+ throw new RuntimeException("Class not found", e);
}
return classes;
}
@@ -138,12 +135,12 @@ public class MapReduceActionExecutor ext
// For "regular" (not streaming or pipes) MR jobs
if (regularMR) {
// Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
- String uberJar = actionConf.get(OOZIE_MAPREDUCE_UBER_JAR);
+ String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
if (uberJar != null) {
if (!Services.get().getConf().getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE, false)) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
- "{0} property is not allowed. Set {1} to true in oozie-site to enable.", OOZIE_MAPREDUCE_UBER_JAR,
- OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
+ "{0} property is not allowed. Set {1} to true in oozie-site to enable.",
+ MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
}
String nameNode = actionXml.getChildTextTrim("name-node", ns);
if (nameNode != null) {
@@ -153,20 +150,21 @@ public class MapReduceActionExecutor ext
Path nameNodePath = new Path(nameNode);
String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
+ "://" + nameNodePath.toUri().getAuthority();
- actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, new Path(nameNodeSchemeAuthority + uberJarPath).toString());
+ actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR,
+ new Path(nameNodeSchemeAuthority + uberJarPath).toString());
}
else { // relative path --> prepend app path
- actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
+ actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
}
}
}
}
}
else {
- if (actionConf.get(OOZIE_MAPREDUCE_UBER_JAR) != null) {
- log.warn("The " + OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not streaming nor pipes)"
- + " workflows, ignoring");
- actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, "");
+ if (actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR) != null) {
+ log.warn("The " + MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not"
+ + "streaming nor pipes) workflows, ignoring");
+ actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, "");
}
}
@@ -294,7 +292,7 @@ public class MapReduceActionExecutor ext
Namespace ns = actionXml.getNamespace();
if (actionXml.getChild("streaming", ns) == null && actionXml.getChild("pipes", ns) == null) {
// Set for uber jar
- String uberJar = actionConf.get(MapReduceActionExecutor.OOZIE_MAPREDUCE_UBER_JAR);
+ String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
if (uberJar != null && uberJar.trim().length() > 0) {
launcherJobConf.setJar(uberJar);
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java Mon Jun 10 20:15:11 2013
@@ -58,9 +58,7 @@ public class PigActionExecutor extends S
classes.add(Class.forName(OOZIE_PIG_STATS));
}
catch (ClassNotFoundException e) {
- //TODO - A temporary fix as pig class is in pig sharelib
- // - Change this to RuntimeException when classes are refactored
- log.error("PigMain class not found " +e);
+ throw new RuntimeException("Class not found", e);
}
return classes;
}
@@ -134,7 +132,7 @@ public class PigActionExecutor extends S
private String getStats(Context context, FileSystem actionFs) throws IOException, HadoopAccessorException,
URISyntaxException {
- Path actionOutput = LauncherMapper.getActionStatsDataPath(context.getActionDir());
+ Path actionOutput = LauncherMapperHelper.getActionStatsDataPath(context.getActionDir());
String stats = null;
if (actionFs.exists(actionOutput)) {
stats = getDataFromPath(actionOutput, actionFs);
@@ -152,7 +150,7 @@ public class PigActionExecutor extends S
private String getExternalChildIDs(Context context, FileSystem actionFs) throws IOException,
HadoopAccessorException, URISyntaxException {
- Path actionOutput = LauncherMapper.getExternalChildIDsDataPath(context.getActionDir());
+ Path actionOutput = LauncherMapperHelper.getExternalChildIDsDataPath(context.getActionDir());
String externalIDs = null;
if (actionFs.exists(actionOutput)) {
externalIDs = getDataFromPath(actionOutput, actionFs);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java Mon Jun 10 20:15:11 2013
@@ -62,7 +62,7 @@ public class SqoopActionExecutor extends
classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME));
}
catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
+ throw new RuntimeException("Class not found", e);
}
return classes;
}
Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Mon Jun 10 20:15:11 2013
@@ -25,6 +25,18 @@
<!-- ************************** VERY IMPORTANT ************************** -->
<property>
+ <name>oozie.action.ship.launcher.jar</name>
+ <value>true</value>
+ <description>
+ If true, Oozie will create and ship a "launcher jar" that contains classes necessary for the launcher job. If false,
+ Oozie will not do this, and it is assumed that the necessary classes are in their respective sharelib jars or the
+ "oozie" sharelib instead. When false, the sharelib is required for ALL actions; when true, the sharelib is only
+ required for actions that need additional jars (e.g. Pig). The main advantage of setting this to false is that
+ launching jobs should be slightly faster.
+ </description>
+ </property>
+
+ <property>
<name>oozie.action.mapreduce.uber.jar.enable</name>
<value>false</value>
<description>
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java Mon Jun 10 20:15:11 2013
@@ -58,7 +58,7 @@ public class TestFSPrepareActions extend
String prepareXML = "<prepare>" + "<delete path='" + newDir + "'/>" + "</prepare>";
JobConf conf = createJobConf();
- LauncherMapper.setupLauncherURIHandlerConf(conf);
+ LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
PrepareActionsDriver.doOperations(prepareXML, conf);
assertFalse(fs.exists(newDir));
}
@@ -77,7 +77,7 @@ public class TestFSPrepareActions extend
String prepareXML = "<prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>";
JobConf conf = createJobConf();
- LauncherMapper.setupLauncherURIHandlerConf(conf);
+ LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
PrepareActionsDriver.doOperations(prepareXML, conf);
assertTrue(fs.exists(newDir));
}
@@ -96,7 +96,7 @@ public class TestFSPrepareActions extend
try {
JobConf conf = createJobConf();
- LauncherMapper.setupLauncherURIHandlerConf(conf);
+ LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
PrepareActionsDriver.doOperations(prepareXML, conf);
fail("Expected to catch an exception but did not encounter any");
} catch (LauncherException le) {
@@ -126,7 +126,7 @@ public class TestFSPrepareActions extend
try {
JobConf conf = createJobConf();
- LauncherMapper.setupLauncherURIHandlerConf(conf);
+ LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
PrepareActionsDriver.doOperations(prepareXML, conf);
fail("Expected to catch an exception but did not encounter any");
} catch (LauncherException le) {
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java Mon Jun 10 20:15:11 2013
@@ -64,7 +64,7 @@ public class TestHCatPrepareActions exte
+ "</prepare>";
JobConf conf = createJobConf();
- LauncherMapper.setupLauncherURIHandlerConf(conf);
+ LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
PrepareActionsDriver.doOperations(prepareXML, conf);
FileSystem fs = getFileSystem();
assertFalse(fs.exists(new Path(part1)));
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java Mon Jun 10 20:15:11 2013
@@ -84,26 +84,57 @@ public class TestJavaActionExecutor exte
IOUtils.copyStream(is, os);
}
- public void testLauncherJar() throws Exception {
- JavaActionExecutor ae = new JavaActionExecutor();
- Path jar = new Path(ae.getOozieRuntimeDir(), ae.getLauncherJarName());
- assertTrue(new File(jar.toString()).exists());
+ public void testSetupMethodsWithLauncherJar() throws Exception {
+ String defaultVal = Services.get().getConf().get("oozie.action.ship.launcher.jar");
+ try {
+ Services.get().getConf().set("oozie.action.ship.launcher.jar", "true");
+ _testSetupMethods(true);
+ }
+ finally {
+ // back to default
+ if (defaultVal != null) {
+ Services.get().getConf().set("oozie.action.ship.launcher.jar", defaultVal);
+ }
+ }
+ }
+
+ public void testSetupMethodsWithoutLauncherJar() throws Exception {
+ String defaultVal = Services.get().getConf().get("oozie.action.ship.launcher.jar");
+ try {
+ Services.get().getConf().set("oozie.action.ship.launcher.jar", "false");
+ _testSetupMethods(false);
+ }
+ finally {
+ // back to default
+ if (defaultVal != null) {
+ Services.get().getConf().set("oozie.action.ship.launcher.jar", defaultVal);
+ }
+ }
}
- public void testSetupMethods() throws Exception {
+ public void _testSetupMethods(boolean launcherJarShouldExist) throws Exception {
JavaActionExecutor ae = new JavaActionExecutor();
+ Path jar = new Path(ae.getOozieRuntimeDir(), ae.getLauncherJarName());
+ File fJar = new File(jar.toString());
+ fJar.delete();
+ assertFalse(fJar.exists());
+ ae.createLauncherJar();
+ assertEquals(launcherJarShouldExist, fJar.exists());
+
assertEquals("java", ae.getType());
assertEquals("java-launcher.jar", ae.getLauncherJarName());
- List<Class> classes = new ArrayList<Class>();
- classes.add(LauncherMapper.class);
- classes.add(LauncherSecurityManager.class);
- classes.add(LauncherException.class);
- classes.add(LauncherMainException.class);
- classes.add(PrepareActionsDriver.class);
- classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
- classes.add(ActionStats.class);
- classes.add(ActionType.class);
- assertEquals(classes, ae.getLauncherClasses());
+ if (launcherJarShouldExist) {
+ List<Class> classes = new ArrayList<Class>();
+ classes.add(LauncherMapper.class);
+ classes.add(LauncherSecurityManager.class);
+ classes.add(LauncherException.class);
+ classes.add(LauncherMainException.class);
+ classes.add(PrepareActionsDriver.class);
+ classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
+ classes.add(ActionStats.class);
+ classes.add(ActionType.class);
+ assertEquals(classes, ae.getLauncherClasses());
+ }
Configuration conf = new XConfiguration();
conf.set("user.name", "a");
@@ -235,7 +266,7 @@ public class TestJavaActionExecutor exte
assertFalse(getFileSystem().exists(context.getActionDir()));
ae.prepareActionDir(getFileSystem(), context);
assertTrue(getFileSystem().exists(context.getActionDir()));
- assertTrue(getFileSystem().exists(new Path(context.getActionDir(), ae.getLauncherJarName())));
+ assertEquals(launcherJarShouldExist, getFileSystem().exists(new Path(context.getActionDir(), ae.getLauncherJarName())));
ae.cleanUpActionDir(getFileSystem(), context);
assertFalse(getFileSystem().exists(context.getActionDir()));
@@ -517,7 +548,7 @@ public class TestJavaActionExecutor exte
}
});
assertTrue(runningJob.isSuccessful());
- assertFalse(LauncherMapper.isMainSuccessful(runningJob));
+ assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob));
ActionExecutor ae = new JavaActionExecutor();
ae.check(context, context.getAction());
assertTrue(ae.isCompleted(context.getAction().getExternalStatus()));
@@ -546,7 +577,7 @@ public class TestJavaActionExecutor exte
}
});
assertTrue(runningJob.isSuccessful());
- assertFalse(LauncherMapper.isMainSuccessful(runningJob));
+ assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob));
ActionExecutor ae = new JavaActionExecutor();
ae.check(context, context.getAction());
assertTrue(ae.isCompleted(context.getAction().getExternalStatus()));
@@ -597,7 +628,7 @@ public class TestJavaActionExecutor exte
public boolean evaluate() throws Exception {
JavaActionExecutor ae = new JavaActionExecutor();
Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
- return LauncherMapper.getRecoveryId(conf, context.getActionDir(), context.getRecoveryId()) != null;
+ return LauncherMapperHelper.getRecoveryId(conf, context.getActionDir(), context.getRecoveryId()) != null;
}
});
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncher.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncher.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncher.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncher.java Mon Jun 10 20:15:11 2013
@@ -74,12 +74,11 @@ public class TestLauncher extends XFsTes
jobConf.set("fs.default.name", getNameNodeUri());
- LauncherMapper lm = new LauncherMapper();
- lm.setupMainClass(jobConf, LauncherMainTester.class.getName());
- lm.setupMainArguments(jobConf, arg);
+ LauncherMapperHelper.setupMainClass(jobConf, LauncherMainTester.class.getName());
+ LauncherMapperHelper.setupMainArguments(jobConf, arg);
Configuration actionConf = new XConfiguration();
- lm.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, "");
+ LauncherMapperHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, "");
assertEquals("1", actionConf.get("oozie.job.id"));
assertEquals("1@a", actionConf.get("oozie.action.id"));
@@ -115,14 +114,14 @@ public class TestLauncher extends XFsTes
});
assertTrue(runningJob.isSuccessful());
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertTrue(LauncherMapper.isMainSuccessful(runningJob));
- assertFalse(LauncherMapper.hasOutputData(runningJob));
- assertFalse(LauncherMapper.hasIdSwap(runningJob));
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertFalse(fs.exists(LauncherMapper.getErrorPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getIdSwapPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getOutputDataPath(actionDir)));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertTrue(LauncherMapperHelper.isMainSuccessful(runningJob));
+ assertFalse(LauncherMapperHelper.hasOutputData(runningJob));
+ assertFalse(LauncherMapperHelper.hasIdSwap(runningJob));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertFalse(fs.exists(LauncherMapperHelper.getErrorPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getIdSwapPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getOutputDataPath(actionDir)));
}
public void testExit0() throws Exception {
@@ -137,14 +136,14 @@ public class TestLauncher extends XFsTes
});
assertTrue(runningJob.isSuccessful());
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertTrue(LauncherMapper.isMainSuccessful(runningJob));
- assertFalse(LauncherMapper.hasOutputData(runningJob));
- assertFalse(LauncherMapper.hasIdSwap(runningJob));
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertFalse(fs.exists(LauncherMapper.getErrorPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getIdSwapPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getOutputDataPath(actionDir)));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertTrue(LauncherMapperHelper.isMainSuccessful(runningJob));
+ assertFalse(LauncherMapperHelper.hasOutputData(runningJob));
+ assertFalse(LauncherMapperHelper.hasIdSwap(runningJob));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertFalse(fs.exists(LauncherMapperHelper.getErrorPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getIdSwapPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getOutputDataPath(actionDir)));
}
public void testExit1() throws Exception {
@@ -159,14 +158,14 @@ public class TestLauncher extends XFsTes
});
assertTrue(runningJob.isSuccessful());
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertFalse(LauncherMapper.isMainSuccessful(runningJob));
- assertFalse(LauncherMapper.hasOutputData(runningJob));
- assertFalse(LauncherMapper.hasIdSwap(runningJob));
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertTrue(fs.exists(LauncherMapper.getErrorPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getIdSwapPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getOutputDataPath(actionDir)));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob));
+ assertFalse(LauncherMapperHelper.hasOutputData(runningJob));
+ assertFalse(LauncherMapperHelper.hasIdSwap(runningJob));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertTrue(fs.exists(LauncherMapperHelper.getErrorPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getIdSwapPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getOutputDataPath(actionDir)));
}
public void testException() throws Exception {
@@ -181,14 +180,14 @@ public class TestLauncher extends XFsTes
});
assertTrue(runningJob.isSuccessful());
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertFalse(LauncherMapper.isMainSuccessful(runningJob));
- assertFalse(LauncherMapper.hasOutputData(runningJob));
- assertFalse(LauncherMapper.hasIdSwap(runningJob));
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertTrue(fs.exists(LauncherMapper.getErrorPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getIdSwapPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getOutputDataPath(actionDir)));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob));
+ assertFalse(LauncherMapperHelper.hasOutputData(runningJob));
+ assertFalse(LauncherMapperHelper.hasIdSwap(runningJob));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertTrue(fs.exists(LauncherMapperHelper.getErrorPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getIdSwapPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getOutputDataPath(actionDir)));
}
public void testOutput() throws Exception {
@@ -203,14 +202,14 @@ public class TestLauncher extends XFsTes
});
assertTrue(runningJob.isSuccessful());
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertTrue(LauncherMapper.isMainSuccessful(runningJob));
- assertTrue(LauncherMapper.hasOutputData(runningJob));
- assertFalse(LauncherMapper.hasIdSwap(runningJob));
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertFalse(fs.exists(LauncherMapper.getErrorPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getIdSwapPath(actionDir)));
- assertTrue(fs.exists(LauncherMapper.getOutputDataPath(actionDir)));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertTrue(LauncherMapperHelper.isMainSuccessful(runningJob));
+ assertTrue(LauncherMapperHelper.hasOutputData(runningJob));
+ assertFalse(LauncherMapperHelper.hasIdSwap(runningJob));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertFalse(fs.exists(LauncherMapperHelper.getErrorPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getIdSwapPath(actionDir)));
+ assertTrue(fs.exists(LauncherMapperHelper.getOutputDataPath(actionDir)));
}
public void testNewId() throws Exception {
@@ -225,14 +224,14 @@ public class TestLauncher extends XFsTes
});
assertTrue(runningJob.isSuccessful());
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertTrue(LauncherMapper.isMainSuccessful(runningJob));
- assertFalse(LauncherMapper.hasOutputData(runningJob));
- assertTrue(LauncherMapper.hasIdSwap(runningJob));
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertFalse(fs.exists(LauncherMapper.getErrorPath(actionDir)));
- assertTrue(fs.exists(LauncherMapper.getIdSwapPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getOutputDataPath(actionDir)));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertTrue(LauncherMapperHelper.isMainSuccessful(runningJob));
+ assertFalse(LauncherMapperHelper.hasOutputData(runningJob));
+ assertTrue(LauncherMapperHelper.hasIdSwap(runningJob));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertFalse(fs.exists(LauncherMapperHelper.getErrorPath(actionDir)));
+ assertTrue(fs.exists(LauncherMapperHelper.getIdSwapPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getOutputDataPath(actionDir)));
}
public void testSecurityManager() throws Exception {
@@ -247,14 +246,14 @@ public class TestLauncher extends XFsTes
});
assertTrue(runningJob.isSuccessful());
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertTrue(LauncherMapper.isMainSuccessful(runningJob));
- assertFalse(LauncherMapper.hasOutputData(runningJob));
- assertFalse(LauncherMapper.hasIdSwap(runningJob));
- assertTrue(LauncherMapper.isMainDone(runningJob));
- assertFalse(fs.exists(LauncherMapper.getErrorPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getIdSwapPath(actionDir)));
- assertFalse(fs.exists(LauncherMapper.getOutputDataPath(actionDir)));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertTrue(LauncherMapperHelper.isMainSuccessful(runningJob));
+ assertFalse(LauncherMapperHelper.hasOutputData(runningJob));
+ assertFalse(LauncherMapperHelper.hasIdSwap(runningJob));
+ assertTrue(LauncherMapperHelper.isMainDone(runningJob));
+ assertFalse(fs.exists(LauncherMapperHelper.getErrorPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getIdSwapPath(actionDir)));
+ assertFalse(fs.exists(LauncherMapperHelper.getOutputDataPath(actionDir)));
}
// Test to ensure that the property value "oozie.action.prepare.xml" in the configuration of the job is an empty
@@ -268,10 +267,9 @@ public class TestLauncher extends XFsTes
jobConf.set("user.name", getTestUser());
jobConf.set("fs.default.name", getNameNodeUri());
- LauncherMapper lm = new LauncherMapper();
Configuration actionConf = new XConfiguration();
String prepareBlock = "";
- lm.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, prepareBlock);
+ LauncherMapperHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, prepareBlock);
assertTrue(jobConf.get("oozie.action.prepare.xml").equals(""));
}
@@ -288,31 +286,30 @@ public class TestLauncher extends XFsTes
jobConf.set("user.name", getTestUser());
jobConf.set("fs.default.name", getNameNodeUri());
- LauncherMapper lm = new LauncherMapper();
Configuration actionConf = new XConfiguration();
String prepareBlock = "<prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>";
- lm.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, prepareBlock);
+ LauncherMapperHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, prepareBlock);
assertTrue(jobConf.get("oozie.action.prepare.xml").equals(prepareBlock));
}
public void testSetupMainClass() throws Exception {
Configuration conf = new Configuration(false);
- LauncherMapper.setupMainClass(conf, "");
+ LauncherMapperHelper.setupMainClass(conf, "");
assertNull(conf.get("oozie.launcher.action.main.class"));
conf = new Configuration(false);
- LauncherMapper.setupMainClass(conf, "org.blah.myclass1");
+ LauncherMapperHelper.setupMainClass(conf, "org.blah.myclass1");
assertEquals(conf.get("oozie.launcher.action.main.class"), "org.blah.myclass1");
conf = new Configuration(false);
conf.set("oozie.launcher.action.main.class", "org.blah.myclass2");
- LauncherMapper.setupMainClass(conf, "");
+ LauncherMapperHelper.setupMainClass(conf, "");
assertEquals(conf.get("oozie.launcher.action.main.class"), "org.blah.myclass2");
// the passed argument (myclass1) should have priority
conf = new Configuration(false);
conf.set("oozie.launcher.action.main.class", "org.blah.myclass2");
- LauncherMapper.setupMainClass(conf, "org.blah.myclass1");
+ LauncherMapperHelper.setupMainClass(conf, "org.blah.myclass1");
assertEquals(conf.get("oozie.launcher.action.main.class"), "org.blah.myclass1");
}
@@ -329,18 +326,16 @@ public class TestLauncher extends XFsTes
jobConf.set("user.name", getTestUser());
jobConf.set("fs.default.name", getNameNodeUri());
- LauncherMapper lm = new LauncherMapper();
Configuration actionConf = new XConfiguration();
actionConf.set("mapreduce.job.cache.files", "a.jar,aa.jar#aa.jar");
- lm.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, "");
+ LauncherMapperHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, "");
assertFalse(jobConf.getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false));
assertEquals("a.jar,aa.jar#aa.jar", actionConf.get("mapreduce.job.cache.files"));
Services.get().getConf().setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
- lm = new LauncherMapper();
actionConf = new XConfiguration();
actionConf.set("mapreduce.job.cache.files", "a.jar,aa.jar#aa.jar");
- lm.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, "");
+ LauncherMapperHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, "");
assertTrue(jobConf.getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false));
assertEquals("aa.jar#aa.jar", actionConf.get("mapreduce.job.cache.files"));
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java Mon Jun 10 20:15:11 2013
@@ -17,7 +17,6 @@
*/
package org.apache.oozie.action.hadoop;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
@@ -29,7 +28,6 @@ import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
@@ -43,8 +41,6 @@ import java.io.FileInputStream;
import java.io.StringReader;
import java.io.Writer;
import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.List;
public class TestMapReduceActionError extends ActionExecutorTestCase {
@@ -54,109 +50,6 @@ public class TestMapReduceActionError ex
setSystemProperty("oozie.service.ActionService.executor.classes", MapReduceActionExecutor.class.getName());
}
- public void testLauncherJar() throws Exception {
- MapReduceActionExecutor ae = new MapReduceActionExecutor();
- Path jar = new Path(ae.getOozieRuntimeDir(), ae.getLauncherJarName());
- assertTrue(new File(jar.toString()).exists());
- }
-
- public void testSetupMethods() throws Exception {
- MapReduceActionExecutor ae = new MapReduceActionExecutor();
-
- assertEquals("map-reduce", ae.getType());
-
- assertEquals("map-reduce-launcher.jar", ae.getLauncherJarName());
-
- List<Class> classes = new ArrayList<Class>();
- classes.add(LauncherMapper.class);
- classes.add(LauncherSecurityManager.class);
- classes.add(LauncherException.class);
- classes.add(LauncherMainException.class);
- classes.add(PrepareActionsDriver.class);
- classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
- classes.add(ActionStats.class);
- classes.add(ActionType.class);
- classes.add(LauncherMain.class);
- classes.add(MapReduceMain.class);
- classes.add(PipesMain.class);
- assertEquals(classes, ae.getLauncherClasses());
-
- Element actionXml = XmlUtils.parseXml("<map-reduce>" +
- "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
- "<name-node>" + getNameNodeUri() + "</name-node>" +
- "<configuration>" +
- "<property><name>mapred.input.dir</name><value>IN</value></property>" +
- "<property><name>mapred.output.dir</name><value>OUT</value></property>" +
- "</configuration>" +
- "</map-reduce>");
-
- XConfiguration protoConf = new XConfiguration();
- protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
-
- WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action");
- WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
- action.setType(ae.getType());
-
- Context context = new Context(wf, action);
-
- Configuration conf = ae.createBaseHadoopConf(context, actionXml);
- ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
- assertEquals("IN", conf.get("mapred.input.dir"));
-
- actionXml = XmlUtils.parseXml("<map-reduce>" +
- "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
- "<name-node>" + getNameNodeUri() + "</name-node>" +
- "<streaming>" +
- "<mapper>M</mapper>" +
- "<reducer>R</reducer>" +
- "<record-reader>RR</record-reader>" +
- "<record-reader-mapping>RRM1=1</record-reader-mapping>" +
- "<record-reader-mapping>RRM2=2</record-reader-mapping>" +
- "<env>e=E</env>" +
- "<env>ee=EE</env>" +
- "</streaming>" +
- "<configuration>" +
- "<property><name>mapred.input.dir</name><value>IN</value></property>" +
- "<property><name>mapred.output.dir</name><value>OUT</value></property>" +
- "</configuration>" +
- "</map-reduce>");
-
- conf = ae.createBaseHadoopConf(context, actionXml);
- ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
- assertEquals("M", conf.get("oozie.streaming.mapper"));
- assertEquals("R", conf.get("oozie.streaming.reducer"));
- assertEquals("RR", conf.get("oozie.streaming.record-reader"));
- assertEquals("2", conf.get("oozie.streaming.record-reader-mapping.size"));
- assertEquals("2", conf.get("oozie.streaming.env.size"));
-
- actionXml = XmlUtils.parseXml("<map-reduce>" +
- "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
- "<name-node>" + getNameNodeUri() + "</name-node>" +
- "<pipes>" +
- "<map>M</map>" +
- "<reduce>R</reduce>" +
- "<inputformat>IF</inputformat>" +
- "<partitioner>P</partitioner>" +
- "<writer>W</writer>" +
- "<program>PP</program>" +
- "</pipes>" +
- "<configuration>" +
- "<property><name>mapred.input.dir</name><value>IN</value></property>" +
- "<property><name>mapred.output.dir</name><value>OUT</value></property>" +
- "</configuration>" +
- "</map-reduce>");
-
- conf = ae.createBaseHadoopConf(context, actionXml);
- ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
- assertEquals("M", conf.get("oozie.pipes.map"));
- assertEquals("R", conf.get("oozie.pipes.reduce"));
- assertEquals("IF", conf.get("oozie.pipes.inputformat"));
- assertEquals("P", conf.get("oozie.pipes.partitioner"));
- assertEquals("W", conf.get("oozie.pipes.writer"));
- assertEquals(getFsTestCaseDir()+"/PP", conf.get("oozie.pipes.program"));
- }
-
private Context createContext(String actionXml) throws Exception {
JavaActionExecutor ae = new JavaActionExecutor();
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java Mon Jun 10 20:15:11 2013
@@ -51,7 +51,7 @@ public class TestPrepareActionsDriver ex
}
JobConf conf = createJobConf();
- LauncherMapper.setupLauncherURIHandlerConf(conf);
+ LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
PrepareActionsDriver.doOperations(prepareXML, conf);
assertTrue(fs.exists(actionDir));
}
@@ -71,7 +71,7 @@ public class TestPrepareActionsDriver ex
try {
prepareXML = "prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>";
JobConf conf = createJobConf();
- LauncherMapper.setupLauncherURIHandlerConf(conf);
+ LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
PrepareActionsDriver.doOperations(prepareXML, conf);
fail("Expected to catch an exception but did not encounter any");
} catch (LauncherException le) {
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java Mon Jun 10 20:15:11 2013
@@ -54,43 +54,69 @@ public class TestShellActionExecutor ext
setSystemProperty("oozie.service.ActionService.executor.classes", ShellActionExecutor.class.getName());
}
- /**
- * Verify if the launcher jar is created.
- *
- * @throws Exception
- */
- public void testLauncherJar() throws Exception {
- ShellActionExecutor ae = new ShellActionExecutor();
- Path jar = new Path(ae.getOozieRuntimeDir(), ae.getLauncherJarName());
- assertTrue(new File(jar.toString()).exists());
+ public void testSetupMethodsWithLauncherJar() throws Exception {
+ String defaultVal = Services.get().getConf().get("oozie.action.ship.launcher.jar");
+ try {
+ Services.get().getConf().set("oozie.action.ship.launcher.jar", "true");
+ _testSetupMethods(true);
+ }
+ finally {
+ // back to default
+ if (defaultVal != null) {
+ Services.get().getConf().set("oozie.action.ship.launcher.jar", defaultVal);
+ }
+ }
+ }
+
+ public void testSetupMethodsWithoutLauncherJar() throws Exception {
+ String defaultVal = Services.get().getConf().get("oozie.action.ship.launcher.jar");
+ try {
+ Services.get().getConf().set("oozie.action.ship.launcher.jar", "false");
+ _testSetupMethods(false);
+ }
+ finally {
+ // back to default
+ if (defaultVal != null) {
+ Services.get().getConf().set("oozie.action.ship.launcher.jar", defaultVal);
+ }
+ }
}
/**
* Verify if the ShellActionExecutor indeed setups the basic stuffs
*
+ * @param launcherJarShouldExist
* @throws Exception
*/
- public void testSetupMethods() throws Exception {
+ public void _testSetupMethods(boolean launcherJarShouldExist) throws Exception {
ShellActionExecutor ae = new ShellActionExecutor();
+ Path jar = new Path(ae.getOozieRuntimeDir(), ae.getLauncherJarName());
+ File fJar = new File(jar.toString());
+ fJar.delete();
+ assertFalse(fJar.exists());
+ ae.createLauncherJar();
+ assertEquals(launcherJarShouldExist, fJar.exists());
assertEquals("shell", ae.getType());// ActionExcutor type is 'shell'
// Verify the launcher jar filename
assertEquals("shell-launcher.jar", ae.getLauncherJarName());
- List<Class> classes = new ArrayList<Class>();
- classes.add(LauncherMapper.class);
- classes.add(LauncherSecurityManager.class);
- classes.add(LauncherException.class);
- classes.add(LauncherMainException.class);
- classes.add(PrepareActionsDriver.class);
- classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
- classes.add(ActionStats.class);
- classes.add(ActionType.class);
- classes.add(LauncherMain.class);
- classes.add(MapReduceMain.class);
- classes.add(ShellMain.class);
- classes.add(ShellMain.OutputWriteThread.class);
- assertEquals(classes, ae.getLauncherClasses());// Verify the class
+ if (launcherJarShouldExist) {
+ List<Class> classes = new ArrayList<Class>();
+ classes.add(LauncherMapper.class);
+ classes.add(LauncherSecurityManager.class);
+ classes.add(LauncherException.class);
+ classes.add(LauncherMainException.class);
+ classes.add(PrepareActionsDriver.class);
+ classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
+ classes.add(ActionStats.class);
+ classes.add(ActionType.class);
+ classes.add(LauncherMain.class);
+ classes.add(MapReduceMain.class);
+ classes.add(ShellMain.class);
+ classes.add(ShellMain.OutputWriteThread.class);
+ assertEquals(classes, ae.getLauncherClasses());// Verify the class
+ }
Element actionXml = XmlUtils.parseXml("<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
+ "<name-node>" + getNameNodeUri() + "</name-node>" + "<exec>SCRIPT</exec>"
@@ -247,7 +273,7 @@ public class TestShellActionExecutor ext
assertTrue(launcherJob.isSuccessful());
sleep(2000);// Wait more to make sure no ID swap happens
- assertFalse(LauncherMapper.hasIdSwap(launcherJob));
+ assertFalse(LauncherMapperHelper.hasIdSwap(launcherJob));
ShellActionExecutor ae = new ShellActionExecutor();
ae.check(context, context.getAction());
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java Mon Jun 10 20:15:11 2013
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.action.hadoop.LauncherMapper;
+import org.apache.oozie.action.hadoop.LauncherMapperHelper;
import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
import org.apache.oozie.action.hadoop.MapperReducerForTest;
import org.apache.oozie.client.WorkflowAction;
@@ -215,7 +215,7 @@ public class TestActionCheckXCommand ext
}
});
assertTrue(launcherJob.isSuccessful());
- assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+ assertTrue(LauncherMapperHelper.hasIdSwap(launcherJob));
new ActionCheckXCommand(action.getId()).call();
action = jpaService.execute(wfActionGetCmd);
@@ -325,7 +325,7 @@ public class TestActionCheckXCommand ext
}
});
assertTrue(launcherJob.isSuccessful());
- assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+ assertTrue(LauncherMapperHelper.hasIdSwap(launcherJob));
new ActionCheckXCommand(actionId).call();
WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);
@@ -390,7 +390,7 @@ public class TestActionCheckXCommand ext
}
});
assertTrue(launcherJob.isSuccessful());
- assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+ assertTrue(LauncherMapperHelper.hasIdSwap(launcherJob));
new ActionCheckXCommand(action1.getId()).call();
WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd);
@@ -455,7 +455,7 @@ public class TestActionCheckXCommand ext
});
assertTrue(launcherJob2.isSuccessful());
- assertTrue(LauncherMapper.hasIdSwap(launcherJob2));
+ assertTrue(LauncherMapperHelper.hasIdSwap(launcherJob2));
new ActionCheckXCommand(actionId).call();
WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java Mon Jun 10 20:15:11 2013
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.action.hadoop.LauncherMapper;
+import org.apache.oozie.action.hadoop.LauncherMapperHelper;
import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
import org.apache.oozie.action.hadoop.MapperReducerForTest;
import org.apache.oozie.client.OozieClient;
@@ -171,7 +171,7 @@ public class TestActionStartXCommand ext
}
});
assertTrue(launcherJob.isSuccessful());
- assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+ assertTrue(LauncherMapperHelper.hasIdSwap(launcherJob));
}
public void testActionReuseWfJobAppPath() throws Exception {
@@ -236,7 +236,7 @@ public class TestActionStartXCommand ext
});
// check if launcher job succeeds
assertTrue(launcherJob.isSuccessful());
- assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+ assertTrue(LauncherMapperHelper.hasIdSwap(launcherJob));
}
/**
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java?rev=1491589&r1=1491588&r2=1491589&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java Mon Jun 10 20:15:11 2013
@@ -43,7 +43,7 @@ import org.apache.oozie.DagEngine;
import org.apache.oozie.ForTestingActionExecutor;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.action.hadoop.LauncherMapper;
+import org.apache.oozie.action.hadoop.LauncherMapperHelper;
import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
import org.apache.oozie.action.hadoop.MapperReducerForTest;
import org.apache.oozie.client.CoordinatorAction;
@@ -239,7 +239,7 @@ public class TestRecoveryService extends
}
});
assertTrue(launcherJob.isSuccessful());
- assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+ assertTrue(LauncherMapperHelper.hasIdSwap(launcherJob));
}