You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/02/25 22:42:09 UTC
svn commit: r1449911 [2/8] - in /oozie/trunk: ./
client/src/main/java/org/apache/oozie/cli/
client/src/main/java/org/apache/oozie/client/
client/src/main/java/org/apache/oozie/client/rest/
client/src/test/java/org/apache/oozie/client/rest/ core/ core/s...
Added: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.dependency.DependencyChecker;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+
+public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyCheckXCommand {
+
+ public CoordActionUpdatePushMissingDependency(String actionId) {
+ super("coord_action_push_md", actionId);
+ }
+
+ @Override
+ protected Void execute() throws CommandException {
+ LOG.info("STARTED for Action id [{0}]", actionId);
+ String pushMissingDeps = coordAction.getPushMissingDependencies();
+ if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
+ LOG.info("Nothing to check. Empty push missing dependency");
+ }
+ else {
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ Collection<String> availDepList = pdms.getAvailableDependencyURIs(actionId);
+ if (availDepList == null || availDepList.size() == 0) {
+ LOG.info("There are no available dependencies for action ID: [{0}]", actionId);
+ if (isTimeout()) { // Poll and check as one last try
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
+ }
+ }
+ else {
+ LOG.debug("Updating action ID [{0}] with available uris=[{1}] where missing uris=[{2}]", actionId,
+ availDepList.toString(), pushMissingDeps);
+
+ String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
+ List<String> stillMissingDepsList = new ArrayList<String>(Arrays.asList(missingDepsArray));
+ stillMissingDepsList.removeAll(availDepList);
+ boolean isChangeInDependency = true;
+ if (stillMissingDepsList.size() == 0) {
+ // All push-based dependencies are available
+ onAllPushDependenciesAvailable();
+ }
+ else {
+ if (stillMissingDepsList.size() == missingDepsArray.length) {
+ isChangeInDependency = false;
+ }
+ else {
+ String stillMissingDeps = DependencyChecker.dependenciesAsString(stillMissingDepsList);
+ coordAction.setPushMissingDependencies(stillMissingDeps);
+ }
+ if (isTimeout()) { // Poll and check as one last try
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
+ }
+ }
+ updateCoordAction(coordAction, isChangeInDependency);
+ removeAvailableDependencies(pdms, availDepList);
+ LOG.info("ENDED for Action id [{0}]", actionId);
+ }
+ }
+ return null;
+ }
+
+ private void removeAvailableDependencies(PartitionDependencyManagerService pdms, Collection<String> availDepList) {
+ if (pdms.removeAvailableDependencyURIs(actionId, availDepList)) {
+ LOG.debug("Successfully removed uris [{0}] for actionId: [{1}] from available list",
+ availDepList.toString(), actionId);
+ }
+ else {
+ LOG.warn("Failed to remove uris [{0}] for actionId: [{1}] from available list", availDepList.toString(),
+ actionId);
+ }
+ }
+
+ @Override
+ public String getEntityKey() {
+ return actionId;
+ }
+
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java Mon Feb 25 21:42:07 2013
@@ -18,9 +18,12 @@
package org.apache.oozie.command.coord;
import java.io.StringReader;
+import java.net.URI;
import java.util.Calendar;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
@@ -33,7 +36,11 @@ import org.apache.oozie.coord.CoordUtils
import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.coord.SyncCoordAction;
import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.dependency.DependencyChecker;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandler.DependencyType;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
@@ -47,11 +54,13 @@ public class CoordCommandUtils {
public static int FUTURE = 2;
public static int OFFSET = 3;
public static int UNEXPECTED = -1;
- public static final String RESOLVED_UNRESOLVED_SEPARATOR = ";";
+ public static final String RESOLVED_UNRESOLVED_SEPARATOR = "!!";
+ public static final String UNRESOLVED_INST_TAG = "unresolved-instances";
/**
* parse a function like coord:latest(n)/future() and return the 'n'.
* <p/>
+ *
* @param function
* @param event
* @param appInst
@@ -197,8 +206,8 @@ public class CoordCommandUtils {
if (startCal != null && endCal != null) {
List<Integer> expandedFreqs = CoordELFunctions.expandOffsetTimes(startCal, endCal, eval);
for (int i = expandedFreqs.size() - 1; i >= 0; i--) {
- String matInstance = materializeInstance(event, "${coord:offset(" + expandedFreqs.get(i) + ", \"MINUTE\")}",
- appInst, conf, eval);
+ String matInstance = materializeInstance(event, "${coord:offset(" + expandedFreqs.get(i)
+ + ", \"MINUTE\")}", appInst, conf, eval);
if (matInstance == null || matInstance.length() == 0) {
// Earlier than dataset's initial instance
break;
@@ -219,7 +228,8 @@ public class CoordCommandUtils {
if (funcType == CURRENT) {
// Everything could be resolved NOW. no latest() ELs
for (int i = endIndex; i >= startIndex; i--) {
- String matInstance = materializeInstance(event, "${coord:current(" + i + ")}", appInst, conf, eval);
+ String matInstance = materializeInstance(event, "${coord:current(" + i + ")}", appInst, conf,
+ eval);
if (matInstance == null || matInstance.length() == 0) {
// Earlier than dataset's initial instance
break;
@@ -273,28 +283,29 @@ public class CoordCommandUtils {
*
* @param event
* @param instances
- * @param dependencyList
* @throws Exception
*/
- public static void separateResolvedAndUnresolved(Element event, StringBuilder instances, StringBuffer dependencyList)
+ private static String separateResolvedAndUnresolved(Element event, StringBuilder instances)
throws Exception {
StringBuilder unresolvedInstances = new StringBuilder();
StringBuilder urisWithDoneFlag = new StringBuilder();
+ StringBuilder depList = new StringBuilder();
String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag);
if (uris.length() > 0) {
Element uriInstance = new Element("uris", event.getNamespace());
uriInstance.addContent(uris);
event.getContent().add(1, uriInstance);
- if (dependencyList.length() > 0) {
- dependencyList.append(CoordELFunctions.INSTANCE_SEPARATOR);
+ if (depList.length() > 0) {
+ depList.append(CoordELFunctions.INSTANCE_SEPARATOR);
}
- dependencyList.append(urisWithDoneFlag);
+ depList.append(urisWithDoneFlag);
}
if (unresolvedInstances.length() > 0) {
- Element elemInstance = new Element("unresolved-instances", event.getNamespace());
+ Element elemInstance = new Element(UNRESOLVED_INST_TAG, event.getNamespace());
elemInstance.addContent(unresolvedInstances.toString());
event.getContent().add(1, elemInstance);
}
+ return depList.toString();
}
/**
@@ -318,10 +329,10 @@ public class CoordCommandUtils {
Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag",
event.getNamespace());
- String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
+ URIHandlerService uriService = Services.get().get(URIHandlerService.class);
for (int i = 0; i < instanceList.length; i++) {
- if(instanceList[i].trim().length() == 0) {
+ if (instanceList[i].trim().length() == 0) {
continue;
}
int funcType = getFuncType(instanceList[i]);
@@ -340,11 +351,10 @@ public class CoordCommandUtils {
String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace())
.getChild("uri-template", event.getNamespace()).getTextTrim());
+ URIHandler uriHandler = uriService.getURIHandler(uriPath);
+ uriHandler.validate(uriPath);
uris.append(uriPath);
- if (doneFlag.length() > 0) {
- uriPath += "/" + doneFlag;
- }
- urisWithDoneFlag.append(uriPath);
+ urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, CoordUtils.getDoneFlag(doneFlagElement)));
}
return uris.toString();
}
@@ -415,22 +425,20 @@ public class CoordCommandUtils {
appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
- StringBuffer dependencyList = new StringBuffer();
+ Map<String, StringBuilder> dependencyMap = null;
Element inputList = eAction.getChild("input-events", eAction.getNamespace());
List<Element> dataInList = null;
if (inputList != null) {
dataInList = inputList.getChildren("data-in", eAction.getNamespace());
- materializeDataEvents(dataInList, appInst, conf, dependencyList);
+ dependencyMap = materializeDataEvents(dataInList, appInst, conf);
}
Element outputList = eAction.getChild("output-events", eAction.getNamespace());
List<Element> dataOutList = null;
if (outputList != null) {
dataOutList = outputList.getChildren("data-out", eAction.getNamespace());
- StringBuffer tmp = new StringBuffer();
- // no dependency checks
- materializeDataEvents(dataOutList, appInst, conf, tmp);
+ materializeDataEvents(dataOutList, appInst, conf);
}
eAction.removeAttribute("start");
@@ -439,8 +447,9 @@ public class CoordCommandUtils {
eAction.setAttribute("action-nominal-time", DateUtils.formatDateOozieTZ(nominalTime));
eAction.setAttribute("action-actual-time", DateUtils.formatDateOozieTZ(actualTime));
- boolean isSla = CoordCommandUtils.materializeSLA(eAction.getChild("action", eAction.getNamespace()).getChild(
- "info", eAction.getNamespace("sla")), nominalTime, conf);
+ boolean isSla = CoordCommandUtils.materializeSLA(
+ eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")),
+ nominalTime, conf);
// Setting up action bean
actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
@@ -451,7 +460,16 @@ public class CoordCommandUtils {
actionBean.setLastModifiedTime(new Date());
actionBean.setStatus(CoordinatorAction.Status.WAITING);
actionBean.setActionNumber(instanceCount);
- actionBean.setMissingDependencies(dependencyList.toString());
+ if (dependencyMap != null) {
+ StringBuilder sbPull = dependencyMap.get(DependencyType.PULL.name());
+ if (sbPull != null) {
+ actionBean.setMissingDependencies(sbPull.toString());
+ }
+ StringBuilder sbPush = dependencyMap.get(DependencyType.PUSH.name());
+ if (sbPush != null) {
+ actionBean.setPushMissingDependencies(sbPush.toString());
+ }
+ }
actionBean.setNominalTime(nominalTime);
if (isSla == true) {
actionBean.setSlaXml(XmlUtils.prettyPrint(
@@ -469,14 +487,20 @@ public class CoordCommandUtils {
}
else {
String action = XmlUtils.prettyPrint(eAction).toString();
- CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId());
StringBuilder actionXml = new StringBuilder(action);
- StringBuilder existList = new StringBuilder();
- StringBuilder nonExistList = new StringBuilder();
- StringBuilder nonResolvedList = new StringBuilder();
- getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
- coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
+ if (actionBean.getPushMissingDependencies() != null) {
+ DependencyChecker.checkForAvailability(actionBean.getPushMissingDependencies(), actionConf, true);
+ }
+ if (actionBean.getMissingDependencies() != null) {
+ CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(),
+ actionBean.getJobId());
+ StringBuilder existList = new StringBuilder();
+ StringBuilder nonExistList = new StringBuilder();
+ StringBuilder nonResolvedList = new StringBuilder();
+ getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
+ coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
+ }
return actionXml.toString();
}
}
@@ -491,13 +515,18 @@ public class CoordCommandUtils {
* @param conf
* @throws Exception
*/
- public static void materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf,
- StringBuffer dependencyList) throws Exception {
+ public static Map<String, StringBuilder> materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf
+ ) throws Exception {
if (events == null) {
- return;
+ return null;
}
- StringBuffer unresolvedList = new StringBuffer();
+ StringBuilder unresolvedList = new StringBuilder();
+ Map<String, StringBuilder> dependencyMap = new HashMap<String, StringBuilder>();
+ URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+ StringBuilder pullMissingDep = null;
+ StringBuilder pushMissingDep = null;
+
for (Element event : events) {
StringBuilder instances = new StringBuilder();
ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
@@ -506,8 +535,24 @@ public class CoordCommandUtils {
// Handle start-instance and end-instance
resolveInstanceRange(event, instances, appInst, conf, eval);
// Separate out the unresolved instances
- separateResolvedAndUnresolved(event, instances, dependencyList);
- String tmpUnresolved = event.getChildTextTrim("unresolved-instances", event.getNamespace());
+ String resolvedList = separateResolvedAndUnresolved(event, instances);
+ if (!resolvedList.isEmpty()) {
+ Element uri = event.getChild("dataset", event.getNamespace()).getChild("uri-template",
+ event.getNamespace());
+ String uriTemplate = uri.getText();
+ URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
+ URIHandler handler = uriService.getURIHandler(baseURI);
+ if (handler.getDependencyType(baseURI).equals(DependencyType.PULL)) {
+ pullMissingDep = (pullMissingDep == null) ? new StringBuilder(resolvedList) : pullMissingDep.append(
+ CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList);
+ }
+ else {
+ pushMissingDep = (pushMissingDep == null) ? new StringBuilder(resolvedList) : pushMissingDep.append(
+ CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList);
+ }
+ }
+
+ String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG, event.getNamespace());
if (tmpUnresolved != null) {
if (unresolvedList.length() > 0) {
unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
@@ -516,10 +561,14 @@ public class CoordCommandUtils {
}
}
if (unresolvedList.length() > 0) {
- dependencyList.append(RESOLVED_UNRESOLVED_SEPARATOR);
- dependencyList.append(unresolvedList);
+ if (pullMissingDep == null) {
+ pullMissingDep = new StringBuilder();
+ }
+ pullMissingDep.append(RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedList);
}
- return;
+ dependencyMap.put(DependencyType.PULL.name(), pullMissingDep);
+ dependencyMap.put(DependencyType.PUSH.name(), pushMissingDep);
+ return dependencyMap;
}
/**
@@ -538,7 +587,7 @@ public class CoordCommandUtils {
}
else {
resolved.append(missDepList.substring(0, index));
- unresolved.append(missDepList.substring(index + 1));
+ unresolved.append(missDepList.substring(index + RESOLVED_UNRESOLVED_SEPARATOR.length()));
}
}
return resolved.toString();
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Mon Feb 25 21:42:07 2013
@@ -32,6 +32,7 @@ import org.apache.oozie.SLAEventBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.SLAEvent.SlaAppType;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.MaterializeTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
@@ -106,6 +107,16 @@ public class CoordMaterializeTransitionX
public void performWrites() throws CommandException {
try {
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ // register the partition related dependencies of actions
+ for (JsonBean actionBean : insertList) {
+ if (actionBean instanceof CoordinatorActionBean) {
+ CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
+ if (coordAction.getPushMissingDependencies() != null) {
+ // TODO: Delay in catchup mode?
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
+ }
+ }
+ }
}
catch (JPAExecutorException jex) {
throw new CommandException(jex);
@@ -329,6 +340,7 @@ public class CoordMaterializeTransitionX
if (!dryrun) {
storeToDB(actionBean, action); // Storing to table
+
}
else {
actionStrings.append("action for new instance");
Added: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URI;
+import java.util.Date;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.dependency.DependencyChecker;
+import org.apache.oozie.dependency.ActionDependency;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Service;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.StatusUtils;
+import org.apache.oozie.util.XConfiguration;
+
+/**
+ * Command that checks the push missing dependencies of a coordinator action, updates the
+ * action accordingly and registers/unregisters the dependency URIs for notifications.
+ */
+public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
+    protected String actionId;
+    protected JPAService jpaService = null;
+    protected CoordinatorActionBean coordAction = null;
+    protected CoordinatorJobBean coordJob = null;
+
+    /**
+     * Property name of command re-queue interval for coordinator push check in
+     * milliseconds.
+     */
+    public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
+            + "coord.push.check.requeue.interval";
+    /**
+     * Default re-queue interval in ms. It is applied when no value defined in
+     * the oozie configuration.
+     */
+    private static final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000;
+    private boolean registerForNotification;
+
+    /**
+     * Creates a check command that does not register the action for notifications.
+     *
+     * @param actionId id of the coordinator action to check
+     */
+    public CoordPushDependencyCheckXCommand(String actionId) {
+        this(actionId, false);
+    }
+
+    /**
+     * Creates a check command.
+     *
+     * @param actionId id of the coordinator action to check
+     * @param registerForNotification if true, the dependencies that are still missing after the
+     *        check are registered for notifications
+     */
+    public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) {
+        super("coord_push_dep_check", "coord_push_dep_check", 0);
+        this.actionId = actionId;
+        this.registerForNotification = registerForNotification;
+    }
+
+    /**
+     * Constructor for subclasses that run under their own command name. Notification
+     * registration is left disabled.
+     *
+     * @param actionName command name, also used as the command type
+     * @param actionId id of the coordinator action to check
+     */
+    protected CoordPushDependencyCheckXCommand(String actionName, String actionId) {
+        super(actionName, actionName, 0);
+        this.actionId = actionId;
+    }
+
+    /**
+     * Checks the availability of the action's push missing dependencies, stores the outcome
+     * on the action and then either queues a timeout command or re-queues this check.
+     *
+     * @return always <code>null</code>
+     * @throws CommandException with {@link ErrorCode#E1021} if anything fails during the check
+     */
+    @Override
+    protected Void execute() throws CommandException {
+        String pushMissingDeps = coordAction.getPushMissingDependencies();
+        if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
+            LOG.info("Nothing to check. Empty push missing dependency");
+        }
+        else {
+            LOG.info("Push missing dependencies for actionID [{0}] is [{1}] ", actionId, pushMissingDeps);
+
+            try {
+                Configuration actionConf = null;
+                try {
+                    actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
+                }
+                catch (IOException e) {
+                    throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
+                }
+
+                String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
+                // Check all dependencies during materialization to avoid registering in the cache.
+                // But check only first missing one afterwards similar to
+                // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
+                ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
+                        !registerForNotification);
+
+                boolean isChangeInDependency = true;
+                boolean timeout = false;
+                if (actionDep.getMissingDependencies().size() == 0) {
+                    // All push-based dependencies are available
+                    onAllPushDependenciesAvailable();
+                }
+                else {
+                    if (actionDep.getMissingDependencies().size() == missingDepsArray.length) {
+                        isChangeInDependency = false;
+                    }
+                    else {
+                        String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep
+                                .getMissingDependencies());
+                        coordAction.setPushMissingDependencies(stillMissingDeps);
+                    }
+                    // Checking for timeout
+                    timeout = isTimeout();
+                    if (timeout) {
+                        queue(new CoordActionTimeOutXCommand(coordAction), 100);
+                    }
+                    else {
+                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
+                                getCoordPushCheckRequeueInterval());
+                    }
+                }
+
+                updateCoordAction(coordAction, isChangeInDependency);
+                if (registerForNotification) {
+                    registerForNotification(actionDep.getMissingDependencies(), actionConf);
+                }
+                else {
+                    unregisterFromNotification(actionDep.getAvailableDependencies());
+                }
+                if (timeout) {
+                    unregisterFromNotification(actionDep.getMissingDependencies());
+                }
+            }
+            catch (Exception e) {
+                // Even when the check itself failed, a timed out action still gets its timeout command
+                if (isTimeout()) {
+                    queue(new CoordActionTimeOutXCommand(coordAction), 100);
+                }
+                throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Return the re-queue interval for coord push dependency check
+     *
+     * @return the value of {@link #CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL} from the oozie
+     *         configuration, or the default of 600000 ms when it is not set
+     */
+    public long getCoordPushCheckRequeueInterval() {
+        long requeueInterval = Services.get().getConf().getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL,
+                DEFAULT_COMMAND_REQUEUE_INTERVAL);
+        return requeueInterval;
+    }
+
+    /**
+     * Returns true if timeout period has been reached. The waiting time is counted in minutes
+     * from the later of the action's nominal time and created time. A negative timeout on the
+     * action never times out.
+     *
+     * @return true if it is time for timeout else false
+     */
+    protected boolean isTimeout() {
+        long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
+                .getCreatedTime().getTime()))
+                / (60 * 1000);
+        int timeOut = coordAction.getTimeOut();
+        return (timeOut >= 0) && (waitingTime > timeOut);
+    }
+
+    /**
+     * Clears the push missing dependencies of the action. If the action has no other missing
+     * dependencies and its nominal time is not in the future, the action is moved to READY and
+     * a {@link CoordActionReadyXCommand} is queued for its job.
+     */
+    protected void onAllPushDependenciesAvailable() {
+        coordAction.setPushMissingDependencies("");
+        if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
+            Date nominalTime = coordAction.getNominalTime();
+            Date currentTime = new Date();
+            // The action should become READY only if current time > nominal time;
+            // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
+            if (nominalTime.compareTo(currentTime) > 0) {
+                LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
+                        + currentTime + ", nominal=" + nominalTime);
+            }
+            else {
+                coordAction.setStatus(CoordinatorAction.Status.READY);
+                // pass jobID to the CoordActionReadyXCommand
+                queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
+            }
+        }
+    }
+
+    /**
+     * Stamps the action with the current time as last modified time and persists it. Nothing
+     * is persisted when no JPA service has been loaded.
+     *
+     * @param coordAction action to persist
+     * @param isChangeInDependency true to run the push input check update, false to run the
+     *        modified time update only
+     * @throws CommandException with {@link ErrorCode#E1021} if the update fails
+     */
+    protected void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
+            throws CommandException {
+        coordAction.setLastModifiedTime(new Date());
+        if (jpaService != null) {
+            try {
+                if (isChangeInDependency) {
+                    jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+                }
+                else {
+                    jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
+                }
+            }
+            catch (JPAExecutorException jex) {
+                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
+            }
+        }
+    }
+
+    /**
+     * Registers every given dependency URI with its URI handler for notifications. A failure
+     * on one URI is logged and does not stop the remaining registrations.
+     *
+     * @param missingDeps dependency URIs that are still missing
+     * @param actionConf run configuration of the action, also the source of the user name
+     */
+    private void registerForNotification(List<String> missingDeps, Configuration actionConf) {
+        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+        String user = actionConf.get(OozieClient.USER_NAME, OozieClient.USER_NAME);
+        for (String missingDep : missingDeps) {
+            try {
+                URI missingURI = new URI(missingDep);
+                URIHandler handler = uriService.getURIHandler(missingURI);
+                handler.registerForNotification(missingURI, actionConf, user, actionId);
+                LOG.debug("Registered uri [{0}] for actionId: [{1}] for notifications",
+                        missingURI, actionId);
+            }
+            catch (Exception e) {
+                LOG.warn("Exception while registering uri for actionId: [{0}] for notifications", actionId, e);
+            }
+        }
+    }
+
+    /**
+     * Unregisters every given dependency URI from notifications for this action. A failure on
+     * one URI is logged and does not stop the remaining unregistrations.
+     *
+     * @param dependencyUris dependency URIs to unregister
+     */
+    private void unregisterFromNotification(Iterable<String> dependencyUris) {
+        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+        for (String dependencyUri : dependencyUris) {
+            try {
+                URI uri = new URI(dependencyUri);
+                URIHandler handler = uriService.getURIHandler(uri);
+                if (handler.unregisterFromNotification(uri, actionId)) {
+                    LOG.debug("Successfully unregistered uri [{0}] for actionId: [{1}] from notifications", uri,
+                            actionId);
+                }
+                else {
+                    LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", uri,
+                            actionId);
+                }
+            }
+            catch (Exception e) {
+                LOG.warn("Exception while unregistering uri for actionId: [{0}] for notifications", actionId, e);
+            }
+        }
+    }
+
+    /**
+     * @return the id of the coordinator job that owns the action
+     */
+    @Override
+    public String getEntityKey() {
+        return coordAction.getJobId();
+    }
+
+    /**
+     * @return the command name followed by "_" and the action id
+     */
+    @Override
+    public String getKey(){
+        return getName() + "_" + actionId;
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    /**
+     * Loads the coordinator action and its coordinator job through the JPA service.
+     *
+     * @throws CommandException with {@link ErrorCode#E0610} if no JPA service is available, or
+     *         wrapping the failure if the beans cannot be loaded
+     */
+    @Override
+    protected void eagerLoadState() throws CommandException {
+        try {
+            jpaService = Services.get().get(JPAService.class);
+
+            if (jpaService != null) {
+                coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
+                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
+                LogUtils.setLogInfo(coordAction, logInfo);
+            }
+            else {
+                throw new CommandException(ErrorCode.E0610);
+            }
+        }
+        catch (XException ex) {
+            throw new CommandException(ex);
+        }
+    }
+
+    /**
+     * Verifies that the action is WAITING and that its job is in a state that allows the check.
+     *
+     * @throws CommandException declared by the overridden method
+     * @throws PreconditionException with {@link ErrorCode#E1100} if the action is not WAITING, or
+     *         if the job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state
+     */
+    @Override
+    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
+        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
+            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+                    + "]::CoordPushDependencyCheck:: Ignoring action. Should be in WAITING state, but state="
+                    + coordAction.getStatus());
+        }
+
+        // if eligible to do action input check when running with backward
+        // support is true
+        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
+            return;
+        }
+
+        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
+                && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
+            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+                    + "]::CoordPushDependencyCheck:: Ignoring action."
+                    + " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
+                    + coordJob.getStatus());
+        }
+    }
+
+    /**
+     * Delegates to {@link #eagerLoadState()} so that the state is loaded again.
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        eagerLoadState();
+    }
+
+    /**
+     * Delegates to {@link #eagerVerifyPrecondition()} so that the same checks are applied again.
+     */
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+        eagerVerifyPrecondition();
+    }
+
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java Mon Feb 25 21:42:07 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.command.coord.CoordCommandUtils;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
@@ -163,7 +164,7 @@ public class CoordELEvaluator {
}
else {
}
- if (data.getChild("unresolved-instances", data.getNamespace()) != null) {
+ if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) {
e.setVariable(".datain." + data.getAttributeValue("name") + ".unresolved", "true"); // TODO:
// check
// null
@@ -180,7 +181,7 @@ public class CoordELEvaluator {
}
else {
}// TODO
- if (data.getChild("unresolved-instances", data.getNamespace()) != null) {
+ if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) {
e.setVariable(".dataout." + data.getAttributeValue("name") + ".unresolved", "true"); // TODO:
// check
// null
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java Mon Feb 25 21:42:07 2013
@@ -17,7 +17,7 @@
*/
package org.apache.oozie.coord;
-import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
@@ -25,16 +25,15 @@ import java.util.List;
import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.dependency.URIHandler.Context;
+import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
-import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.URIHandlerService;
/**
* This class implements the EL function related to coordinator
@@ -300,37 +299,50 @@ public class CoordELFunctions {
String user = ParamChecker
.notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
String doneFlag = ds.getDoneFlag();
- while (instance >= checkedInstance) {
- ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
- String uriPath = uriEval.evaluate(uriTemplate, String.class);
- String pathWithDoneFlag = uriPath;
- if (doneFlag.length() > 0) {
- pathWithDoneFlag += "/" + doneFlag;
- }
- if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
- LOG.debug("Found future(" + available + "): " + pathWithDoneFlag);
- if (available == endOffset) {
- LOG.debug("Matched future(" + available + "): " + pathWithDoneFlag);
- resolved = true;
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
- resolvedURIPaths.append(uriPath);
- retVal = resolvedInstances.toString();
- eval.setVariable("resolved_path", resolvedURIPaths.toString());
- break;
- } else if (available >= startOffset) {
- LOG.debug("Matched future(" + available + "): " + pathWithDoneFlag);
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
- resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+ URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+ URIHandler uriHandler = null;
+ Context uriContext = null;
+ try {
+ while (instance >= checkedInstance) {
+ ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
+ String uriPath = uriEval.evaluate(uriTemplate, String.class);
+ if (uriHandler == null) {
+ URI uri = new URI(uriPath);
+ uriHandler = uriService.getURIHandler(uri);
+ uriContext = uriHandler.getContext(uri, conf, user);
}
- available++;
+ String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
+ if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
+ if (available == endOffset) {
+ LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
+ resolved = true;
+ resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
+ resolvedURIPaths.append(uriPath);
+ retVal = resolvedInstances.toString();
+ eval.setVariable("resolved_path", resolvedURIPaths.toString());
+ break;
+ }
+ else if (available >= startOffset) {
+ LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
+ resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
+ INSTANCE_SEPARATOR);
+ resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+ }
+ available++;
+ }
+ // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
+ // -datasetFrequency);
+ nominalInstanceCal = (Calendar) initInstance.clone();
+ instCount[0]++;
+ nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
+ checkedInstance++;
+ // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
+ }
+ }
+ finally {
+ if (uriContext != null) {
+ uriContext.destroy();
}
- // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
- // -datasetFrequency);
- nominalInstanceCal = (Calendar) initInstance.clone();
- instCount[0]++;
- nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
- checkedInstance++;
- // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
}
if (!resolved) {
// return unchanged future function with variable 'is_resolved'
@@ -973,37 +985,52 @@ public class CoordELFunctions {
String user = ParamChecker
.notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
String doneFlag = ds.getDoneFlag();
- while (nominalInstanceCal.compareTo(initInstance) >= 0) {
- ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
- String uriPath = uriEval.evaluate(uriTemplate, String.class);
- String pathWithDoneFlag = uriPath;
- if (doneFlag.length() > 0) {
- pathWithDoneFlag += "/" + doneFlag;
- }
- if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
- LOG.debug("Found latest(" + available + "): " + pathWithDoneFlag);
- if (available == startOffset) {
- LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag);
- resolved = true;
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
- resolvedURIPaths.append(uriPath);
- retVal = resolvedInstances.toString();
- eval.setVariable("resolved_path", resolvedURIPaths.toString());
- break;
- } else if (available <= endOffset) {
- LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag);
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
- resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+ URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+ URIHandler uriHandler = null;
+ Context uriContext = null;
+ try {
+ while (nominalInstanceCal.compareTo(initInstance) >= 0) {
+ ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
+ String uriPath = uriEval.evaluate(uriTemplate, String.class);
+ if (uriHandler == null) {
+ URI uri = new URI(uriPath);
+ uriHandler = uriService.getURIHandler(uri);
+ uriContext = uriHandler.getContext(uri, conf, user);
}
+ String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
+ if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
+ XLog.getLog(CoordELFunctions.class)
+ .debug("Found latest(" + available + "): " + uriWithDoneFlag);
+ if (available == startOffset) {
+ LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
+ resolved = true;
+ resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
+ resolvedURIPaths.append(uriPath);
+ retVal = resolvedInstances.toString();
+ eval.setVariable("resolved_path", resolvedURIPaths.toString());
+ break;
+ }
+ else if (available <= endOffset) {
+ LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
+ resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
+ INSTANCE_SEPARATOR);
+ resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+ }
- available--;
+ available--;
+ }
+ // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
+ // -datasetFrequency);
+ nominalInstanceCal = (Calendar) initInstance.clone();
+ instCount[0]--;
+ nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
+ // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
+ }
+ }
+ finally {
+ if (uriContext != null) {
+ uriContext.destroy();
}
- // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
- // -datasetFrequency);
- nominalInstanceCal = (Calendar) initInstance.clone();
- instCount[0]--;
- nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
- // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
}
if (!resolved) {
// return unchanged latest function with variable 'is_resolved'
@@ -1026,26 +1053,6 @@ public class CoordELFunctions {
return retVal;
}
- // TODO : Not an efficient way. In a loop environment, we could do something
- // outside the loop
- /**
- * Check whether a URI path exists
- *
- * @param sPath
- * @param conf
- * @return
- * @throws IOException
- */
-
- private static boolean isPathAvailable(String sPath, String user, String group, Configuration conf)
- throws IOException, HadoopAccessorException {
- // sPath += "/" + END_OF_OPERATION_INDICATOR_FILE;
- Path path = new Path(sPath);
- HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
- Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
- return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
- }
-
/**
* @param tm
* @return a new Evaluator to be used for URI-template evaluation
Added: oozie/trunk/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.coord;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.DagELFunctions;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.XLog;
+
+/**
+ * EL functions for HCat datasets in coordinators, plus one workflow
+ * parameterization function ({@link #hcat_exists(String)}).
+ * <p>
+ * Functions named {@code ph1_*_echo} validate the dataset name and return the
+ * function call text unevaluated. Functions named {@code ph3_*} read the
+ * {@code .datain.<name>} / {@code .dataout.<name>} variables of the current
+ * {@link ELEvaluator} and derive values from the HCat URIs stored there.
+ */
+
+public class HCatELFunctions {
+    private static final XLog LOG = XLog.getLog(HCatELFunctions.class);
+    private static final Configuration EMPTY_CONF = new Configuration(true);
+
+    enum EventType {
+        input, output
+    }
+
+    /* Workflow Parameterization EL functions */
+
+    /**
+     * Return true if partitions exists or false if not. The check is made
+     * through the {@link URIHandler} registered for the URI, on behalf of the
+     * user of the current workflow.
+     *
+     * @param uri hcatalog partition uri.
+     * @return <code>true</code> if the uri exists, <code>false</code> if it does not.
+     * @throws Exception if the uri cannot be parsed or the handler lookup or
+     *         existence check fails
+     */
+    public static boolean hcat_exists(String uri) throws Exception {
+        URI hcatURI = new URI(uri);
+        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+        URIHandler handler = uriService.getURIHandler(hcatURI);
+        WorkflowJob workflow = DagELFunctions.getWorkflow();
+        String user = workflow.getUser();
+        return handler.exists(hcatURI, EMPTY_CONF, user);
+    }
+
+    /* Coord EL functions */
+
+    /**
+     * Echo the {@code databaseIn} EL function without evaluating anything.
+     *
+     * @param dataName name of the data-in/data-out event
+     * @return the same EL function
+     * @throws RuntimeException if {@code dataName} is not a known data event
+     */
+    public static String ph1_coord_databaseIn_echo(String dataName) {
+        // Fail early if the name is not a declared data event.
+        isValidDataEvent(dataName);
+        return echoUnResolved("databaseIn", "'" + dataName + "'");
+    }
+
+    /**
+     * Echo the {@code databaseOut} EL function without evaluating anything.
+     *
+     * @param dataName name of the data-in/data-out event
+     * @return the same EL function
+     * @throws RuntimeException if {@code dataName} is not a known data event
+     */
+    public static String ph1_coord_databaseOut_echo(String dataName) {
+        isValidDataEvent(dataName);
+        return echoUnResolved("databaseOut", "'" + dataName + "'");
+    }
+
+    /**
+     * Echo the {@code tableIn} EL function without evaluating anything.
+     *
+     * @param dataName name of the data-in/data-out event
+     * @return the same EL function
+     * @throws RuntimeException if {@code dataName} is not a known data event
+     */
+    public static String ph1_coord_tableIn_echo(String dataName) {
+        isValidDataEvent(dataName);
+        return echoUnResolved("tableIn", "'" + dataName + "'");
+    }
+
+    /**
+     * Echo the {@code tableOut} EL function without evaluating anything.
+     *
+     * @param dataName name of the data-in/data-out event
+     * @return the same EL function
+     * @throws RuntimeException if {@code dataName} is not a known data event
+     */
+    public static String ph1_coord_tableOut_echo(String dataName) {
+        isValidDataEvent(dataName);
+        return echoUnResolved("tableOut", "'" + dataName + "'");
+    }
+
+    /**
+     * Echo the {@code dataInPartitionFilter} EL function without evaluating anything.
+     *
+     * @param dataInName name of the data event
+     * @param type action type argument, echoed back unchanged
+     * @return the same EL function
+     * @throws RuntimeException if {@code dataInName} is not a known data event
+     */
+    public static String ph1_coord_dataInPartitionFilter_echo(String dataInName, String type) {
+        isValidDataEvent(dataInName);
+        return echoUnResolved("dataInPartitionFilter", "'" + dataInName + "', '" + type + "'");
+    }
+
+    /**
+     * Echo the {@code dataInPartitionMin} EL function without evaluating anything.
+     *
+     * @param dataInName name of the data event
+     * @param partition partition name argument, echoed back unchanged
+     * @return the same EL function
+     * @throws RuntimeException if {@code dataInName} is not a known data event
+     */
+    public static String ph1_coord_dataInPartitionMin_echo(String dataInName, String partition) {
+        isValidDataEvent(dataInName);
+        return echoUnResolved("dataInPartitionMin", "'" + dataInName + "', '" + partition + "'");
+    }
+
+    /**
+     * Echo the {@code dataInPartitionMax} EL function without evaluating anything.
+     *
+     * @param dataInName name of the data event
+     * @param partition partition name argument, echoed back unchanged
+     * @return the same EL function
+     * @throws RuntimeException if {@code dataInName} is not a known data event
+     */
+    public static String ph1_coord_dataInPartitionMax_echo(String dataInName, String partition) {
+        isValidDataEvent(dataInName);
+        return echoUnResolved("dataInPartitionMax", "'" + dataInName + "', '" + partition + "'");
+    }
+
+    /**
+     * Echo the {@code dataOutPartitions} EL function without evaluating anything.
+     *
+     * @param dataOutName name of the data event
+     * @return the same EL function
+     * @throws RuntimeException if {@code dataOutName} is not a known data event
+     */
+    public static String ph1_coord_dataOutPartitions_echo(String dataOutName) {
+        isValidDataEvent(dataOutName);
+        return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
+    }
+
+    /**
+     * Echo the {@code dataOutPartitionValue} EL function without evaluating anything.
+     *
+     * @param dataOutName name of the data event
+     * @param partition partition name argument, echoed back unchanged
+     * @return the same EL function
+     * @throws RuntimeException if {@code dataOutName} is not a known data event
+     */
+    public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) {
+        isValidDataEvent(dataOutName);
+        return echoUnResolved("dataOutPartitionValue", "'" + dataOutName + "', '" + partition + "'");
+    }
+
+    /**
+     * Extract the hcat DB name from the first URI stored in the evaluator
+     * variable {@code .datain.<dataName>}.
+     *
+     * @param dataName name of the data-in event
+     * @return DB name, or an empty string if no URI is set for the event
+     * @throws RuntimeException if the URI cannot be parsed
+     */
+    public static String ph3_coord_databaseIn(String dataName) {
+        HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
+        return (hcatURI != null) ? hcatURI.getDb() : "";
+    }
+
+    /**
+     * Extract the hcat DB name from the first URI stored in the evaluator
+     * variable {@code .dataout.<dataName>}.
+     *
+     * @param dataName name of the data-out event
+     * @return DB name, or an empty string if no URI is set for the event
+     * @throws RuntimeException if the URI cannot be parsed
+     */
+    public static String ph3_coord_databaseOut(String dataName) {
+        HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
+        return (hcatURI != null) ? hcatURI.getDb() : "";
+    }
+
+    /**
+     * Extract the hcat Table name from the first URI stored in the evaluator
+     * variable {@code .datain.<dataName>}.
+     *
+     * @param dataName name of the data-in event
+     * @return Table name, or an empty string if no URI is set for the event
+     * @throws RuntimeException if the URI cannot be parsed
+     */
+    public static String ph3_coord_tableIn(String dataName) {
+        HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
+        return (hcatURI != null) ? hcatURI.getTable() : "";
+    }
+
+    /**
+     * Extract the hcat Table name from the first URI stored in the evaluator
+     * variable {@code .dataout.<dataName>}.
+     *
+     * @param dataName name of the data-out event
+     * @return Table name, or an empty string if no URI is set for the event
+     * @throws RuntimeException if the URI cannot be parsed
+     */
+    public static String ph3_coord_tableOut(String dataName) {
+        HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
+        return (hcatURI != null) ? hcatURI.getTable() : "";
+    }
+
+    /**
+     * Used to specify the HCat partition filter which is input dependency for
+     * workflow job. Looks at two evaluator-level variables:
+     * {@code .datain.<DATAIN_NAME>} (the current list of HCat URIs) and
+     * {@code .datain.<DATAIN_NAME>.unresolved} (whether there is any
+     * unresolved EL function, i.e. latest). If something is unresolved the
+     * original function is echoed back, otherwise the partition filter is
+     * returned.
+     *
+     * @param dataInName : Datain name
+     * @param type : for action type - pig, MR or hive
+     * @return the echoed function, or the filters of all URIs joined by " OR "
+     * @throws RuntimeException if a URI cannot be parsed
+     */
+    public static String ph3_coord_dataInPartitionFilter(String dataInName, String type) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uris = (String) eval.getVariable(".datain." + dataInName);
+        if (isUnresolved(eval, ".datain." + dataInName)) {
+            return "${coord:dataInPartitionFilter('" + dataInName + "', '" + type + "')}";
+        }
+        return createPartitionFilter(uris, type);
+    }
+
+    /**
+     * Used to specify the HCat partition's value defining output for workflow
+     * job. Looks at two evaluator-level variables:
+     * {@code .dataout.<DATAOUT_NAME>} (the data-out HCat URI) and
+     * {@code .dataout.<DATAOUT_NAME>.unresolved} (whether there is any
+     * unresolved EL function, i.e. latest). If something is unresolved the
+     * original function is echoed back, otherwise the partition value is
+     * returned.
+     *
+     * @param dataOutName : Dataout name
+     * @param partitionName : Specific partition name whose value is wanted
+     * @return the echoed function, or the value of the named partition
+     * @throws RuntimeException if the URI cannot be parsed
+     */
+    public static String ph3_coord_dataOutPartitionValue(String dataOutName, String partitionName) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uri = (String) eval.getVariable(".dataout." + dataOutName);
+        if (isUnresolved(eval, ".dataout." + dataOutName)) {
+            return "${coord:dataOutPartitionValue('" + dataOutName + "', '" + partitionName + "')}";
+        }
+        try {
+            HCatURI hcatUri = new HCatURI(uri);
+            return hcatUri.getPartitionValue(partitionName);
+        }
+        catch (URISyntaxException urie) {
+            LOG.warn("Exception with uriTemplate [{0}]. Reason [{1}]: ", uri, urie);
+            // Keep the parse failure as the cause so the stack trace is not lost.
+            throw new RuntimeException("HCat URI can't be parsed " + urie, urie);
+        }
+    }
+
+    /**
+     * Used to specify the entire HCat partition defining output for workflow
+     * job. Looks at two evaluator-level variables:
+     * {@code .dataout.<DATAOUT_NAME>} (the data-out HCat URI) and
+     * {@code .dataout.<DATAOUT_NAME>.unresolved} (whether there is any
+     * unresolved EL function, i.e. latest). If something is unresolved the
+     * original function is echoed back, otherwise the partition string is
+     * returned.
+     *
+     * @param dataOutName : DataOut name
+     * @return the echoed function, or the partition string of the URI
+     * @throws RuntimeException if the URI cannot be parsed
+     */
+    public static String ph3_coord_dataOutPartitions(String dataOutName) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uri = (String) eval.getVariable(".dataout." + dataOutName);
+        if (isUnresolved(eval, ".dataout." + dataOutName)) {
+            return "${coord:dataOutPartitions('" + dataOutName + "')}";
+        }
+        try {
+            return new HCatURI(uri).toPartitionString();
+        }
+        catch (URISyntaxException e) {
+            throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e, e);
+        }
+    }
+
+    /**
+     * Used to specify the MINIMUM value of an HCat partition which is input
+     * dependency for workflow job. Looks at two evaluator-level variables:
+     * {@code .datain.<DATAIN_NAME>} (the current list of HCat URIs) and
+     * {@code .datain.<DATAIN_NAME>.unresolved} (whether there is any
+     * unresolved EL function, i.e. latest). If something is unresolved the
+     * original function is echoed back, otherwise the min partition value is
+     * returned.
+     *
+     * @param dataInName : Datain name
+     * @param partitionName : Specific partition name whose MIN value is wanted
+     * @return the echoed function, the smallest partition value (by string
+     *         comparison), or <code>null</code> if no URIs are set
+     * @throws RuntimeException if a URI cannot be parsed or has no value for
+     *         the partition
+     */
+    public static String ph3_coord_dataInPartitionMin(String dataInName, String partitionName) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uris = (String) eval.getVariable(".datain." + dataInName);
+        if (isUnresolved(eval, ".datain." + dataInName)) {
+            return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
+        }
+        return selectPartitionValue(uris, partitionName, false);
+    }
+
+    /**
+     * Used to specify the MAXIMUM value of an HCat partition which is input
+     * dependency for workflow job. Looks at two evaluator-level variables:
+     * {@code .datain.<DATAIN_NAME>} (the current list of HCat URIs) and
+     * {@code .datain.<DATAIN_NAME>.unresolved} (whether there is any
+     * unresolved EL function, i.e. latest). If something is unresolved the
+     * original function is echoed back, otherwise the max partition value is
+     * returned.
+     *
+     * @param dataInName : Datain name
+     * @param partitionName : Specific partition name whose MAX value is wanted
+     * @return the echoed function, the largest partition value (by string
+     *         comparison), or <code>null</code> if no URIs are set
+     * @throws RuntimeException if a URI cannot be parsed or has no value for
+     *         the partition
+     */
+    public static String ph3_coord_dataInPartitionMax(String dataInName, String partitionName) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uris = (String) eval.getVariable(".datain." + dataInName);
+        if (isUnresolved(eval, ".datain." + dataInName)) {
+            // Echo this function itself; echoing dataInPartitionMin here would
+            // make a later evaluation compute the wrong bound.
+            return "${coord:dataInPartitionMax('" + dataInName + "', '" + partitionName + "')}";
+        }
+        return selectPartitionValue(uris, partitionName, true);
+    }
+
+    /**
+     * Tells whether the evaluator flags the given data variable as unresolved.
+     * The flag is read from {@code <dataVariable>.unresolved} and may be held
+     * either as a Boolean or as the string "true", so it is compared through
+     * its string form instead of being cast.
+     *
+     * @param eval evaluator holding the variables
+     * @param dataVariable variable name such as {@code .datain.<name>}
+     * @return <code>true</code> only if the flag is present and reads "true"
+     */
+    private static boolean isUnresolved(ELEvaluator eval, String dataVariable) {
+        Object flag = eval.getVariable(dataVariable + ".unresolved");
+        return flag != null && Boolean.parseBoolean(flag.toString());
+    }
+
+    /**
+     * Finds the smallest or largest value of one partition across a list of
+     * HCat URIs.
+     *
+     * @param uris HCat URIs separated by CoordELFunctions.DIR_SEPARATOR, may be null
+     * @param partitionName partition whose value is compared
+     * @param selectMax <code>true</code> for the largest value, <code>false</code> for the smallest
+     * @return the selected value, or <code>null</code> if {@code uris} is null
+     * @throws RuntimeException if a URI cannot be parsed or has no value for
+     *         the partition
+     */
+    private static String selectPartitionValue(String uris, String partitionName, boolean selectMax) {
+        if (uris == null) {
+            LOG.warn("URI is null");
+            return null;
+        }
+        String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
+        try {
+            // start from the first partition value, then compare the rest
+            String selected = requiredPartitionValue(uriList[0], partitionName);
+            for (int i = 1; i < uriList.length; i++) {
+                String value = requiredPartitionValue(uriList[i], partitionName);
+                // sticking to string comparison since some numerical date
+                // values can also contain letters e.g. 20120101T0300Z (UTC)
+                int order = value.compareTo(selected);
+                if (selectMax ? order > 0 : order < 0) {
+                    selected = value;
+                }
+            }
+            return selected;
+        }
+        catch (URISyntaxException urie) {
+            throw new RuntimeException("HCat URI can't be parsed " + urie, urie);
+        }
+    }
+
+    /**
+     * Reads the value of a partition from one HCat URI and insists that it is
+     * present, so every URI in a list is validated the same way.
+     *
+     * @param uri HCat URI to parse
+     * @param partitionName partition whose value is wanted
+     * @return the non-empty partition value
+     * @throws URISyntaxException if the URI cannot be parsed
+     * @throws RuntimeException if the URI has no value for the partition
+     */
+    private static String requiredPartitionValue(String uri, String partitionName) throws URISyntaxException {
+        String value = new HCatURI(uri).getPartitionValue(partitionName);
+        if (value == null || value.isEmpty()) {
+            throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
+        }
+        return value;
+    }
+
+    /**
+     * Builds one filter from a list of HCat URIs by joining the partition
+     * filter of each URI with " OR ".
+     *
+     * @param uris HCat URIs separated by CoordELFunctions.DIR_SEPARATOR
+     * @param type action type passed through to HCatURI.toPartitionFilter
+     * @return the combined filter
+     * @throws RuntimeException if a URI cannot be parsed
+     */
+    private static String createPartitionFilter(String uris, String type) {
+        // NOTE(review): a null 'uris' fails here with a NullPointerException;
+        // confirm with callers what a data-in without URIs should produce.
+        String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
+        StringBuilder filter = new StringBuilder("");
+        for (String uri : uriList) {
+            if (filter.length() > 0) {
+                filter.append(" OR ");
+            }
+            try {
+                filter.append(new HCatURI(uri).toPartitionFilter(type));
+            }
+            catch (URISyntaxException e) {
+                throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e, e);
+            }
+        }
+        return filter.toString();
+    }
+
+    /**
+     * Parses the first URI stored for a data event in the current evaluator.
+     *
+     * @param dataInName name of the data event
+     * @param type selects the {@code .datain.} or {@code .dataout.} variable
+     * @return the parsed URI, or <code>null</code> if the variable is not set
+     * @throws RuntimeException if the URI cannot be parsed
+     */
+    private static HCatURI getURIFromResolved(String dataInName, EventType type) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uris;
+        if (type == EventType.input) {
+            uris = (String) eval.getVariable(".datain." + dataInName);
+        }
+        else { //type=output
+            uris = (String) eval.getVariable(".dataout." + dataInName);
+        }
+        if (uris == null) {
+            LOG.warn("URI is NULL");
+            return null;
+        }
+        // A negative limit keeps trailing empty strings, so the split result
+        // always has an element at index 0.
+        String uriTemplate = uris.split(CoordELFunctions.DIR_SEPARATOR, -1)[0];
+        LOG.info("uriTemplate [{0}] ", uriTemplate);
+        try {
+            return new HCatURI(uriTemplate);
+        }
+        catch (URISyntaxException e) {
+            LOG.info("uriTemplate [{0}]. Reason [{1}]: ", uriTemplate, e);
+            throw new RuntimeException("HCat URI can't be parsed " + e, e);
+        }
+    }
+
+    /**
+     * Checks that the evaluator variable {@code oozie.dataname.<dataInName>}
+     * is either "data-in" or "data-out".
+     *
+     * @param dataInName name of the data event
+     * @return always <code>true</code>; an invalid name is reported by exception
+     * @throws RuntimeException if the name is not a data-in or data-out event
+     */
+    private static boolean isValidDataEvent(String dataInName) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String val = (String) eval.getVariable("oozie.dataname." + dataInName);
+        if (!"data-in".equals(val) && !"data-out".equals(val)) {
+            LOG.error("dataset name " + dataInName + " is not valid. val :" + val);
+            throw new RuntimeException("data set name " + dataInName + " is not valid");
+        }
+        return true;
+    }
+
+    /**
+     * Echoes a function call with the "coord:" prefix.
+     *
+     * @param functionName function name without prefix
+     * @param n argument list text placed between the parentheses
+     * @return the function call text
+     */
+    private static String echoUnResolved(String functionName, String n) {
+        return echoUnResolvedPre(functionName, n, "coord:");
+    }
+
+    /**
+     * Sets the evaluator variable ".wrap" to "true" and returns the function
+     * call text built from the given parts.
+     *
+     * @param functionName function name without prefix
+     * @param n argument list text placed between the parentheses
+     * @param prefix text placed before the function name
+     * @return {@code prefix + functionName + "(" + n + ")"}
+     */
+    private static String echoUnResolvedPre(String functionName, String n, String prefix) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        eval.setVariable(".wrap", "true");
+        return prefix + functionName + "(" + n + ")"; // Unresolved
+    }
+
+}
\ No newline at end of file
Added: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency;
+
+import java.util.List;
+
+/**
+ * Holds the outcome of a dependency check for an action: the dependencies
+ * that are still missing and those that are available. The lists are stored
+ * and returned as given, without copying.
+ */
+public class ActionDependency {
+
+    // Assigned once in the constructor and never reassigned.
+    private final List<String> missingDependencies;
+    private final List<String> availableDependencies;
+
+    /**
+     * @param missingDependencies dependencies that are not yet available
+     * @param availableDependencies dependencies that are available
+     */
+    ActionDependency(List<String> missingDependencies, List<String> availableDependencies) {
+        this.missingDependencies = missingDependencies;
+        this.availableDependencies = availableDependencies;
+    }
+
+    /**
+     * @return the list of missing dependencies passed to the constructor
+     */
+    public List<String> getMissingDependencies() {
+        return missingDependencies;
+    }
+
+    /**
+     * @return the list of available dependencies passed to the constructor
+     */
+    public List<String> getAvailableDependencies() {
+        return availableDependencies;
+    }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.util.ParamChecker;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Static helpers to convert dependency lists to and from their string form and to
+ * check which dependencies of an action are currently available.
+ */
+public class DependencyChecker {
+
+    private static final XLog LOG = XLog.getLog(DependencyChecker.class);
+
+    /**
+     * Join a list of dependencies into one string, using
+     * CoordELFunctions.INSTANCE_SEPARATOR between the entries.
+     *
+     * @param missingDependencies list of missing dependencies
+     * @return the dependencies joined into a single string
+     */
+    public static String dependenciesAsString(List<String> missingDependencies) {
+        final String separator = CoordELFunctions.INSTANCE_SEPARATOR;
+        return StringUtils.join(missingDependencies, separator);
+    }
+
+    /**
+     * Split a dependency string into its individual entries.
+     *
+     * @param missingDependencies dependencies concatenated by
+     *        CoordELFunctions.INSTANCE_SEPARATOR (the separator is handed to
+     *        String.split, which interprets it as a regular expression)
+     * @return the dependencies as an array
+     */
+    public static String[] dependenciesAsArray(String missingDependencies) {
+        final String separator = CoordELFunctions.INSTANCE_SEPARATOR;
+        return missingDependencies.split(separator);
+    }
+
+    /**
+     * Check the given dependencies against the source and report which of them are
+     * currently missing and which are available. The string is split with
+     * {@link #dependenciesAsArray(String)} and then checked like the array variant.
+     *
+     * @param missingDependencies known missing dependencies, concatenated by
+     *        CoordELFunctions.INSTANCE_SEPARATOR
+     * @param actionConf Configuration for the action
+     * @param stopOnFirstMissing when true, stop checking at the first missing dependency;
+     *        the remaining entries are reported as missing without being checked
+     * @return ActionDependency holding the missing and the available dependencies
+     * @throws CommandException if a dependency is not a valid URI or the URI handler fails
+     */
+    public static ActionDependency checkForAvailability(String missingDependencies, Configuration actionConf,
+            boolean stopOnFirstMissing) throws CommandException {
+        String[] dependencies = dependenciesAsArray(missingDependencies);
+        return checkForAvailability(dependencies, actionConf, stopOnFirstMissing);
+    }
+
+    /**
+     * Check the given dependencies against the source and report which of them are
+     * currently missing and which are available. Each dependency is parsed as a URI and
+     * its existence is checked through the URIHandler registered for that URI, as the
+     * user named by OozieClient.USER_NAME in the action configuration.
+     *
+     * @param missingDependencies known missing dependencies
+     * @param actionConf Configuration for the action
+     * @param stopOnFirstMissing when true, stop checking at the first missing dependency;
+     *        the remaining entries are reported as missing without being checked
+     * @return ActionDependency holding the missing and the available dependencies
+     * @throws CommandException with ErrorCode.E0906 if a dependency is not a valid URI, or
+     *         wrapping the URIHandlerException raised while checking a dependency
+     */
+    public static ActionDependency checkForAvailability(String[] missingDependencies, Configuration actionConf,
+            boolean stopOnFirstMissing) throws CommandException {
+        final String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
+        final List<String> stillMissing = new ArrayList<String>();
+        final List<String> nowAvailable = new ArrayList<String>();
+        final URIHandlerService handlerService = Services.get().get(URIHandlerService.class);
+        // Set once a dependency is found missing and the caller asked to stop early.
+        boolean skipRemaining = false;
+        try {
+            for (String dependency : missingDependencies) {
+                if (skipRemaining) {
+                    // Not checked at all: carried over as still missing.
+                    stillMissing.add(dependency);
+                    continue;
+                }
+
+                final URI dependencyUri = new URI(dependency);
+                final URIHandler handler = handlerService.getURIHandler(dependencyUri);
+                LOG.debug("Checking for the availability of dependency [{0}] ", dependency);
+                if (handler.exists(dependencyUri, actionConf, user)) {
+                    LOG.debug("Dependency [{0}] is available", dependency);
+                    nowAvailable.add(dependency);
+                    continue;
+                }
+
+                LOG.debug("Dependency [{0}] is missing", dependency);
+                stillMissing.add(dependency);
+                skipRemaining = stopOnFirstMissing;
+            }
+        }
+        catch (URISyntaxException e) {
+            throw new CommandException(ErrorCode.E0906, e.getMessage(), e);
+        }
+        catch (URIHandlerException e) {
+            throw new CommandException(e);
+        }
+        return new ActionDependency(stillMissing, nowAvailable);
+    }
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.hadoop.FSLauncherURIHandler;
+import org.apache.oozie.action.hadoop.LauncherURIHandler;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+
+/**
+ * URIHandler for file system URIs. File systems are obtained through the
+ * HadoopAccessorService; dependencies are always of type PULL and notifications
+ * are not supported.
+ */
+public class FSURIHandler implements URIHandler {
+
+    private HadoopAccessorService service;
+    private Set<String> supportedSchemes;
+    private List<Class<?>> classesToShip;
+
+    /**
+     * Look up the HadoopAccessorService and cache its supported schemes and the
+     * classes reported by FSLauncherURIHandler.
+     *
+     * @param conf configuration (not used by this handler)
+     */
+    @Override
+    public void init(Configuration conf) {
+        this.service = Services.get().get(HadoopAccessorService.class);
+        this.supportedSchemes = this.service.getSupportedSchemes();
+        this.classesToShip = new FSLauncherURIHandler().getClassesForLauncher();
+    }
+
+    /**
+     * @return the schemes obtained from the HadoopAccessorService in {@link #init(Configuration)}
+     */
+    @Override
+    public Set<String> getSupportedSchemes() {
+        return this.supportedSchemes;
+    }
+
+    /**
+     * @return FSLauncherURIHandler.class
+     */
+    @Override
+    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
+        return FSLauncherURIHandler.class;
+    }
+
+    /**
+     * @return the classes obtained from FSLauncherURIHandler in {@link #init(Configuration)}
+     */
+    @Override
+    public List<Class<?>> getClassesForLauncher() {
+        return this.classesToShip;
+    }
+
+    /**
+     * @param uri the URI (not inspected)
+     * @return always DependencyType.PULL
+     */
+    @Override
+    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
+        return DependencyType.PULL;
+    }
+
+    /**
+     * Not supported by this handler.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
+            throws URIHandlerException {
+        throw new UnsupportedOperationException(notificationsUnsupportedMessage(uri));
+    }
+
+    /**
+     * Not supported by this handler.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public boolean unregisterFromNotification(URI uri, String actionID) {
+        throw new UnsupportedOperationException(notificationsUnsupportedMessage(uri));
+    }
+
+    /**
+     * Create a context wrapping a FileSystem for the given URI.
+     *
+     * @param uri URI whose file system should be accessed
+     * @param conf Configuration stored in the context
+     * @param user name of the user the file system is accessed as
+     * @return an FSContext holding the configuration, the user and the FileSystem
+     * @throws URIHandlerException if the FileSystem cannot be obtained
+     */
+    @Override
+    public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException {
+        final FileSystem fileSystem = getFileSystem(uri, conf, user);
+        return new FSContext(conf, user, fileSystem);
+    }
+
+    /**
+     * Check whether the URI exists, using the FileSystem held by the context.
+     *
+     * @param uri URI to check
+     * @param context context to use; it is cast to FSContext
+     * @return true if the path exists on the file system
+     * @throws URIHandlerException a HadoopAccessorException (E0902) if the check fails with an IOException
+     */
+    @Override
+    public boolean exists(URI uri, Context context) throws URIHandlerException {
+        try {
+            final FSContext fsContext = (FSContext) context;
+            final FileSystem fileSystem = fsContext.getFileSystem();
+            final Path target = getNormalizedPath(uri);
+            return fileSystem.exists(target);
+        }
+        catch (IOException ioe) {
+            throw new HadoopAccessorException(ErrorCode.E0902, ioe);
+        }
+    }
+
+    /**
+     * Check whether the URI exists, obtaining a FileSystem for the given user.
+     *
+     * @param uri URI to check
+     * @param conf Configuration passed along when obtaining the FileSystem
+     * @param user name of the user the file system is accessed as; must not be null
+     * @return true if the path exists on the file system
+     * @throws URIHandlerException a HadoopAccessorException (E0902) if user is null or the
+     *         check fails with an IOException
+     */
+    @Override
+    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
+        try {
+            final FileSystem fileSystem = getFileSystem(uri, conf, user);
+            final Path target = getNormalizedPath(uri);
+            return fileSystem.exists(target);
+        }
+        catch (IOException ioe) {
+            throw new HadoopAccessorException(ErrorCode.E0902, ioe);
+        }
+    }
+
+    /**
+     * Append the done flag to the URI, separated by "/". An empty done flag leaves
+     * the URI unchanged.
+     *
+     * @param uri the URI as a string
+     * @param doneFlag name of the done flag; may be empty
+     * @return the URI with the done flag appended, or the URI itself if the flag is empty
+     */
+    @Override
+    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
+        if (doneFlag.length() == 0) {
+            return uri;
+        }
+        return uri + "/" + doneFlag;
+    }
+
+    /**
+     * No validation is performed for file system URIs.
+     *
+     * @param uri the URI as a string (not inspected)
+     */
+    @Override
+    public void validate(String uri) throws URIHandlerException {
+    }
+
+    /**
+     * Nothing to release.
+     */
+    @Override
+    public void destroy() {
+
+    }
+
+    /**
+     * Build the message used when a notification operation is requested.
+     */
+    private String notificationsUnsupportedMessage(URI uri) {
+        return "Notifications are not supported for " + uri.getScheme();
+    }
+
+    /**
+     * Rebuild the URI as a Path from its scheme, authority and path.
+     */
+    private Path getNormalizedPath(URI uri) {
+        // Going through Path normalizes the uri path, replacing // with / in the path
+        // which users specify by mistake
+        final String scheme = uri.getScheme();
+        final String authority = uri.getAuthority();
+        return new Path(scheme, authority, uri.getPath());
+    }
+
+    /**
+     * Obtain a FileSystem for the URI through the HadoopAccessorService, using a job
+     * configuration created for the URI's authority. The conf argument is not used here.
+     *
+     * @throws HadoopAccessorException with E0902 if user is null, or as raised by the service
+     */
+    private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException {
+        if (null == user) {
+            throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
+        }
+        final Configuration jobConf = service.createJobConf(uri.getAuthority());
+        return service.createFileSystem(user, uri, jobConf);
+    }
+
+    /**
+     * Context that additionally carries the FileSystem used to access a URI.
+     */
+    static class FSContext extends Context {
+
+        private final FileSystem fs;
+
+        /**
+         * Create a FSContext that can be used to access a filesystem URI
+         *
+         * @param conf Configuration to access the URI
+         * @param user name of the user the URI should be accessed as
+         * @param fs FileSystem to access
+         */
+        public FSContext(Configuration conf, String user, FileSystem fs) {
+            super(conf, user);
+            this.fs = fs;
+        }
+
+        /**
+         * Get the FileSystem to access the URI
+         *
+         * @return FileSystem to access the URI
+         */
+        public FileSystem getFileSystem() {
+            return this.fs;
+        }
+    }
+
+}