You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/02/15 23:26:26 UTC
svn commit: r1446792 [1/2] - in /oozie/branches/hcat-intre: ./
client/src/main/java/org/apache/oozie/cli/ core/
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/action/hadoop/
core/src/main/java/org/apache/oozie/command/coord/ c...
Author: virag
Date: Fri Feb 15 22:26:25 2013
New Revision: 1446792
URL: http://svn.apache.org/r1446792
Log:
OOZIE-1217 Address review comments in OOZIE-1210 (rohini via virag)
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorException.java
Removed:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorException.java
Modified:
oozie/branches/hcat-intre/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
oozie/branches/hcat-intre/core/pom.xml
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java
oozie/branches/hcat-intre/core/src/main/resources/oozie-log4j.properties
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
oozie/branches/hcat-intre/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
oozie/branches/hcat-intre/minitest/src/test/java/org/apache/oozie/test/WorkflowTest.java
oozie/branches/hcat-intre/pom.xml
oozie/branches/hcat-intre/release-log.txt
Modified: oozie/branches/hcat-intre/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/client/src/main/java/org/apache/oozie/cli/OozieCLI.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/client/src/main/java/org/apache/oozie/cli/OozieCLI.java (original)
+++ oozie/branches/hcat-intre/client/src/main/java/org/apache/oozie/cli/OozieCLI.java Fri Feb 15 22:26:25 2013
@@ -990,7 +990,7 @@ public class OozieCLI {
+ maskDate(action.getCreatedTime(), timeZoneId, verbose) + VERBOSE_DELIMITER
+ maskDate(action.getNominalTime(), timeZoneId, verbose) + action.getStatus() + VERBOSE_DELIMITER
+ maskDate(action.getLastModifiedTime(), timeZoneId, verbose) + VERBOSE_DELIMITER
- + maskIfNull(getAllMissingDependencies(action)));
+ + maskIfNull(getFirstMissingDependencies(action)));
System.out.println(RULER);
}
@@ -1052,7 +1052,7 @@ public class OozieCLI {
System.out.println("Nominal Time : " + maskDate(coordAction.getNominalTime(), timeZoneId, false));
System.out.println("Status : " + coordAction.getStatus());
System.out.println("Last Modified : " + maskDate(coordAction.getLastModifiedTime(), timeZoneId, false));
- System.out.println("Missing Dependencies : " + maskIfNull(getAllMissingDependencies(coordAction)));
+ System.out.println("First Missing Dependency : " + maskIfNull(getFirstMissingDependencies(coordAction)));
System.out.println(RULER);
}
@@ -1630,7 +1630,7 @@ public class OozieCLI {
}
}
- private String getAllMissingDependencies(CoordinatorAction action) {
+ private String getFirstMissingDependencies(CoordinatorAction action) {
StringBuilder allDeps = new StringBuilder();
String missingDep = action.getMissingDependencies();
boolean depExists = false;
Modified: oozie/branches/hcat-intre/core/pom.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/pom.xml?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/pom.xml (original)
+++ oozie/branches/hcat-intre/core/pom.xml Fri Feb 15 22:26:25 2013
@@ -355,10 +355,22 @@
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-all</artifactId>
+ <artifactId>activemq-client</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-kahadb-store</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<!-- For drawing runtime DAG -->
<dependency>
<groupId>net.sf.jung</groupId>
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java Fri Feb 15 22:26:25 2013
@@ -232,12 +232,7 @@ public enum ErrorCode {
E1400(XLog.STD, "doAs (proxyuser) failure"),
- E1501(XLog.STD, "Partition Dependency Manager could not add cache entry"),
- E1502(XLog.STD, "Partition cache lookup error"),
- E1503(XLog.STD, "Error in Metadata URI [{0}]"), //TODO - Error code not used, replace this
- E1504(XLog.STD, "Error in getting HCat Access [{0}]"),
- E1505(XLog.STD, "Error with JMS Message, Details: [{0}]"),
- E1506(XLog.STD, "Error in creating connection or message listener, Details: [{0}]"),
+ E1501(XLog.STD, "Error in getting HCat Access [{0}]"),
ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java Fri Feb 15 22:26:25 2013
@@ -75,7 +75,7 @@ public class HCatLauncherURIHandler impl
}
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
try {
- System.out.println("Creating HCatClient for user=" + UserGroupInformation.getLoginUser() + " and server="
+ System.out.println("Creating HCatClient for user=" + UserGroupInformation.getCurrentUser() + " and server="
+ serverURI);
// Delegation token fetched from metastore has new Text() as service and
// HiveMetastoreClient looks for the same if not overriden by hive.metastore.token.signature
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Fri Feb 15 22:26:25 2013
@@ -165,6 +165,9 @@ public class CoordActionInputCheckXComma
}
}
catch (Exception e) {
+ if (isTimeout(currentTime)) {
+ queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ }
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
finally {
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java Fri Feb 15 22:26:25 2013
@@ -1,15 +1,13 @@
package org.apache.oozie.command.coord;
-import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.dependency.DependencyChecker;
-import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIHandlerService;
public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyCheckXCommand {
@@ -23,69 +21,56 @@ public class CoordActionUpdatePushMissin
String pushMissingDeps = coordAction.getPushMissingDependencies();
if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
LOG.info("Nothing to check. Empty push missing dependency");
- return null;
- }
-
- PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
- Collection<String> availDepList = pdms.getAvailableDependencyURIs(actionId);
- if (availDepList == null || availDepList.size() == 0) {
- LOG.info("There is no available dependency List of action ID: [{0}]", actionId);
- if (isTimeout()) { // Poll and check as one last try
- queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
- }
- return null;
- }
-
- LOG.debug("Updating action Id " + actionId + " for available partition of " + availDepList.toString()
- + "missing parts :" + pushMissingDeps);
-
- String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
- List<String> missingDepsAfterCheck = new ArrayList<String>();
- for (String missingDep : missingDepsArray) {
- if (!availDepList.contains(missingDep)) {
- missingDepsAfterCheck.add(missingDep);
- }
- }
- boolean isChangeInDependency = true;
- if (missingDepsAfterCheck.size() == 0) { // All push-based dependencies are available
- onAllPushDependenciesAvailable();
}
else {
- if (missingDepsAfterCheck.size() == missingDepsArray.length) {
- isChangeInDependency = false;
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ Collection<String> availDepList = pdms.getAvailableDependencyURIs(actionId);
+ if (availDepList == null || availDepList.size() == 0) {
+ LOG.info("There are no available dependencies for action ID: [{0}]", actionId);
+ if (isTimeout()) { // Poll and check as one last try
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
+ }
}
else {
- String stillMissingDeps = DependencyChecker.dependenciesAsString(missingDepsAfterCheck);
- coordAction.setPushMissingDependencies(stillMissingDeps);
- }
- if (isTimeout()) { // Poll and check as one last try
- queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
- }
- }
- updateCoordAction(coordAction, isChangeInDependency);
- unregisterAvailableDependencies(availDepList);
- LOG.info("ENDED for Action id [{0}]", actionId);
- return null;
- }
+ LOG.debug("Updating action ID [{0}] with available uris=[{1}] where missing uris=[{2}]", actionId,
+ availDepList.toString(), pushMissingDeps);
- private void unregisterAvailableDependencies(Collection<String> availDepList) {
- URIHandlerService uriService = Services.get().get(URIHandlerService.class);
- for (String availDepURI : availDepList) {
- try {
- URI availableURI = new URI(availDepURI);
- URIHandler handler = uriService.getURIHandler(availableURI);
- if (handler.unregisterFromNotification(availableURI, actionId)) {
- LOG.debug("Succesfully unregistered uri [{0}] for actionId: [{1}] from notifications",
- availableURI, actionId);
+ String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
+ List<String> stillMissingDepsList = new ArrayList<String>(Arrays.asList(missingDepsArray));
+ stillMissingDepsList.removeAll(availDepList);
+ boolean isChangeInDependency = true;
+ if (stillMissingDepsList.size() == 0) {
+ // All push-based dependencies are available
+ onAllPushDependenciesAvailable();
}
else {
- LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", availableURI,
- actionId);
+ if (stillMissingDepsList.size() == missingDepsArray.length) {
+ isChangeInDependency = false;
+ }
+ else {
+ String stillMissingDeps = DependencyChecker.dependenciesAsString(stillMissingDepsList);
+ coordAction.setPushMissingDependencies(stillMissingDeps);
+ }
+ if (isTimeout()) { // Poll and check as one last try
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
+ }
}
+ updateCoordAction(coordAction, isChangeInDependency);
+ removeAvailableDependencies(pdms, availDepList);
+ LOG.info("ENDED for Action id [{0}]", actionId);
}
- catch (Exception e) {
- LOG.warn("Exception while unregistering uri for actionId: [{0}] from notifications", actionId, e);
- }
+ }
+ return null;
+ }
+
+ private void removeAvailableDependencies(PartitionDependencyManagerService pdms, Collection<String> availDepList) {
+ if (pdms.removeAvailableDependencyURIs(actionId, availDepList)) {
+ LOG.debug("Successfully removed uris [{0}] for actionId: [{1}] from available list",
+ availDepList.toString(), actionId);
+ }
+ else {
+ LOG.warn("Failed to remove uris [{0}] for actionId: [{1}] from available list", availDepList.toString(),
+ actionId);
}
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Fri Feb 15 22:26:25 2013
@@ -84,62 +84,73 @@ public class CoordPushDependencyCheckXCo
@Override
protected Void execute() throws CommandException {
- LOG.info("STARTED for Action id [{0}]", actionId);
String pushMissingDeps = coordAction.getPushMissingDependencies();
if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
LOG.info("Nothing to check. Empty push missing dependency");
- return null;
}
- LOG.info("Push missing dependencies for actionID [{0}] is [{1}] ", actionId, pushMissingDeps);
+ else {
+ LOG.info("Push missing dependencies for actionID [{0}] is [{1}] ", actionId, pushMissingDeps);
- Configuration actionConf = null;
- try {
- actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
- }
- catch (IOException e) {
- throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
- }
+ try {
+ Configuration actionConf = null;
+ try {
+ actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
+ }
+ catch (IOException e) {
+ throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
+ }
- String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
- // Check all dependencies during materialization to avoid registering in the cache. But
- // check only first missing one afterwards similar to CoordActionInputCheckXCommand for efficiency.
- // listPartitions is costly.
- ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
- !registerForNotification);
-
- boolean isChangeInDependency = true;
- boolean timeout = false;
- if (actionDep.getMissingDependencies().size() == 0) { // All push-based dependencies are available
- onAllPushDependenciesAvailable();
- }
- else {
- if (actionDep.getMissingDependencies().size() == missingDepsArray.length) {
- isChangeInDependency = false;
- }
- else {
- String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep.getMissingDependencies());
- coordAction.setPushMissingDependencies(stillMissingDeps);
- }
- // Checking for timeout
- timeout = isTimeout();
- if (timeout) {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
+ // Check all dependencies during materialization to avoid registering in the cache.
+ // But check only first missing one afterwards similar to
+ // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
+ ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
+ !registerForNotification);
+
+ boolean isChangeInDependency = true;
+ boolean timeout = false;
+ if (actionDep.getMissingDependencies().size() == 0) {
+ // All push-based dependencies are available
+ onAllPushDependenciesAvailable();
+ }
+ else {
+ if (actionDep.getMissingDependencies().size() == missingDepsArray.length) {
+ isChangeInDependency = false;
+ }
+ else {
+ String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep
+ .getMissingDependencies());
+ coordAction.setPushMissingDependencies(stillMissingDeps);
+ }
+ // Checking for timeout
+ timeout = isTimeout();
+ if (timeout) {
+ queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ }
+ else {
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
+ getCoordPushCheckRequeueInterval());
+ }
+ }
+
+ updateCoordAction(coordAction, isChangeInDependency);
+ if (registerForNotification) {
+ registerForNotification(actionDep.getMissingDependencies(), actionConf);
+ }
+ else {
+ unregisterAvailableDependencies(actionDep);
+ }
+ if (timeout) {
+ unregisterMissingDependencies(actionDep.getMissingDependencies());
+ }
}
- else {
- queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), getCoordPushCheckRequeueInterval());
+ catch (Exception e) {
+ if (isTimeout()) {
+ queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ }
+ throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
}
-
- updateCoordAction(coordAction, isChangeInDependency);
- if (registerForNotification) {
- registerForNotification(actionDep.getMissingDependencies(), actionConf);
- }
- else {
- unregisterAvailableDependencies(actionDep);
- }
- if (timeout) {
- unregisterMissingDependencies(actionDep.getMissingDependencies());
- }
return null;
}
@@ -153,7 +164,11 @@ public class CoordPushDependencyCheckXCo
return requeueInterval;
}
- // returns true if it is time for timeout
+ /**
+ * Returns true if timeout period has been reached
+ *
+ * @return true if it is time for timeout else false
+ */
protected boolean isTimeout() {
long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
.getCreatedTime().getTime()))
@@ -165,9 +180,19 @@ public class CoordPushDependencyCheckXCo
protected void onAllPushDependenciesAvailable() {
coordAction.setPushMissingDependencies("");
if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
- coordAction.setStatus(CoordinatorAction.Status.READY);
- // pass jobID to the CoordActionReadyXCommand
- queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
+ Date nominalTime = coordAction.getNominalTime();
+ Date currentTime = new Date();
+ // The action should become READY only if current time > nominal time;
+ // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
+ if (nominalTime.compareTo(currentTime) > 0) {
+ LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
+ + currentTime + ", nominal=" + nominalTime);
+ }
+ else {
+ coordAction.setStatus(CoordinatorAction.Status.READY);
+ // pass jobID to the CoordActionReadyXCommand
+ queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
+ }
}
}
@@ -307,10 +332,12 @@ public class CoordPushDependencyCheckXCo
@Override
protected void loadState() throws CommandException {
+ eagerLoadState();
}
@Override
protected void verifyPrecondition() throws CommandException, PreconditionException {
+ eagerVerifyPrecondition();
}
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java Fri Feb 15 22:26:25 2013
@@ -224,11 +224,6 @@ public class CoordELEvaluator {
// System.out.println("eDATA :"+ XmlUtils.prettyPrint(eData));
Date initInstance = DateUtils.parseDateOozieTZ(eDataset.getAttributeValue("initial-instance"));
ds.setInitInstance(initInstance);
- String name = eDataset.getAttributeValue("name");
- ds.setName(name);
- String uriTemplate = eDataset.getChild("uri-template", eData.getNamespace()).getTextTrim();
- ds.setUriTemplate(uriTemplate);
- // ds.setTimeUnit(TimeUnit.MINUTES);
if (eDataset.getAttributeValue("frequency") != null) {
int frequency = Integer.parseInt(eDataset.getAttributeValue("frequency"));
ds.setFrequency(frequency);
@@ -256,6 +251,13 @@ public class CoordELEvaluator {
else {
ds.setType("ASYNC");
}
+ String name = eDataset.getAttributeValue("name");
+ ds.setName(name);
+ // System.out.println(name + " VAL "+ eDataset.getChild("uri-template",
+ // eData.getNamespace()));
+ String uriTemplate = eDataset.getChild("uri-template", eData.getNamespace()).getTextTrim();
+ ds.setUriTemplate(uriTemplate);
+ // ds.setTimeUnit(TimeUnit.MINUTES);
return ds;
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java Fri Feb 15 22:26:25 2013
@@ -26,7 +26,7 @@ import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.dependency.URIContext;
+import org.apache.oozie.dependency.URIHandler.Context;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
@@ -301,42 +301,48 @@ public class CoordELFunctions {
String doneFlag = ds.getDoneFlag();
URIHandlerService uriService = Services.get().get(URIHandlerService.class);
URIHandler uriHandler = null;
- URIContext uriContext = null;
- while (instance >= checkedInstance) {
- ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
- String uriPath = uriEval.evaluate(uriTemplate, String.class);
- if (uriHandler == null) {
- URI uri = new URI(uriPath);
- uriHandler = uriService.getURIHandler(uri);
- uriContext = uriHandler.getURIContext(uri, conf, user);
- }
- String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
- if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
- if (available == endOffset) {
- LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
- resolved = true;
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
- resolvedURIPaths.append(uriPath);
- retVal = resolvedInstances.toString();
- eval.setVariable("resolved_path", resolvedURIPaths.toString());
- break;
- } else if (available >= startOffset) {
- LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
- resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+ Context uriContext = null;
+ try {
+ while (instance >= checkedInstance) {
+ ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
+ String uriPath = uriEval.evaluate(uriTemplate, String.class);
+ if (uriHandler == null) {
+ URI uri = new URI(uriPath);
+ uriHandler = uriService.getURIHandler(uri);
+ uriContext = uriHandler.getContext(uri, conf, user);
+ }
+ String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
+ if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
+ if (available == endOffset) {
+ LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
+ resolved = true;
+ resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
+ resolvedURIPaths.append(uriPath);
+ retVal = resolvedInstances.toString();
+ eval.setVariable("resolved_path", resolvedURIPaths.toString());
+ break;
+ }
+ else if (available >= startOffset) {
+ LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
+ resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
+ INSTANCE_SEPARATOR);
+ resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+ }
+ available++;
}
- available++;
+ // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
+ // -datasetFrequency);
+ nominalInstanceCal = (Calendar) initInstance.clone();
+ instCount[0]++;
+ nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
+ checkedInstance++;
+ // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
}
- // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
- // -datasetFrequency);
- nominalInstanceCal = (Calendar) initInstance.clone();
- instCount[0]++;
- nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
- checkedInstance++;
- // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
}
- if (uriContext != null) {
- uriContext.destroy();
+ finally {
+ if (uriContext != null) {
+ uriContext.destroy();
+ }
}
if (!resolved) {
// return unchanged future function with variable 'is_resolved'
@@ -981,43 +987,50 @@ public class CoordELFunctions {
String doneFlag = ds.getDoneFlag();
URIHandlerService uriService = Services.get().get(URIHandlerService.class);
URIHandler uriHandler = null;
- URIContext uriContext = null;
- while (nominalInstanceCal.compareTo(initInstance) >= 0) {
- ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
- String uriPath = uriEval.evaluate(uriTemplate, String.class);
- if (uriHandler == null) {
- URI uri = new URI(uriPath);
- uriHandler = uriService.getURIHandler(uri);
- uriContext = uriHandler.getURIContext(uri, conf, user);
- }
- String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
- if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
- XLog.getLog(CoordELFunctions.class).debug("Found latest(" + available + "): " + uriWithDoneFlag);
- if (available == startOffset) {
- LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
- resolved = true;
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
- resolvedURIPaths.append(uriPath);
- retVal = resolvedInstances.toString();
- eval.setVariable("resolved_path", resolvedURIPaths.toString());
- break;
- } else if (available <= endOffset) {
- LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
- resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+ Context uriContext = null;
+ try {
+ while (nominalInstanceCal.compareTo(initInstance) >= 0) {
+ ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
+ String uriPath = uriEval.evaluate(uriTemplate, String.class);
+ if (uriHandler == null) {
+ URI uri = new URI(uriPath);
+ uriHandler = uriService.getURIHandler(uri);
+ uriContext = uriHandler.getContext(uri, conf, user);
}
+ String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
+ if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
+ XLog.getLog(CoordELFunctions.class)
+ .debug("Found latest(" + available + "): " + uriWithDoneFlag);
+ if (available == startOffset) {
+ LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
+ resolved = true;
+ resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
+ resolvedURIPaths.append(uriPath);
+ retVal = resolvedInstances.toString();
+ eval.setVariable("resolved_path", resolvedURIPaths.toString());
+ break;
+ }
+ else if (available <= endOffset) {
+ LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
+ resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
+ INSTANCE_SEPARATOR);
+ resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+ }
- available--;
+ available--;
+ }
+ // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
+ // -datasetFrequency);
+ nominalInstanceCal = (Calendar) initInstance.clone();
+ instCount[0]--;
+ nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
+ // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
}
- // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
- // -datasetFrequency);
- nominalInstanceCal = (Calendar) initInstance.clone();
- instCount[0]--;
- nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
- // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
}
- if (uriContext != null) {
- uriContext.destroy();
+ finally {
+ if (uriContext != null) {
+ uriContext.destroy();
+ }
}
if (!resolved) {
// return unchanged latest function with variable 'is_resolved'
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java Fri Feb 15 22:26:25 2013
@@ -98,12 +98,13 @@ public class DependencyChecker {
URI uri = new URI(dependency);
URIHandler uriHandler = uriService.getURIHandler(uri);
- LOG.debug("Checking for the availability of [{0}] ", dependency);
+ LOG.debug("Checking for the availability of dependency [{0}] ", dependency);
if (uriHandler.exists(uri, actionConf, user)) {
LOG.debug("Dependency [{0}] is available", dependency);
availableDeps.add(dependency);
}
else {
+ LOG.debug("Dependency [{0}] is missing", dependency);
missingDeps.add(dependency);
if (stopOnFirstMissing) {
continueChecking = false;
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java Fri Feb 15 22:26:25 2013
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.dependency;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
-public class FSURIContext extends URIContext {
-
- private FileSystem fs;
-
- /**
- * Create a FSURIContext that can be used to access a filesystem URI
- *
- * @param conf Configuration to access the URI
- * @param user name of the user the URI should be accessed as
- * @param fs FileSystem to access
- */
- public FSURIContext(Configuration conf, String user, FileSystem fs) {
- super(conf, user);
- this.fs = fs;
- }
-
- /**
- * Get the FileSystem to access the URI
- * @return FileSystem to access the URI
- */
- public FileSystem getFileSystem() {
- return fs;
- }
-
-}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java Fri Feb 15 22:26:25 2013
@@ -77,15 +77,15 @@ public class FSURIHandler implements URI
}
@Override
- public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIHandlerException {
+ public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException {
FileSystem fs = getFileSystem(uri, conf, user);
- return new FSURIContext(conf, user, fs);
+ return new FSContext(conf, user, fs);
}
@Override
- public boolean exists(URI uri, URIContext uriContext) throws URIHandlerException {
+ public boolean exists(URI uri, Context context) throws URIHandlerException {
try {
- FileSystem fs = ((FSURIContext) uriContext).getFileSystem();
+ FileSystem fs = ((FSContext) context).getFileSystem();
return fs.exists(getNormalizedPath(uri));
}
catch (IOException e) {
@@ -134,4 +134,29 @@ public class FSURIHandler implements URI
return service.createFileSystem(user, uri, fsConf);
}
+ static class FSContext extends Context {
+
+ private FileSystem fs;
+
+ /**
+ * Create a FSContext that can be used to access a filesystem URI
+ *
+ * @param conf Configuration to access the URI
+ * @param user name of the user the URI should be accessed as
+ * @param fs FileSystem to access
+ */
+ public FSContext(Configuration conf, String user, FileSystem fs) {
+ super(conf, user);
+ this.fs = fs;
+ }
+
+ /**
+ * Get the FileSystem to access the URI
+ * @return FileSystem to access the URI
+ */
+ public FileSystem getFileSystem() {
+ return fs;
+ }
+ }
+
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java Fri Feb 15 22:26:25 2013
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.dependency;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hcatalog.api.HCatClient;
-import org.apache.oozie.util.XLog;
-
-public class HCatURIContext extends URIContext {
-
- private static XLog LOG = XLog.getLog(HCatURIContext.class);
- private HCatClient hcatClient;
-
- /**
- * Create a HCatURIContext that can be used to access a hcat URI
- *
- * @param conf Configuration to access the URI
- * @param user name of the user the URI should be accessed as
- * @param hcatClient HCatClient to talk to hcatalog server
- */
- public HCatURIContext(Configuration conf, String user, HCatClient hcatClient) {
- super(conf, user);
- this.hcatClient = hcatClient;
- }
-
- /**
- * Get the HCatClient to talk to hcatalog server
- *
- * @return HCatClient to talk to hcatalog server
- */
- public HCatClient getHCatClient() {
- return hcatClient;
- }
-
- @Override
- public void destroy() {
- try {
- hcatClient.close();
- }
- catch (Exception ignore) {
- LOG.warn("Error closing hcat client", ignore);
- }
- }
-
-}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java Fri Feb 15 22:26:25 2013
@@ -39,7 +39,7 @@ import org.apache.oozie.action.hadoop.HC
import org.apache.oozie.action.hadoop.LauncherURIHandler;
import org.apache.oozie.jms.HCatMessageHandler;
import org.apache.oozie.service.HCatAccessorService;
-import org.apache.oozie.service.MetaDataAccessorException;
+import org.apache.oozie.service.HCatAccessorException;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
@@ -115,7 +115,7 @@ public class HCatURIHandler implements U
hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority()));
}
catch (HCatException e) {
- throw new MetaDataAccessorException(ErrorCode.E1504, e);
+ throw new HCatAccessorException(ErrorCode.E1501, e);
}
finally {
closeQuietly(client, true);
@@ -139,14 +139,14 @@ public class HCatURIHandler implements U
}
@Override
- public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIHandlerException {
+ public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException {
HCatClient client = getHCatClient(uri, conf, user);
- return new HCatURIContext(conf, user, client);
+ return new HCatContext(conf, user, client);
}
@Override
- public boolean exists(URI uri, URIContext uriContext) throws URIHandlerException {
- HCatClient client = ((HCatURIContext) uriContext).getHCatClient();
+ public boolean exists(URI uri, Context context) throws URIHandlerException {
+ HCatClient client = ((HCatContext) context).getHCatClient();
return exists(uri, client, false);
}
@@ -177,7 +177,7 @@ public class HCatURIHandler implements U
}
- private HCatClient getHCatClient(URI uri, Configuration conf, String user) throws MetaDataAccessorException {
+ private HCatClient getHCatClient(URI uri, Configuration conf, String user) throws HCatAccessorException {
final HiveConf hiveConf = new HiveConf(conf, this.getClass());
String serverURI = getMetastoreConnectURI(uri);
if (!serverURI.equals("")) {
@@ -203,10 +203,10 @@ public class HCatURIHandler implements U
return HCatClient.create(hiveConf);
}
catch (HCatException e) {
- throw new MetaDataAccessorException(ErrorCode.E1504, e);
+ throw new HCatAccessorException(ErrorCode.E1501, e);
}
catch (IOException e) {
- throw new MetaDataAccessorException(ErrorCode.E1504, e);
+ throw new HCatAccessorException(ErrorCode.E1501, e);
}
}
@@ -225,7 +225,7 @@ public class HCatURIHandler implements U
return metastoreURI;
}
- private boolean exists(URI uri, HCatClient client, boolean closeClient) throws MetaDataAccessorException {
+ private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException {
try {
HCatURI hcatURI = new HCatURI(uri.toString());
List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(),
@@ -233,13 +233,13 @@ public class HCatURIHandler implements U
return (partitions != null && !partitions.isEmpty());
}
catch (ConnectionFailureException e) {
- throw new MetaDataAccessorException(ErrorCode.E1504, e);
+ throw new HCatAccessorException(ErrorCode.E1501, e);
}
catch (HCatException e) {
- throw new MetaDataAccessorException(ErrorCode.E0902, e);
+ throw new HCatAccessorException(ErrorCode.E0902, e);
}
catch (URISyntaxException e) {
- throw new MetaDataAccessorException(ErrorCode.E0902, e);
+ throw new HCatAccessorException(ErrorCode.E0902, e);
}
finally {
closeQuietly(client, closeClient);
@@ -257,4 +257,42 @@ public class HCatURIHandler implements U
}
}
+ static class HCatContext extends Context {
+
+ private static XLog LOG = XLog.getLog(HCatContext.class);
+ private HCatClient hcatClient;
+
+ /**
+ * Create a HCatContext that can be used to access a hcat URI
+ *
+ * @param conf Configuration to access the URI
+ * @param user name of the user the URI should be accessed as
+ * @param hcatClient HCatClient to talk to hcatalog server
+ */
+ public HCatContext(Configuration conf, String user, HCatClient hcatClient) {
+ super(conf, user);
+ this.hcatClient = hcatClient;
+ }
+
+ /**
+ * Get the HCatClient to talk to hcatalog server
+ *
+ * @return HCatClient to talk to hcatalog server
+ */
+ public HCatClient getHCatClient() {
+ return hcatClient;
+ }
+
+ @Override
+ public void destroy() {
+ try {
+ hcatClient.close();
+ }
+ catch (Exception ignore) {
+ LOG.warn("Error closing hcat client", ignore);
+ }
+ }
+
+ }
+
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java Fri Feb 15 22:26:25 2013
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.dependency;
-
-import org.apache.hadoop.conf.Configuration;
-
-public abstract class URIContext {
-
- private Configuration conf;
- private String user;
-
- /**
- * Create a URIContext that can be used to access a URI
- * @param conf Configuration to access the URI
- * @param user name of the user the URI should be accessed as
- */
- public URIContext(Configuration conf, String user) {
- this.conf = conf;
- this.user = user;
- }
-
- /**
- * Get the Configuration to access the URI
- * @return Configuration to access the URI
- */
- public Configuration getConfiguration() {
- return conf;
- }
-
- /**
- * Get the name of the user the URI will be accessed as
- * @return the user name the URI will be accessed as
- */
- public String getUser() {
- return user;
- }
-
- /**
- * Destroy the URIContext
- */
- public void destroy() {
- }
-
-}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java Fri Feb 15 22:26:25 2013
@@ -97,7 +97,7 @@ public interface URIHandler {
public boolean unregisterFromNotification(URI uri, String actionID);
/**
- * Get the URIContext which can be used to access URI of the same scheme and
+ * Get the Context which can be used to access URI of the same scheme and
* host
*
* @param uri URI which identifies the scheme and host
@@ -108,20 +108,20 @@ public interface URIHandler {
*
* @throws URIHandlerException
*/
- public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIHandlerException;
+ public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException;
/**
* Check if the dependency identified by the URI is available
*
* @param uri URI of the dependency
- * @param uriContext Context to access the URI
+ * @param context Context to access the URI
*
* @return <code>true</code> if the URI exists; <code>false</code> if the
* URI does not exist
*
* @throws URIHandlerException
*/
- public boolean exists(URI uri, URIContext uriContext) throws URIHandlerException;
+ public boolean exists(URI uri, Context context) throws URIHandlerException;
/**
* Check if the dependency identified by the URI is available
@@ -163,4 +163,44 @@ public interface URIHandler {
*/
public void destroy();
+ public static abstract class Context {
+
+ private Configuration conf;
+ private String user;
+
+ /**
+ * Create a Context that can be used to access a URI
+ *
+ * @param conf Configuration to access the URI
+ * @param user name of the user the URI should be accessed as
+ */
+ public Context(Configuration conf, String user) {
+ this.conf = conf;
+ this.user = user;
+ }
+
+ /**
+ * Get the Configuration to access the URI
+ * @return Configuration to access the URI
+ */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Get the name of the user the URI will be accessed as
+ * @return the user name the URI will be accessed as
+ */
+ public String getUser() {
+ return user;
+ }
+
+ /**
+ * Destroy the Context
+ */
+ public void destroy() {
+ }
+
+ }
+
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java Fri Feb 15 22:26:25 2013
@@ -239,7 +239,12 @@ public class SimpleHCatDependencyCache i
@Override
public Collection<String> getAvailableDependencyURIs(String actionID) {
- return availableDeps.get(actionID);
+ Collection<String> available = availableDeps.get(actionID);
+ if (available != null) {
+ // Return a copy
+ available = new ArrayList<String>(available);
+ }
+ return available;
}
@Override
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java Fri Feb 15 22:26:25 2013
@@ -33,8 +33,8 @@ public class JMSExceptionListener implem
/**
* Create ExceptionLister for a JMS Connection
*
- * @param jmsConnectString The connect string specifiying parameters for JMS connection
- * @param connCtxt The actual connection on which this listener will be registered
+ * @param connInfo Information to connect to the JMS compliant messaging service
+ * @param connCtxt The actual connection on which this listener will be registered
*/
public JMSExceptionListener(JMSConnectionInfo connInfo, ConnectionContext connCtxt) {
this.connInfo = connInfo;
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorException.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorException.java?rev=1446792&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorException.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorException.java Fri Feb 15 22:26:25 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.dependency.URIHandlerException;
+
+public class HCatAccessorException extends URIHandlerException {
+
+ /**
+ * Create an HCatAccessorException exception from a XException.
+ *
+ * @param cause the XException cause.
+ */
+ public HCatAccessorException(XException cause) {
+ super(cause);
+ }
+
+ /**
+ * Create a HCatAccessorException exception.
+ *
+ * @param errorCode error code.
+ * @param params parameters for the error code message template.
+ */
+ public HCatAccessorException(ErrorCode errorCode, Object... params) {
+ super(errorCode, params);
+ }
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java Fri Feb 15 22:26:25 2013
@@ -21,8 +21,11 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.jms.HCatMessageHandler;
import org.apache.oozie.jms.JMSConnectionInfo;
@@ -46,6 +49,10 @@ public class HCatAccessorService impleme
*/
private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap;
/**
+ * List of non publishers(host:port)
+ */
+ private Set<String> nonJMSPublishers;
+ /**
* Mapping of table to the topic name for the table
*/
private Map<String, String> registeredTopicsMap;
@@ -56,6 +63,7 @@ public class HCatAccessorService impleme
conf = services.getConf();
this.jmsService = services.get(JMSAccessorService.class);
initializeMappingRules();
+ this.nonJMSPublishers = new HashSet<String>();
this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>();
this.registeredTopicsMap = new HashMap<String, String>();
}
@@ -88,11 +96,12 @@ public class HCatAccessorService impleme
* @return true if we have JMS connection information for the source URI, else false
*/
public boolean isKnownPublisher(URI sourceURI) {
- if (publisherJMSConnInfoMap.containsKey(sourceURI.getAuthority())) {
+ if (nonJMSPublishers.contains(sourceURI.getAuthority())) {
return true;
}
else {
- return getJMSConnectionInfo(sourceURI) != null;
+ JMSConnectionInfo connInfo = publisherJMSConnInfoMap.get(sourceURI.getAuthority());
+ return connInfo == null ? (getJMSConnectionInfo(sourceURI) != null) : true;
}
}
@@ -105,22 +114,33 @@ public class HCatAccessorService impleme
*/
public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) {
String publisherAuthority = publisherURI.getAuthority();
+ JMSConnectionInfo connInfo = null;
if (publisherJMSConnInfoMap.containsKey(publisherAuthority)) {
- return publisherJMSConnInfoMap.get(publisherAuthority);
+ connInfo = publisherJMSConnInfoMap.get(publisherAuthority);
}
else {
String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority;
for (MappingRule mr : mappingRules) {
String jndiPropertiesString = mr.applyRule(schemeWithAuthority);
if (jndiPropertiesString != null) {
- JMSConnectionInfo connInfo = new JMSConnectionInfo(jndiPropertiesString);
+ connInfo = new JMSConnectionInfo(jndiPropertiesString);
publisherJMSConnInfoMap.put(publisherAuthority, connInfo);
- return connInfo;
+ LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
+ break;
}
}
- publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo);
- return defaultJMSConnInfo;
+ if (connInfo == null && defaultJMSConnInfo != null) {
+ connInfo = defaultJMSConnInfo;
+ publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo);
+ LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
+ }
+ else {
+ nonJMSPublishers.add(publisherAuthority);
+ LOG.info("Adding hcat server [{0}] to the list of non JMS publishers", schemeWithAuthority);
+ }
+
}
+ return connInfo;
}
/**
@@ -130,8 +150,7 @@ public class HCatAccessorService impleme
* @return true if registered to a JMS topic for the table in the given hcatURI
*/
public boolean isRegisteredForNotification(HCatURI hcatURI) {
- return registeredTopicsMap.containsKey(hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb()
- + DELIMITER + hcatURI.getTable());
+ return registeredTopicsMap.containsKey(getKeyForRegisteredTopicsMap(hcatURI));
}
/**
@@ -145,12 +164,11 @@ public class HCatAccessorService impleme
JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
jmsService.registerForNotification(connInfo, topic, msgHandler);
registeredTopicsMap.put(
- hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(), topic);
+ getKeyForRegisteredTopicsMap(hcatURI), topic);
}
public void unregisterFromNotification(HCatURI hcatURI) {
- String topic = registeredTopicsMap.remove(hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb()
- + DELIMITER + hcatURI.getTable());
+ String topic = registeredTopicsMap.remove(getKeyForRegisteredTopicsMap(hcatURI));
if (topic != null) {
JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
jmsService.unregisterFromNotification(connInfo, topic);
@@ -171,6 +189,11 @@ public class HCatAccessorService impleme
}
}
+ private String getKeyForRegisteredTopicsMap(HCatURI hcatURI) {
+ return hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb()
+ + DELIMITER + hcatURI.getTable();
+ }
+
@Override
public void destroy() {
publisherJMSConnInfoMap.clear();
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java Fri Feb 15 22:26:25 2013
@@ -40,12 +40,12 @@ import org.apache.oozie.util.XLog;
import com.google.common.annotations.VisibleForTesting;
/**
- * This class will 1. Create/Manage JMS connections using user configured properties 2.
- * Create/Manage session for specific connection/topic. 3. Provide a way to create a subscriber and
- * publisher 4. Pure JMS complian (implementation independent but primarily tested against Apace
- * ActiveMQ) For connection property, it reads property from oozie-site.xml. Since it supports
- * multiple connections, each property will be grouped with fixed tag. the caller will use the tag
- * to accees the connection/session/subscriber/producer.
+ * This class will <ul>
+ * <li> Create/Manage JMS connections using user configured JNDI properties. </li>
+ * <li> Create/Manage session for specific connection/topic and reconnects on failures. </li>
+ * <li> Provide a way to create a subscriber and publisher </li>
+ * <li> Pure JMS compliant (implementation independent but primarily tested against Apache ActiveMQ). </li>
+ * </ul>
*/
public class JMSAccessorService implements Service {
public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSAccessorService.";
@@ -94,31 +94,30 @@ public class JMSAccessorService implemen
* @param msgHandler Handler which will process the messages received on the topic
*/
public void registerForNotification(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
- if (isTopicInRetryList(connInfo, topic)) {
- return;
- }
- if (isConnectionInRetryList(connInfo)) {
- queueTopicForRetry(connInfo, topic, msgHandler);
- return;
- }
- Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(connInfo);
- if (topicsMap.containsKey(topic)) {
- return;
- }
- synchronized (topicsMap) {
- if (!topicsMap.containsKey(topic)) {
- ConnectionContext connCtxt = createConnectionContext(connInfo);
- if (connCtxt == null) {
- queueTopicForRetry(connInfo, topic, msgHandler);
- return;
- }
- MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, msgHandler);
- if (receiver == null) {
- queueTopicForRetry(connInfo, topic, msgHandler);
- }
- else {
- LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
- topicsMap.put(topic, receiver);
+ if (!isTopicInRetryList(connInfo, topic)) {
+ if (isConnectionInRetryList(connInfo)) {
+ queueTopicForRetry(connInfo, topic, msgHandler);
+ }
+ else {
+ Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(connInfo);
+ if (!topicsMap.containsKey(topic)) {
+ synchronized (topicsMap) {
+ if (!topicsMap.containsKey(topic)) {
+ ConnectionContext connCtxt = createConnectionContext(connInfo);
+ if (connCtxt == null) {
+ queueTopicForRetry(connInfo, topic, msgHandler);
+ return;
+ }
+ MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, msgHandler);
+ if (receiver == null) {
+ queueTopicForRetry(connInfo, topic, msgHandler);
+ }
+ else {
+ LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
+ topicsMap.put(topic, receiver);
+ }
+ }
+ }
}
}
}
@@ -138,22 +137,26 @@ public class JMSAccessorService implemen
}
else {
Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
- MessageReceiver receiver = topicsMap.remove(topic);
- if (receiver != null) {
- try {
- receiver.getSession().close();
+ if (topicsMap != null) {
+ MessageReceiver receiver = null;
+ synchronized (topicsMap) {
+ receiver = topicsMap.remove(topic);
+ if (topicsMap.isEmpty()) {
+ receiversMap.remove(connInfo);
+ }
}
- catch (JMSException e) {
- LOG.warn("Unable to close session " + receiver.getSession(), e);
+ if (receiver != null) {
+ try {
+ receiver.getSession().close();
+ }
+ catch (JMSException e) {
+ LOG.warn("Unable to close session " + receiver.getSession(), e);
+ }
+ }
+ else {
+ LOG.warn("Received request to unregister from topic [{0}] on [{1}], but no matching session.",
+ topic, connInfo);
}
- }
- else {
- LOG.warn(
- "Received request to unregister from topic [{0}] on [{1}], but no matching session.",
- topic, connInfo);
- }
- if (topicsMap.isEmpty()) {
- receiversMap.remove(connInfo);
}
}
}
@@ -406,7 +409,6 @@ public class JMSAccessorService implemen
private static class ConnectionRetryInfo {
private int numAttempt;
private int nextDelay;
- private boolean retryConnection;
private Map<String, MessageHandler> retryTopicsMap;
public ConnectionRetryInfo(int numAttempt, int nextDelay) {
@@ -435,14 +437,6 @@ public class JMSAccessorService implemen
return retryTopicsMap;
}
- public boolean shouldRetryConnection() {
- return retryConnection;
- }
-
- public void setRetryConnection(boolean retryConnection) {
- this.retryConnection = retryConnection;
- }
-
}
public class JMSRetryRunnable implements Runnable {
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java Fri Feb 15 22:26:25 2013
@@ -138,8 +138,8 @@ public class PartitionDependencyManagerS
* @param dependencyURIs set of dependency URIs
* @return true if successful, else false
*/
- public void removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
- dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
+ public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
+ return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
}
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java Fri Feb 15 22:26:25 2013
@@ -64,7 +64,7 @@ public class URIHandlerService implement
String[] classes = conf.getStrings(URI_HANDLERS, FSURIHandler.class.getName());
for (String classname : classes) {
- Class<?> clazz = Class.forName(classname);
+ Class<?> clazz = Class.forName(classname.trim());
URIHandler uriHandler = (URIHandler) ReflectionUtils.newInstance(clazz, null);
uriHandler.init(conf);
for (String scheme : uriHandler.getSupportedSchemes()) {
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java Fri Feb 15 22:26:25 2013
@@ -20,7 +20,6 @@ package org.apache.oozie.util;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -199,26 +198,20 @@ public class HCatURI {
}
@Override
+ public int hashCode() {
+ return (uri == null) ? 0 : uri.hashCode();
+ }
+
+ @Override
public boolean equals(Object obj) {
- HCatURI uri = (HCatURI) obj;
- boolean equals = true;
- Map<String, String> p = this.getPartitionMap();
-
- if (this.getServer().equals(uri.getServer()) && this.db.equals(uri.getDb()) && this.table.equals(uri.getTable())
- && p.size() == uri.getPartitionMap().size()) {
- Iterator<Map.Entry<String, String>> it1 = uri.getPartitionMap().entrySet().iterator();
- while (it1.hasNext()) {
- Map.Entry<String, String> entry = it1.next();
- String key = entry.getKey();
- if (!(p.containsKey(key) && p.get(key).equals(entry.getValue()))) {
- equals = false;
- }
- }
- }
- else {
- equals = false;
- }
- return equals;
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ HCatURI other = (HCatURI) obj;
+ return uri.equals(other.uri);
}
/**
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java Fri Feb 15 22:26:25 2013
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.oozie.util;
import java.util.regex.Matcher;
Modified: oozie/branches/hcat-intre/core/src/main/resources/oozie-log4j.properties
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/resources/oozie-log4j.properties?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/resources/oozie-log4j.properties (original)
+++ oozie/branches/hcat-intre/core/src/main/resources/oozie-log4j.properties Fri Feb 15 22:26:25 2013
@@ -40,6 +40,7 @@ log4j.logger.oozieaudit=NONE, none
log4j.logger.org.apache.oozie.local=DEBUG, test
log4j.logger.org.apache.oozie.client=DEBUG, test
log4j.logger.org.apache.oozie.test=DEBUG, test
+log4j.logger.org.apache.oozie.dependency=DEBUG, test
log4j.logger.org.apache.oozie.wf=DEBUG, test
log4j.logger.org.apache.oozie.action=DEBUG, test
log4j.logger.org.apache.oozie.command=DEBUG, test
Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java Fri Feb 15 22:26:25 2013
@@ -67,7 +67,7 @@ public class TestCoordActionUpdatePushMi
new CoordActionUpdatePushMissingDependency(actionId).call();
checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
-
+ assertNull(pdms.getAvailableDependencyURIs(actionId));
}
@Test
@@ -100,13 +100,13 @@ public class TestCoordActionUpdatePushMi
pdms.partitionAvailable("hcat.server.com:5080", "mydb", "clicks",
getPartitionMap("src=search;datastamp=11;region=us"));
availableURIs = pdms.getAvailableDependencyURIs(actionId);
- assertEquals(2, availableURIs.size());
+ assertEquals(1, availableURIs.size());
assertTrue(availableURIs.contains(newHCatDependency1));
- assertTrue(availableURIs.contains(newHCatDependency2));
new CoordActionUpdatePushMissingDependency(actionId).call();
checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
+ assertNull(pdms.getAvailableDependencyURIs(actionId));
}
Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java Fri Feb 15 22:26:25 2013
@@ -102,7 +102,7 @@ public class TestPartitionDependencyMana
assertTrue(pdms.getAvailableDependencyURIs(actionId3).contains(dep2.getURI().toString()));
assertTrue(pdms.getAvailableDependencyURIs(actionId3).contains(dep3.getURI().toString()));
- pdms.removeAvailableDependencyURIs(actionId3, pdms.getAvailableDependencyURIs(actionId3));
+ assertTrue(pdms.removeAvailableDependencyURIs(actionId3, pdms.getAvailableDependencyURIs(actionId3)));
assertNull(pdms.getAvailableDependencyURIs(actionId3));
}
Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java?rev=1446792&r1=1446791&r2=1446792&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java Fri Feb 15 22:26:25 2013
@@ -19,7 +19,7 @@ package org.apache.oozie.util;
import static org.junit.Assert.*;
import java.net.URISyntaxException;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import org.junit.Test;
@@ -77,40 +77,27 @@ public class TestHCatURI {
@Test
public void testGetHCatUri() {
- Map<String, String> partitions = new HashMap<String, String>();
+ Map<String, String> partitions = new LinkedHashMap<String, String>();
partitions.put("datastamp", "12");
partitions.put("region", "us");
String hcatUri = HCatURI.getHCatURI("hcat", "hcat.server.com:5080", "mydb", "clicks", partitions);
- HCatURI uri1 = null;
- HCatURI uri2 = null;
- try {
- uri1 = new HCatURI(hcatUri);
- uri2 = new HCatURI("hcat://hcat.server.com:5080/mydb/clicks/datastamp=12;region=us");
- }
- catch (URISyntaxException e) {
- fail(e.getMessage());
- }
- assertTrue(uri1.equals(uri2));
+ assertEquals("hcat://hcat.server.com:5080/mydb/clicks/datastamp=12;region=us", hcatUri);
}
@Test
public void testEqualsPositive() {
HCatURI uri1 = null;
HCatURI uri2 = null;
- HCatURI uri3 = null;
try {
uri1 = new HCatURI("hcat://hcat.server.com:5080/mydb/clicks/datastamp=12;region=us;timestamp=1201");
uri2 = new HCatURI("hcat://hcat.server.com:5080/mydb/clicks/datastamp=12;region=us;timestamp=1201");
- uri3 = new HCatURI("hcat://hcat.server.com:5080/mydb/clicks/region=us;timestamp=1201;datastamp=12");
}
catch (URISyntaxException e) {
fail(e.getMessage());
}
- assertTrue(uri1.equals(uri2));
- assertTrue(uri2.equals(uri1));
- assertTrue(uri1.equals(uri3));
- assertTrue(uri3.equals(uri1));
+
+ assertEquals(uri1, uri2);
}
@Test