You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/02/25 22:42:09 UTC
svn commit: r1449911 [1/8] - in /oozie/trunk: ./
client/src/main/java/org/apache/oozie/cli/
client/src/main/java/org/apache/oozie/client/
client/src/main/java/org/apache/oozie/client/rest/
client/src/test/java/org/apache/oozie/client/rest/ core/ core/s...
Author: virag
Date: Mon Feb 25 21:42:07 2013
New Revision: 1449911
URL: http://svn.apache.org/r1449911
Log:
Merging hcat-intre to trunk
Added:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/HCatMessageHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/
oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/HCatAccessorException.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/URIHandlerService.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/UserGroupInformationService.java
oozie/trunk/core/src/main/java/org/apache/oozie/util/HCatURI.java
oozie/trunk/core/src/main/java/org/apache/oozie/util/MappingRule.java
oozie/trunk/core/src/main/resources/ehcache-default.xml
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherFSURIHandler.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherHCatURIHandler.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
oozie/trunk/core/src/test/java/org/apache/oozie/dependency/
oozie/trunk/core/src/test/java/org/apache/oozie/dependency/TestFSURIHandler.java
oozie/trunk/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java
oozie/trunk/core/src/test/java/org/apache/oozie/dependency/TestURIHandlerService.java
oozie/trunk/core/src/test/java/org/apache/oozie/jms/
oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestHCatAccessorService.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
oozie/trunk/core/src/test/java/org/apache/oozie/test/MiniHCatServer.java
oozie/trunk/core/src/test/java/org/apache/oozie/test/XHCatTestCase.java
oozie/trunk/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
oozie/trunk/core/src/test/resources/coord-job-for-matd-hcat.xml
oozie/trunk/core/src/test/resources/coord-job-for-matd-neg-hcat.xml
oozie/trunk/core/src/test/resources/ehcache.xml
oozie/trunk/examples/src/main/apps/hcatalog/
oozie/trunk/examples/src/main/apps/hcatalog/README
oozie/trunk/examples/src/main/apps/hcatalog/coordinator.xml
oozie/trunk/examples/src/main/apps/hcatalog/id.pig
oozie/trunk/examples/src/main/apps/hcatalog/job.properties
oozie/trunk/examples/src/main/apps/hcatalog/workflow.xml
oozie/trunk/hcataloglibs/
oozie/trunk/hcataloglibs/hcatalog-0.6/
oozie/trunk/hcataloglibs/hcatalog-0.6/pom.xml
oozie/trunk/hcataloglibs/pom.xml
oozie/trunk/sharelib/hcatalog/
oozie/trunk/sharelib/hcatalog/pom.xml
oozie/trunk/src/main/assemblies/hcataloglib.xml
oozie/trunk/src/main/assemblies/hcataloglibs.xml
Modified:
oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
oozie/trunk/core/pom.xml
oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java
oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInfoJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/SchedulerService.java
oozie/trunk/core/src/main/java/org/apache/oozie/util/XLog.java
oozie/trunk/core/src/main/resources/oozie-default.xml
oozie/trunk/core/src/main/resources/oozie-log4j.properties
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFileSystemActions.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/client/rest/TestJsonCoordinatorAction.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/coord/TestCoordELFunctions.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestJobsServlet.java
oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
oozie/trunk/core/src/test/java/org/apache/oozie/test/XTestCase.java
oozie/trunk/core/src/test/java/org/apache/oozie/util/TestGraphGenerator.java
oozie/trunk/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
oozie/trunk/docs/src/site/twiki/DG_QuickStart.twiki
oozie/trunk/docs/src/site/twiki/ENG_Custom_Authentication.twiki
oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
oozie/trunk/examples/src/test/java/org/apache/oozie/example/TestLocalOozieExample.java
oozie/trunk/login/src/main/java/org/apache/oozie/servlet/login/LoginServlet.java
oozie/trunk/login/src/main/webapp/WEB-INF/web.xml
oozie/trunk/minitest/src/test/java/org/apache/oozie/test/WorkflowTest.java
oozie/trunk/pom.xml
oozie/trunk/release-log.txt
oozie/trunk/sharelib/pom.xml
oozie/trunk/src/main/assemblies/sharelib.xml
oozie/trunk/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
oozie/trunk/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMain.java
oozie/trunk/webapp/src/main/webapp/oozie-console.js
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java Mon Feb 25 21:42:07 2013
@@ -981,10 +981,6 @@ public class OozieCLI {
System.out.println(RULER);
for (CoordinatorAction action : actions) {
- String missingDep = action.getMissingDependencies();
- if(missingDep != null && !missingDep.isEmpty()) {
- missingDep = missingDep.split(INSTANCE_SEPARATOR)[0];
- }
System.out.println(maskIfNull(action.getId()) + VERBOSE_DELIMITER + action.getActionNumber()
+ VERBOSE_DELIMITER + maskIfNull(action.getConsoleUrl()) + VERBOSE_DELIMITER
+ maskIfNull(action.getErrorCode()) + VERBOSE_DELIMITER + maskIfNull(action.getErrorMessage())
@@ -994,7 +990,7 @@ public class OozieCLI {
+ maskDate(action.getCreatedTime(), timeZoneId, verbose) + VERBOSE_DELIMITER
+ maskDate(action.getNominalTime(), timeZoneId, verbose) + action.getStatus() + VERBOSE_DELIMITER
+ maskDate(action.getLastModifiedTime(), timeZoneId, verbose) + VERBOSE_DELIMITER
- + maskIfNull(missingDep));
+ + maskIfNull(getFirstMissingDependencies(action)));
System.out.println(RULER);
}
@@ -1056,11 +1052,7 @@ public class OozieCLI {
System.out.println("Nominal Time : " + maskDate(coordAction.getNominalTime(), timeZoneId, false));
System.out.println("Status : " + coordAction.getStatus());
System.out.println("Last Modified : " + maskDate(coordAction.getLastModifiedTime(), timeZoneId, false));
- String missingDep = coordAction.getMissingDependencies();
- if(missingDep != null && !missingDep.isEmpty()) {
- missingDep = missingDep.split(INSTANCE_SEPARATOR)[0];
- }
- System.out.println("First Missing Dependency : " + maskIfNull(missingDep));
+ System.out.println("First Missing Dependency : " + maskIfNull(getFirstMissingDependencies(coordAction)));
System.out.println(RULER);
}
@@ -1537,7 +1529,7 @@ public class OozieCLI {
Schema schema = factory.newSchema(sources.toArray(new StreamSource[sources.size()]));
Validator validator = schema.newValidator();
validator.validate(new StreamSource(new FileReader(file)));
- System.out.println("Valid worflow-app");
+ System.out.println("Valid workflow-app");
}
catch (Exception ex) {
throw new OozieCLIException("Invalid workflow-app, " + ex.toString(), ex);
@@ -1637,4 +1629,23 @@ public class OozieCLI {
throw new OozieCLIException(ex.toString(), ex);
}
}
+
+ private String getFirstMissingDependencies(CoordinatorAction action) {
+ StringBuilder allDeps = new StringBuilder();
+ String missingDep = action.getMissingDependencies();
+ boolean depExists = false;
+ if (missingDep != null && !missingDep.isEmpty()) {
+ allDeps.append(missingDep.split(INSTANCE_SEPARATOR)[0]);
+ depExists = true;
+ }
+ String pushDeps = action.getPushMissingDependencies();
+ if (pushDeps != null && !pushDeps.isEmpty()) {
+ if(depExists) {
+ allDeps.append(INSTANCE_SEPARATOR);
+ }
+ allDeps.append(pushDeps.split(INSTANCE_SEPARATOR)[0]);
+ }
+ return allDeps.toString();
+ }
+
}
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java Mon Feb 25 21:42:07 2013
@@ -105,12 +105,20 @@ public interface CoordinatorAction {
Status getStatus();
/**
- * Return the missing dependencies for the particular action
+ * Return the PULL-based (directory based) missing dependencies for the
+ * particular action
*
* @return the missing dependencies for the particular action
*/
String getMissingDependencies();
+ /**
+ * Return the PUSH-based (e.g. HCatalog partition-based) missing
+ * dependencies for the particular action
+ *
+ * @return the PUSH-based missing dependencies for the particular action
+ */
+ String getPushMissingDependencies();
/**
* Return the external status of the application instance.
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java Mon Feb 25 21:42:07 2013
@@ -115,6 +115,7 @@ public interface JsonTags {
public static final String COORDINATOR_ACTION_NOMINAL_TIME = "nominalTime";
public static final String COORDINATOR_ACTION_STATUS = "status";
public static final String COORDINATOR_ACTION_MISSING_DEPS = "missingDependencies";
+ public static final String COORDINATOR_ACTION_PUSH_MISSING_DEPS = "pushMissingDependencies";
public static final String COORDINATOR_ACTION_EXTERNAL_STATUS = "externalStatus";
public static final String COORDINATOR_ACTION_TRACKER_URI = "trackerUri";
public static final String COORDINATOR_ACTION_CONSOLE_URL = "consoleUrl";
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java Mon Feb 25 21:42:07 2013
@@ -119,6 +119,8 @@ public class JsonToBean {
.put("getLastModifiedTime", new Property(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, Date.class));
COORD_ACTION
.put("getMissingDependencies", new Property(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, String.class));
+ COORD_ACTION.put("getPushMissingDependencies", new Property(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS,
+ String.class));
COORD_ACTION.put("getExternalStatus", new Property(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, String.class));
COORD_ACTION.put("getTrackerUri", new Property(JsonTags.COORDINATOR_ACTION_TRACKER_URI, String.class));
COORD_ACTION.put("getConsoleUrl", new Property(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, String.class));
Modified: oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java (original)
+++ oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java Mon Feb 25 21:42:07 2013
@@ -181,6 +181,7 @@ public class TestJsonToBean extends Test
json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, "e");
json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, LAST_MODIFIED);
json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, "f");
+ json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, "ff");
json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, "g");
json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, "h");
json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, "i");
@@ -217,6 +218,7 @@ public class TestJsonToBean extends Test
assertEquals("e", action.getRunConf());
assertEquals(JsonUtils.parseDateRfc822(LAST_MODIFIED), action.getLastModifiedTime());
assertEquals("f", action.getMissingDependencies());
+ assertEquals("ff", action.getPushMissingDependencies());
assertEquals("g", action.getExternalStatus());
assertEquals("h", action.getTrackerUri());
assertEquals("i", action.getConsoleUrl());
Modified: oozie/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/pom.xml?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/pom.xml (original)
+++ oozie/trunk/core/pom.xml Mon Feb 25 21:42:07 2013
@@ -49,9 +49,15 @@
</dependency>
<dependency>
- <groupId>org.apache.oozie</groupId>
- <artifactId>oozie-hbase</artifactId>
- <scope>provided</scope>
+ <groupId>org.apache.oozie</groupId>
+ <artifactId>oozie-hbase</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.oozie</groupId>
+ <artifactId>oozie-hcatalog</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -102,6 +108,12 @@
</dependency>
<dependency>
+ <groupId>net.sf.ehcache</groupId>
+ <artifactId>ehcache-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
@@ -347,6 +359,24 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-kahadb-store</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<!-- For drawing runtime DAG -->
<dependency>
<groupId>net.sf.jung</groupId>
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java Mon Feb 25 21:42:07 2013
@@ -56,6 +56,8 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.status =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
// Update query for InputCheck
@NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"),
+ // Update query for Push-based missing dependency check
+ @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
// Update query for Start
@NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.status =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"),
@@ -71,11 +73,11 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
// Select query used by ActionInfo command
- @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies from CoordinatorActionBean a where a.id = :id"),
+ @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
// Select Query used by Timeout command
@NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending from CoordinatorActionBean a where a.id = :id"),
// Select query used by InputCheck command
- @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
+ @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
// Select query used by CoordActionUpdate command
@NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml from CoordinatorActionBean a where a.externalId = :externalId"),
// Select query used by Check command
@@ -99,7 +101,7 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
// Query to retrieve Coordinator actions sorted by nominal time
- @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
+ @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
// Query to maintain backward compatibility for coord job info command
@NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
// Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions
@@ -126,7 +128,7 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.status = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
- @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
@NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
// Select query used by rerun, requires almost all columns so select * is used
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java Mon Feb 25 21:42:07 2013
@@ -177,6 +177,8 @@ public enum ErrorCode {
E0902(XLog.OPS, "Exception occured: [{0}]"),
E0903(XLog.OPS, "Invalid JobConf, it has not been created by HadoopAccessorService"),
E0904(XLog.STD, "Scheme [{0}] not supported in uri [{1}]"),
+ E0905(XLog.STD, "Scheme not present in uri [{0}]"),
+ E0906(XLog.STD, "URI parsing error : {0}"),
E1001(XLog.STD, "Could not read the coordinator job definition, {0}"),
E1002(XLog.STD, "Invalid coordinator application URI [{0}], {1}"),
@@ -230,6 +232,8 @@ public enum ErrorCode {
E1400(XLog.STD, "doAs (proxyuser) failure"),
+ E1501(XLog.STD, "Error in getting HCat Access [{0}]"),
+
ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
private String template;
Added: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FSLauncherURIHandler implements LauncherURIHandler {
+
+ @Override
+ public boolean create(URI uri, Configuration conf) throws LauncherException {
+ boolean status = false;
+ try {
+ FileSystem fs = FileSystem.get(uri, conf);
+ Path path = getNormalizedPath(uri);
+ if (!fs.exists(path)) {
+ status = fs.mkdirs(path);
+ if (status) {
+ System.out.println("Creating directory at " + path + " succeeded.");
+ }
+ else {
+ System.out.println("Creating directory at " + path + " failed.");
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new LauncherException("Creating directory at " + uri + " failed.", e);
+ }
+ return status;
+ }
+
+ @Override
+ public boolean delete(URI uri, Configuration conf) throws LauncherException {
+ boolean status = false;
+ try {
+ FileSystem fs = FileSystem.get(uri, conf);
+ Path path = getNormalizedPath(uri);
+ if (fs.exists(path)) {
+ status = fs.delete(path, true);
+ if (status) {
+ System.out.println("Deletion of path " + path + " succeeded.");
+ }
+ else {
+ System.out.println("Deletion of path " + path + " failed.");
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new LauncherException("Deletion of path " + uri + " failed.", e);
+ }
+ return status;
+ }
+
+ private Path getNormalizedPath(URI uri) {
+ // Normalizes the uri path, replacing any "//" that users specify by mistake with "/"
+ return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
+ }
+
+ @Override
+ public List<Class<?>> getClassesForLauncher() {
+ return null;
+ }
+
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java Mon Feb 25 21:42:07 2013
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.action.hadoop;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.w3c.dom.Node;
-/**
- * Class to perform file system operations specified in the prepare block of Workflow
- *
- */
-public class FileSystemActions {
- private static Collection<String> supportedFileSystems;
-
- public FileSystemActions(Collection<String> fileSystems) {
- supportedFileSystems = fileSystems;
- }
-
- /**
- * Method to execute the prepare actions based on the command
- *
- * @param n Child node of the prepare XML
- * @throws LauncherException
- */
- public void execute(Node n) throws LauncherException {
- String command = n.getNodeName();
- if (command.equals("delete")) {
- delete(new Path(n.getAttributes().getNamedItem("path").getNodeValue().trim()));
- } else if (command.equals("mkdir")) {
- mkdir(new Path(n.getAttributes().getNamedItem("path").getNodeValue().trim()));
- }
- }
-
- // Method to delete the specified file based on the path
- private void delete(Path path) throws LauncherException {
- try {
- validatePath(path, true);
- FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
- if (fs.exists(path)) {
- if (!fs.delete(path, true)) {
- String deleteFailed = "Deletion of path " + path.toString() + " failed.";
- System.out.println(deleteFailed);
- throw new LauncherException(deleteFailed);
- } else {
- System.out.println("Deletion of path " + path.toString() + " was successful.");
- }
- }
- } catch (IOException ex) {
- throw new LauncherException(ex.getMessage(), ex);
- }
-
- }
-
- // Method to create a directory based on the path
- private void mkdir(Path path) throws LauncherException {
- try {
- validatePath(path, true);
- FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
- if (!fs.exists(path)) {
- if (!fs.mkdirs(path)) {
- String mkdirFailed = "Creating directory at " + path + " failed.";
- System.out.println(mkdirFailed);
- throw new LauncherException(mkdirFailed);
- } else {
- System.out.println("Creating directory at path " + path + " was successful.");
- }
- }
- } catch (IOException ex) {
- throw new LauncherException(ex.getMessage(), ex);
- }
- }
-
- // Method to validate the path provided for the prepare action
- private void validatePath(Path path, boolean withScheme) throws LauncherException {
- String scheme = path.toUri().getScheme();
- if (withScheme) {
- if (scheme == null) {
- String nullScheme = "Scheme of the path " + path + " is null";
- System.out.println(nullScheme);
- throw new LauncherException(nullScheme);
- } else if (supportedFileSystems.size() != 1 || !supportedFileSystems.iterator().next().equals("*")) {
- if (!supportedFileSystems.contains(scheme.toLowerCase())) {
- String unsupportedScheme = "Scheme of '" + path + "' is not supported.";
- System.out.println(unsupportedScheme);
- throw new LauncherException(unsupportedScheme);
- }
- }
- } else if (scheme != null) {
- String notNullScheme = "Scheme of the path " + path + " is not null as specified.";
- System.out.println(notNullScheme);
- throw new LauncherException(notNullScheme);
- }
- }
-}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hcatalog.api.ConnectionFailureException;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.oozie.util.HCatURI;
+
+public class HCatLauncherURIHandler implements LauncherURIHandler {
+
+ private static List<Class<?>> classesToShip = new ArrayList<Class<?>>();
+
+ static {
+ classesToShip.add(HCatURI.class);
+ }
+
+ @Override
+ public boolean create(URI uri, Configuration conf) throws LauncherException {
+ throw new UnsupportedOperationException("Creation of partition is not supported for " + uri);
+ }
+
+ @Override
+ public boolean delete(URI uri, Configuration conf) throws LauncherException {
+ HCatClient client = getHCatClient(uri, conf);
+ try {
+ HCatURI hcatURI = new HCatURI(uri.toString());
+ client.dropPartitions(hcatURI.getDb(), hcatURI.getTable(), hcatURI.getPartitionMap(), true);
+ System.out.println("Dropped partitions for " + uri);
+ return true;
+ }
+ catch (ConnectionFailureException e) {
+ throw new LauncherException("Error trying to drop " + uri, e);
+ }
+ catch (HCatException e) {
+ throw new LauncherException("Error trying to drop " + uri, e);
+ }
+ catch (URISyntaxException e) {
+ throw new LauncherException("Error trying to drop " + uri, e);
+ }
+ finally {
+ closeQuietly(client);
+ }
+ }
+
+ private HCatClient getHCatClient(URI uri, Configuration conf) throws LauncherException {
+ final HiveConf hiveConf = new HiveConf(conf, this.getClass());
+ String serverURI = getMetastoreConnectURI(uri);
+ if (!serverURI.equals("")) {
+ hiveConf.set("hive.metastore.local", "false");
+ }
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
+ try {
+ System.out.println("Creating HCatClient for user=" + UserGroupInformation.getCurrentUser() + " and server="
+ + serverURI);
+ // Delegation token fetched from metastore has new Text() as service and
+ // HiveMetastoreClient looks for the same if not overriden by hive.metastore.token.signature
+ // We are good as long as HCatCredentialHelper does not change the service of the token.
+ return HCatClient.create(hiveConf);
+ }
+ catch (HCatException e) {
+ throw new LauncherException("Error trying to connect to " + serverURI, e);
+ }
+ catch (IOException e) {
+ throw new LauncherException("Error trying to connect to " + serverURI, e);
+ }
+ }
+
+ private String getMetastoreConnectURI(URI uri) {
+ String metastoreURI;
+ // For unit tests
+ if (uri.getAuthority().equals("unittest-local")) {
+ metastoreURI = "";
+ }
+ else {
+ // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
+ // is added
+ metastoreURI = "thrift://" + uri.getAuthority();
+ }
+ return metastoreURI;
+ }
+
+ private void closeQuietly(HCatClient client) {
+ if (client != null) {
+ try {
+ client.close();
+ }
+ catch (Exception ignore) {
+ System.err.println("Error closing hcat client");
+ ignore.printStackTrace(System.err);
+ }
+ }
+ }
+
+ @Override
+ public List<Class<?>> getClassesForLauncher() {
+ return classesToShip;
+ }
+
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java Mon Feb 25 21:42:07 2013
@@ -61,6 +61,7 @@ import org.apache.oozie.client.WorkflowA
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.servlet.CallbackServlet;
import org.apache.oozie.util.ELEvaluator;
@@ -127,8 +128,8 @@ public class JavaActionExecutor extends
classes.add(LauncherSecurityManager.class);
classes.add(LauncherException.class);
classes.add(LauncherMainException.class);
- classes.add(FileSystemActions.class);
classes.add(PrepareActionsDriver.class);
+ classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
classes.add(ActionStats.class);
classes.add(ActionType.class);
return classes;
@@ -388,39 +389,44 @@ public class JavaActionExecutor extends
}
}
- protected void addShareLib(Path appPath, Configuration conf, String actionShareLibName)
- throws ActionExecutorException {
- if (actionShareLibName != null) {
- try {
- Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
- if (systemLibPath != null) {
- Path actionLibPath = new Path(systemLibPath, actionShareLibName);
- String user = conf.get("user.name");
- FileSystem fs;
- // If the actionLibPath has a valid scheme and authority, then use them to determine the filesystem that the
- // sharelib resides on; otherwise, assume it resides on the same filesystem as the appPath and use the appPath
- // to determine the filesystem
- if (actionLibPath.toUri().getScheme() != null && actionLibPath.toUri().getAuthority() != null) {
- fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, actionLibPath.toUri(), conf);
- }
- else {
- fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
- }
- if (fs.exists(actionLibPath)) {
- FileStatus[] files = fs.listStatus(actionLibPath);
- for (FileStatus file : files) {
- addToCache(conf, actionLibPath, file.getPath().toUri().getPath(), false);
+ protected void addShareLib(Path appPath, Configuration conf, String[] actionShareLibNames)
+ throws ActionExecutorException {
+ if (actionShareLibNames != null) {
+ for (String actionShareLibName : actionShareLibNames) {
+ try {
+ Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
+ if (systemLibPath != null) {
+ Path actionLibPath = new Path(systemLibPath, actionShareLibName.trim());
+ String user = conf.get("user.name");
+ FileSystem fs;
+ // If the actionLibPath has a valid scheme and authority, then use them to
+ // determine the filesystem that the sharelib resides on; otherwise, assume
+ // it resides on the same filesystem as the appPath and use the appPath to
+ // determine the filesystem
+ if (actionLibPath.toUri().getScheme() != null && actionLibPath.toUri().getAuthority() != null) {
+ fs = Services.get().get(HadoopAccessorService.class)
+ .createFileSystem(user, actionLibPath.toUri(), conf);
+ }
+ else {
+ fs = Services.get().get(HadoopAccessorService.class)
+ .createFileSystem(user, appPath.toUri(), conf);
+ }
+ if (fs.exists(actionLibPath)) {
+ FileStatus[] files = fs.listStatus(actionLibPath);
+ for (FileStatus file : files) {
+ addToCache(conf, actionLibPath, file.getPath().toUri().getPath(), false);
+ }
}
}
}
- }
- catch (HadoopAccessorException ex){
- throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
- ex.getErrorCode().toString(), ex.getMessage());
- }
- catch (IOException ex){
- throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
- "It should never happen", ex.getMessage());
+ catch (HadoopAccessorException ex) {
+ throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, ex.getErrorCode()
+ .toString(), ex.getMessage());
+ }
+ catch (IOException ex) {
+ throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
+ "It should never happen", ex.getMessage());
+ }
}
}
}
@@ -500,7 +506,7 @@ public class JavaActionExecutor extends
// Add action specific share libs
addActionShareLib(appPath, conf, context, actionXml);
// Add common sharelibs for Oozie
- addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
+ addShareLib(appPath, conf, new String[]{JavaActionExecutor.OOZIE_COMMON_LIBDIR});
}
private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml)
@@ -516,7 +522,7 @@ public class JavaActionExecutor extends
// Action sharelibs are only added if user has specified to use system libpath
if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
// add action specific sharelibs
- addShareLib(appPath, conf, getShareLibName(context, actionXml, conf));
+ addShareLib(appPath, conf, getShareLibNames(context, actionXml, conf));
}
}
@@ -585,8 +591,7 @@ public class JavaActionExecutor extends
prepareXML);
LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
- LauncherMapper.setupSupportedFileSystems(
- launcherJobConf, Services.get().getConf().get(HadoopAccessorService.SUPPORTED_FILESYSTEMS));
+ LauncherMapper.setupLauncherURIHandlerConf(launcherJobConf);
LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
@@ -1161,14 +1166,15 @@ public class JavaActionExecutor extends
/**
- * Return the sharelib name for the action.
+ * Return the sharelib names for the action.
* <p/>
- * If <code>NULL</code> or emtpy, it means that the action does not use the action
+ * If <code>NULL</code> or empty, it means that the action does not use the action
* sharelib.
* <p/>
* If a non-empty string, i.e. <code>foo</code>, it means the action uses the
- * action sharelib subdirectory <code>foo</code> and all JARs in the sharelib
- * <code>foo</code> directory will be in the action classpath.
+ * action sharelib sub-directory <code>foo</code> and all JARs in the sharelib
+ * <code>foo</code> directory will be in the action classpath. Multiple sharelib
+ * sub-directories can be specified as a comma separated list.
* <p/>
* The resolution is done using the following precedence order:
* <ul>
@@ -1181,18 +1187,22 @@ public class JavaActionExecutor extends
*
* @param context executor context.
* @param actionXml
- *@param conf action configuration. @return the action sharelib name.
+ * @param conf action configuration.
+ * @return the action sharelib names.
*/
- protected String getShareLibName(Context context, Element actionXml, Configuration conf) {
- String name = conf.get(ACTION_SHARELIB_FOR + getType());
- if (name == null) {
+ protected String[] getShareLibNames(Context context, Element actionXml, Configuration conf) {
+ String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType());
+ if (names == null || names.length == 0) {
try {
XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
- name = jobConf.get(ACTION_SHARELIB_FOR + getType());
- if (name == null) {
- name = Services.get().getConf().get(ACTION_SHARELIB_FOR + getType());
- if (name == null) {
- name = getDefaultShareLibName(actionXml);
+ names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType());
+ if (names == null || names.length == 0) {
+ names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType());
+ if (names == null || names.length == 0) {
+ String name = getDefaultShareLibName(actionXml);
+ if (name != null) {
+ names = new String[] { name };
+ }
}
}
}
@@ -1200,7 +1210,7 @@ public class JavaActionExecutor extends
throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
}
}
- return name;
+ return names;
}
private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";
Added: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+/**
+ * Checked exception raised by launcher-side code, including
+ * <code>LauncherURIHandler</code> implementations.
+ * <p/>
+ * The constructors are public so that handler implementations living in other
+ * packages can raise the exception declared by the handler interface.
+ */
+public class LauncherException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs a <code>LauncherException</code> with the
+     * specified detail message.
+     *
+     * @param message the detail message.
+     */
+    public LauncherException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     *
+     * @param message the detail message
+     * @param cause the cause exception
+     */
+    public LauncherException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java Mon Feb 25 21:42:07 2013
@@ -33,13 +33,14 @@ import java.lang.reflect.Method;
import java.net.URI;
import java.security.Permission;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -53,12 +54,12 @@ import org.apache.hadoop.mapred.RunningJ
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.XLog;
public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
- public static final String CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS = "oozie.launcher.action.supported.filesystems";
public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = "oozie.action.max.output.data";
@@ -152,8 +153,10 @@ public class LauncherMapper<K1, V1, K2,
launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
}
- public static void setupSupportedFileSystems(Configuration launcherConf, String supportedFileSystems) {
- launcherConf.set(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS, supportedFileSystems);
+ public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
+ for(Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
+ launcherConf.set(entry.getKey(), entry.getValue());
+ }
}
public static void setupMainArguments(Configuration launcherConf, String[] args) {
@@ -547,6 +550,7 @@ public class LauncherMapper<K1, V1, K2,
System.out.println();
}
handleActionStatsData(reporter);
+ handleExternalChildIDs(reporter);
File newId = new File(System.getProperty("oozie.action.newId.properties"));
if (newId.exists()) {
Properties props = new Properties();
@@ -665,8 +669,7 @@ public class LauncherMapper<K1, V1, K2,
String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
if (prepareXML != null) {
if (!prepareXML.equals("")) {
- PrepareActionsDriver.doOperations(
- getJobConf().getStringCollection(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS), prepareXML);
+ PrepareActionsDriver.doOperations(prepareXML, getJobConf());
} else {
System.out.println("There are no prepare actions to execute.");
}
@@ -817,14 +820,3 @@ class LauncherSecurityManager extends Se
exitCode = 0;
}
}
-
-class LauncherException extends Exception {
-
- LauncherException(String message) {
- super(message);
- }
-
- LauncherException(String message, Throwable cause) {
- super(message, cause);
- }
-}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Operations the launcher can perform on a resource identified by a URI.
+ */
+public interface LauncherURIHandler {
+
+    /**
+     * Creates the resource the URI points to.
+     *
+     * @param uri URI of the dependency
+     * @param conf Configuration to access the URI
+     * @return <code>true</code> when the resource was absent and has been created
+     *         successfully, <code>false</code> when it was already present
+     * @throws LauncherException
+     */
+    boolean create(URI uri, Configuration conf) throws LauncherException;
+
+    /**
+     * Deletes the resource the URI points to.
+     *
+     * @param uri URI of the dependency
+     * @param conf Configuration to access the URI
+     * @return <code>true</code> when the resource was present and has been deleted
+     *         successfully, <code>false</code> when it was absent
+     * @throws LauncherException
+     */
+    boolean delete(URI uri, Configuration conf) throws LauncherException;
+
+    /**
+     * Lists the classes that have to be shipped to the launcher for this handler.
+     *
+     * @return list of classes to ship to launcher
+     */
+    List<Class<?>> getClassesForLauncher();
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Resolves the {@link LauncherURIHandler} for a URI from the handler class names
+ * stored in the launcher configuration under
+ * {@link #CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX} followed by the scheme.
+ */
+public class LauncherURIHandlerFactory {
+
+    public static final String CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX = "oozie.launcher.action.urihandler.scheme.";
+    private final Configuration conf;
+
+    /**
+     * @param conf configuration holding the scheme to handler class mappings
+     */
+    public LauncherURIHandlerFactory(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Get LauncherURIHandler to perform operations on a URI in the launcher.
+     * <p/>
+     * The handler class is looked up by the scheme exactly as written, then by the
+     * lower-cased scheme, and finally by the wildcard entry <code>*</code>.
+     *
+     * @param uri URI to find a handler for
+     * @return LauncherURIHandler to perform operations on the URI
+     * @throws LauncherException if the URI has no scheme, no handler is configured for
+     *         the scheme, or the configured class cannot be loaded or is not a
+     *         LauncherURIHandler
+     */
+    public LauncherURIHandler getURIHandler(URI uri) throws LauncherException {
+        String scheme = uri.getScheme();
+        if (scheme == null) {
+            throw new LauncherException("Scheme not present in uri " + uri);
+        }
+        String className = conf.get(CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + scheme);
+        if (className == null) {
+            // URI schemes are case-insensitive, so HDFS://... must resolve like hdfs://...
+            className = conf.get(CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX
+                    + scheme.toLowerCase(java.util.Locale.ENGLISH));
+        }
+        if (className == null) {
+            className = conf.get(CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + "*");
+        }
+        if (className == null) {
+            throw new LauncherException("Scheme " + uri.getScheme() + " not supported in uri " + uri.toString());
+        }
+        Class<? extends LauncherURIHandler> clazz;
+        try {
+            clazz = Class.forName(className).asSubclass(LauncherURIHandler.class);
+        }
+        catch (ClassNotFoundException e) {
+            throw new LauncherException("Error instantiating LauncherURIHandler", e);
+        }
+        catch (ClassCastException e) {
+            // A misconfigured class name is reported through the declared exception type
+            throw new LauncherException("Class " + className + " is not a LauncherURIHandler", e);
+        }
+        return ReflectionUtils.newInstance(clazz, null);
+    }
+
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java Mon Feb 25 21:42:07 2013
@@ -20,12 +20,13 @@ package org.apache.oozie.action.hadoop;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Collection;
+import java.net.URI;
+import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.service.HadoopAccessorService;
import org.xml.sax.SAXException;
import org.w3c.dom.Document;
+import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -44,27 +45,25 @@ public class PrepareActionsDriver {
* @param prepareXML Prepare XML block in string format
* @throws LauncherException
*/
- static void doOperations(Collection<String> supportedFileSystems, String prepareXML) throws LauncherException {
+ static void doOperations(String prepareXML, Configuration conf) throws LauncherException {
try {
Document doc = getDocumentFromXML(prepareXML);
doc.getDocumentElement().normalize();
// Get the list of child nodes, basically, each one corresponding to a separate action
NodeList nl = doc.getDocumentElement().getChildNodes();
- FileSystemActions fsActions = new FileSystemActions(supportedFileSystems);
+ LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
for (int i = 0; i < nl.getLength(); ++i) {
- String commandType = "";
- /* Logic to find the command type goes here
- commandType = ..........;
- */
- // As of now, the available prepare action is of type hdfs. Hence, assigning the value directly
- commandType = "hdfs";
- if (commandType.equalsIgnoreCase("hdfs")) {
- fsActions.execute(nl.item(i));
- } /*else if(commandType.equalsIgnoreCase("hcat")) { //Other command types go here
- hCatActions.execute(nl.item(i));
- }*/
+ Node n = nl.item(i);
+ String operation = n.getNodeName();
+ if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) {
+ continue;
+ }
+ String path = n.getAttributes().getNamedItem("path").getNodeValue().trim();
+ URI uri = new URI(path);
+ LauncherURIHandler handler = factory.getURIHandler(uri);
+ execute(operation, uri, handler, conf);
}
} catch (IOException ioe) {
throw new LauncherException(ioe.getMessage(), ioe);
@@ -72,6 +71,24 @@ public class PrepareActionsDriver {
throw new LauncherException(saxe.getMessage(), saxe);
} catch (ParserConfigurationException pce) {
throw new LauncherException(pce.getMessage(), pce);
+ } catch (URISyntaxException use) {
+ throw new LauncherException(use.getMessage(), use);
+ }
+ }
+
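+ /**
+ * Method to execute a prepare action based on the operation name. Operations
+ * other than <code>delete</code> and <code>mkdir</code> are ignored.
+ *
+ * @param operation Name of the prepare operation (node name of the prepare XML child)
+ * @param uri URI the operation is applied to
+ * @param handler URI handler that performs the operation
+ * @param conf Configuration passed on to the handler
+ * @throws LauncherException
+ */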
+ private static void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf)
+ throws LauncherException {
+ if (operation.equals("delete")) {
+ handler.delete(uri, conf);
+ }
+ else if (operation.equals("mkdir")) {
+ handler.create(uri, conf);
}
}
@@ -83,4 +100,5 @@ public class PrepareActionsDriver {
InputStream is = new ByteArrayInputStream(prepareXML.getBytes("UTF-8"));
return docBuilder.parse(is);
}
-}
+
+}
\ No newline at end of file
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java Mon Feb 25 21:42:07 2013
@@ -89,6 +89,10 @@ public class JsonCoordinatorAction imple
@Lob
private String missingDependencies;
+ @Column(name = "push_missing_dependencies")
+ @Lob
+ private String pushMissingDependencies;
+
@Basic
@Column(name = "external_status")
private String externalStatus;
@@ -142,6 +146,7 @@ public class JsonCoordinatorAction imple
// json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils
// .formatDateRfc822(endTime), timeZoneId);
json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, missingDependencies);
+ json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, pushMissingDependencies);
json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus);
json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri);
json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl);
@@ -240,6 +245,14 @@ public class JsonCoordinatorAction imple
return missingDependencies;
}
+ public String getPushMissingDependencies() { // Accessor for the field mapped to the "push_missing_dependencies" column.
+ return pushMissingDependencies; // returned as stored: no null check and no copy
+ }
+
+ public void setPushMissingDependencies(String pushMissingDependencies) { // Mutator for the field mapped to the "push_missing_dependencies" column.
+ this.pushMissingDependencies = pushMissingDependencies; // stored as given: no validation or normalization
+ }
+
public String getExternalStatus() {
return externalStatus;
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java Mon Feb 25 21:42:07 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -139,7 +139,7 @@ public class JsonCoordinatorJob implemen
public JSONObject toJSONObject() {
return toJSONObject("GMT");
}
-
+
@SuppressWarnings("unchecked")
public JSONObject toJSONObject(String timeZoneId) {
JSONObject json = new JSONObject();
@@ -156,7 +156,7 @@ public class JsonCoordinatorJob implemen
json.put(JsonTags.COORDINATOR_JOB_CONCURRENCY, getConcurrency());
json.put(JsonTags.COORDINATOR_JOB_TIMEOUT, getTimeout());
json.put(JsonTags.COORDINATOR_JOB_LAST_ACTION_TIME, JsonUtils.formatDateRfc822(getLastActionTime(), timeZoneId));
- json.put(JsonTags.COORDINATOR_JOB_NEXT_MATERIALIZED_TIME,
+ json.put(JsonTags.COORDINATOR_JOB_NEXT_MATERIALIZED_TIME,
JsonUtils.formatDateRfc822(getNextMaterializedTime(), timeZoneId));
json.put(JsonTags.COORDINATOR_JOB_START_TIME, JsonUtils.formatDateRfc822(getStartTime(), timeZoneId));
json.put(JsonTags.COORDINATOR_JOB_END_TIME, JsonUtils.formatDateRfc822(getEndTime(), timeZoneId));
@@ -405,8 +405,6 @@ public class JsonCoordinatorJob implemen
/**
* Set pending to true
- *
- * @param pending set pending to true
*/
public void setPending() {
this.pending = 1;
@@ -414,8 +412,6 @@ public class JsonCoordinatorJob implemen
/**
* Set pending to false
- *
- * @param pending set pending to false
*/
public void resetPending() {
this.pending = 0;
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Mon Feb 25 21:42:07 2013
@@ -19,11 +19,12 @@ package org.apache.oozie.command.coord;
import java.io.IOException;
import java.io.StringReader;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
@@ -34,16 +35,17 @@ import org.apache.oozie.command.CommandE
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandlerException;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.service.HadoopAccessorException;
-import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.Instrumentation;
@@ -96,14 +98,7 @@ public class CoordActionInputCheckXComma
if (nominalTime.compareTo(currentTime) > 0) {
queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
.getTime()), getCoordInputCheckRequeueInterval()));
- // update lastModifiedTime
- coordAction.setLastModifiedTime(new Date());
- try {
- jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
- }
- catch (JPAExecutorException e) {
- throw new CommandException(e);
- }
+ updateCoordAction(coordAction, false);
LOG.info("[" + actionId
+ "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
+ currentTime + ", nominal=" + nominalTime);
@@ -139,50 +134,74 @@ public class CoordActionInputCheckXComma
nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
}
String nonExistListStr = nonExistList.toString();
- if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
- // missingDeps empty means action should become READY
+ if (!nonExistListStr.equals(missingDeps)) {
+ // missingDeps null means action should become READY
isChangeInDependency = true;
coordAction.setMissingDependencies(nonExistListStr);
}
- if (status == true) {
+ String pushDeps = coordAction.getPushMissingDependencies();
+ if (status == true && (pushDeps == null || pushDeps.length() == 0)) {
coordAction.setStatus(CoordinatorAction.Status.READY);
// pass jobID to the CoordActionReadyXCommand
queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
}
+ else if (!isTimeout(currentTime)) {
+ if (status == false) {
+ queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
+ getCoordInputCheckRequeueInterval());
+ }
+ else {
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
+ }
+ }
else {
- long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
- .getCreatedTime().getTime()))
- / (60 * 1000);
- int timeOut = coordAction.getTimeOut();
- if ((timeOut >= 0) && (waitingTime > timeOut)) {
+ // Reached only once isTimeout(currentTime) is true. Time out here only when pull dependencies are
+ // still missing AND there are no push dependencies. The inner parentheses matter: without them
+ // "a && b || c" parses as "(a && b) || c" and pushDeps.length() is called on a null pushDeps.
+ if (!nonExistListStr.isEmpty() && (pushDeps == null || pushDeps.length() == 0)) {
queue(new CoordActionTimeOutXCommand(coordAction), 100);
}
else {
- queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval());
+ // Let CoordPushDependencyCheckXCommand queue the timeout
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
}
}
}
catch (Exception e) {
+ if (isTimeout(currentTime)) {
+ queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ }
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
finally {
- coordAction.setLastModifiedTime(new Date());
cron.stop();
- if(jpaService != null) {
- try {
- if (isChangeInDependency) {
- jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
- }
- else {
- jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
- }
+ updateCoordAction(coordAction, isChangeInDependency);
+ }
+ return null;
+ }
+
+
+ /**
+ * Tells whether the coordinator action has waited longer than its configured timeout.
+ * <p>
+ * The wait is measured from the later of the action's nominal time and its created time, converted
+ * from milliseconds to whole minutes, and compared against {@code coordAction.getTimeOut()}.
+ *
+ * @param currentTime the instant to measure the wait against
+ * @return true if the timeout is non-negative and the waited minutes exceed it; false otherwise
+ */
+ private boolean isTimeout(Date currentTime) {
+ long nowMillis = currentTime.getTime();
+ long nominalMillis = coordAction.getNominalTime().getTime();
+ long createdMillis = coordAction.getCreatedTime().getTime();
+ long waitedMinutes = (nowMillis - Math.max(nominalMillis, createdMillis)) / (60 * 1000);
+ int timeoutMinutes = coordAction.getTimeOut();
+ if (timeoutMinutes < 0) {
+ // A negative timeout never expires.
+ return false;
+ }
+ return waitedMinutes > timeoutMinutes;
+ }
+
+ private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency) // Stamps the action's last-modified time and persists it through jpaService when one is available.
+ throws CommandException {
+ coordAction.setLastModifiedTime(new Date()); // always refreshed, even if nothing is persisted below
+ if (jpaService != null) { // with no JPA service the change stays on the in-memory bean only
+ try {
+ if (isChangeInDependency) {
+ jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction)); // dependency state changed: run the input-check update executor
}
- catch(JPAExecutorException jex) {
- throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
+ else {
+ jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction)); // otherwise run the modified-time update executor
}
}
+ catch (JPAExecutorException jex) {
+ throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex); // persistence failure is rethrown as E1021 with the original cause attached
+ }
}
- return null;
}
/**
@@ -308,7 +327,7 @@ public class CoordActionInputCheckXComma
Element outputList = eAction.getChild("output-events", eAction.getNamespace());
if (outputList != null) {
for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
- if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
+ if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) != null) {
throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
" not permitted in output-event ");
}
@@ -331,11 +350,11 @@ public class CoordActionInputCheckXComma
private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
Configuration conf) throws Exception {
for (Element dEvent : eDataEvents) {
- if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
+ if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) == null) {
continue;
}
ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
- String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
+ String uresolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()).getTextTrim();
String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
StringBuffer resolvedTmp = new StringBuffer();
for (int i = 0; i < unresolvedList.length; i++) {
@@ -360,7 +379,7 @@ public class CoordActionInputCheckXComma
uriInstance.addContent(resolvedTmp.toString());
dEvent.getContent().add(1, uriInstance);
}
- dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
+ dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace());
}
return true;
@@ -409,9 +428,10 @@ public class CoordActionInputCheckXComma
nonExistList.delete(0, nonExistList.length());
boolean allExists = true;
String existSeparator = "", nonExistSeparator = "";
+ String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
for (int i = 0; i < uriList.length; i++) {
if (allExists) {
- allExists = pathExists(uriList[i], conf);
+ allExists = pathExists(uriList[i], conf, user);
LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
}
if (allExists) {
@@ -434,19 +454,22 @@ public class CoordActionInputCheckXComma
* @return true if path exists
* @throws IOException thrown if unable to access the path
*/
- protected boolean pathExists(String sPath, Configuration actionConf) throws IOException {
+ protected boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException { // user is now supplied by the caller instead of being read from actionConf here
LOG.debug("checking for the file " + sPath);
- Path path = new Path(sPath);
- String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
try {
- HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
- Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
- return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
+ URI uri = new URI(sPath); // a malformed sPath raises URISyntaxException, handled below
+ URIHandlerService service = Services.get().get(URIHandlerService.class);
+ URIHandler handler = service.getURIHandler(uri); // the service selects the handler for this URI
+ return handler.exists(uri, actionConf, user); // the existence check itself is delegated to the handler
}
- catch (HadoopAccessorException e) {
+ catch (URIHandlerException e) { // record the handler's error on the action, then rethrow as IOException
coordAction.setErrorCode(e.getErrorCode().toString());
coordAction.setErrorMessage(e.getMessage());
throw new IOException(e);
+ } catch (URISyntaxException e) { // malformed URI: recorded on the action under ErrorCode.E0906
+ coordAction.setErrorCode(ErrorCode.E0906.toString());
+ coordAction.setErrorMessage(e.getMessage());
+ throw new IOException(e);
}
}