You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2012/12/27 20:46:07 UTC
svn commit: r1426301 [1/3] - in /oozie/branches/hcat-intre: ./
core/src/main/java/org/apache/oozie/action/hadoop/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/coord/
core/src/main/java/org/apache/oozie/dependency/ ...
Author: virag
Date: Thu Dec 27 19:46:06 2012
New Revision: 1426301
URL: http://svn.apache.org/viewvc?rev=1426301&view=rev
Log:
OOZIE-1125 Prepare actions for hcat (rohini via virag)
Added:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/MiniHCatServer.java
oozie/branches/hcat-intre/resourcelib/webhcat-java-client-0.4.1.jar (with props)
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/XLog.java
oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFileSystemActions.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestCoordCommandUtils.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestMetaDataAccessorService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XHCatTestCase.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XTestCase.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
oozie/branches/hcat-intre/release-log.txt
oozie/branches/hcat-intre/resourcelib/README
oozie/branches/hcat-intre/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
oozie/branches/hcat-intre/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMain.java
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java Thu Dec 27 19:46:06 2012
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.action.hadoop;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.w3c.dom.Node;
-/**
- * Class to perform file system operations specified in the prepare block of Workflow
- *
- */
-public class FileSystemActions {
- private static Collection<String> supportedFileSystems;
-
- public FileSystemActions(Collection<String> fileSystems) {
- supportedFileSystems = fileSystems;
- }
-
- /**
- * Method to execute the prepare actions based on the command
- *
- * @param n Child node of the prepare XML
- * @throws LauncherException
- */
- public void execute(Node n) throws LauncherException {
- String command = n.getNodeName();
- if (command.equals("delete")) {
- delete(new Path(n.getAttributes().getNamedItem("path").getNodeValue().trim()));
- } else if (command.equals("mkdir")) {
- mkdir(new Path(n.getAttributes().getNamedItem("path").getNodeValue().trim()));
- }
- }
-
- // Method to delete the specified file based on the path
- private void delete(Path path) throws LauncherException {
- try {
- validatePath(path, true);
- FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
- if (fs.exists(path)) {
- if (!fs.delete(path, true)) {
- String deleteFailed = "Deletion of path " + path.toString() + " failed.";
- System.out.println(deleteFailed);
- throw new LauncherException(deleteFailed);
- } else {
- System.out.println("Deletion of path " + path.toString() + " was successful.");
- }
- }
- } catch (IOException ex) {
- throw new LauncherException(ex.getMessage(), ex);
- }
-
- }
-
- // Method to create a directory based on the path
- private void mkdir(Path path) throws LauncherException {
- try {
- validatePath(path, true);
- FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
- if (!fs.exists(path)) {
- if (!fs.mkdirs(path)) {
- String mkdirFailed = "Creating directory at " + path + " failed.";
- System.out.println(mkdirFailed);
- throw new LauncherException(mkdirFailed);
- } else {
- System.out.println("Creating directory at path " + path + " was successful.");
- }
- }
- } catch (IOException ex) {
- throw new LauncherException(ex.getMessage(), ex);
- }
- }
-
- // Method to validate the path provided for the prepare action
- private void validatePath(Path path, boolean withScheme) throws LauncherException {
- String scheme = path.toUri().getScheme();
- if (withScheme) {
- if (scheme == null) {
- String nullScheme = "Scheme of the path " + path + " is null";
- System.out.println(nullScheme);
- throw new LauncherException(nullScheme);
- } else if (supportedFileSystems.size() != 1 || !supportedFileSystems.iterator().next().equals("*")) {
- if (!supportedFileSystems.contains(scheme.toLowerCase())) {
- String unsupportedScheme = "Scheme of '" + path + "' is not supported.";
- System.out.println(unsupportedScheme);
- throw new LauncherException(unsupportedScheme);
- }
- }
- } else if (scheme != null) {
- String notNullScheme = "Scheme of the path " + path + " is not null as specified.";
- System.out.println(notNullScheme);
- throw new LauncherException(notNullScheme);
- }
- }
-}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java Thu Dec 27 19:46:06 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.util.XLog;
@@ -52,7 +53,8 @@ public class HCatCredentialHelper {
XLog.getLog(getClass()).debug(
"HCatCredentialHelper: set: User name for which token will be asked from HCat: "
+ launcherJobConf.get(USER_NAME));
- String tokenStrForm = client.getDelegationToken(launcherJobConf.get(USER_NAME));
+ String tokenStrForm = client.getDelegationToken(launcherJobConf.get(USER_NAME), UserGroupInformation
+ .getLoginUser().getShortUserName());
Token<DelegationTokenIdentifier> hcatToken = new Token<DelegationTokenIdentifier>();
hcatToken.decodeFromUrlString(tokenStrForm);
launcherJobConf.getCredentials().addToken(new Text("HCat Token"), hcatToken);
@@ -77,7 +79,7 @@ public class HCatCredentialHelper {
hiveConf = new HiveConf();
XLog.getLog(getClass()).debug("getHCatClient: Principal: " + principal + " Server: " + server);
// specified a thrift url
-
+
hiveConf.set(HIVE_METASTORE_SASL_ENABLED, "true");
hiveConf.set(HIVE_METASTORE_KERBEROS_PRINCIPAL, principal);
hiveConf.set(HIVE_METASTORE_LOCAL, "false");
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java Thu Dec 27 19:46:06 2012
@@ -62,6 +62,7 @@ import org.apache.oozie.client.WorkflowA
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.servlet.CallbackServlet;
import org.apache.oozie.util.ELEvaluator;
@@ -128,8 +129,8 @@ public class JavaActionExecutor extends
classes.add(LauncherSecurityManager.class);
classes.add(LauncherException.class);
classes.add(LauncherMainException.class);
- classes.add(FileSystemActions.class);
classes.add(PrepareActionsDriver.class);
+ classes.addAll(Services.get().get(URIHandlerService.class).getURIHandlerClassesToShip());
classes.add(ActionStats.class);
classes.add(ActionType.class);
return classes;
@@ -575,8 +576,7 @@ public class JavaActionExecutor extends
prepareXML);
LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
- LauncherMapper.setupSupportedFileSystems(
- launcherJobConf, Services.get().getConf().get(HadoopAccessorService.SUPPORTED_FILESYSTEMS));
+ LauncherMapper.setupURIServiceConf(launcherJobConf);
LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java Thu Dec 27 19:46:06 2012
@@ -33,6 +33,7 @@ import java.lang.reflect.Method;
import java.net.URI;
import java.security.Permission;
import java.text.MessageFormat;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -50,12 +51,12 @@ import org.apache.hadoop.mapred.RunningJ
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.XLog;
public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
- public static final String CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS = "oozie.launcher.action.supported.filesystems";
public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = "oozie.action.max.output.data";
@@ -149,8 +150,10 @@ public class LauncherMapper<K1, V1, K2,
launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
}
- public static void setupSupportedFileSystems(Configuration launcherConf, String supportedFileSystems) {
- launcherConf.set(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS, supportedFileSystems);
+ public static void setupURIServiceConf(Configuration launcherConf) {
+ for(Entry<String, String> entry : Services.get().get(URIHandlerService.class).getURIHandlerServiceConfig()) {
+ launcherConf.set(entry.getKey(), entry.getValue());
+ }
}
public static void setupMainArguments(Configuration launcherConf, String[] args) {
@@ -648,8 +651,7 @@ public class LauncherMapper<K1, V1, K2,
String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
if (prepareXML != null) {
if (!prepareXML.equals("")) {
- PrepareActionsDriver.doOperations(
- getJobConf().getStringCollection(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS), prepareXML);
+ PrepareActionsDriver.doOperations(prepareXML, getJobConf());
} else {
System.out.println("There are no prepare actions to execute.");
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java Thu Dec 27 19:46:06 2012
@@ -20,12 +20,16 @@ package org.apache.oozie.action.hadoop;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Collection;
+import java.net.URI;
+import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.service.URIAccessorException;
+import org.apache.oozie.service.URIHandlerService;
import org.xml.sax.SAXException;
import org.w3c.dom.Document;
+import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -38,33 +42,37 @@ import javax.xml.parsers.ParserConfigura
*/
public class PrepareActionsDriver {
+ public static enum PREPARE_ACTION {
+ mkdir,
+ delete;
+ };
+
/**
* Method to parse the prepare XML and execute the corresponding prepare actions
*
* @param prepareXML Prepare XML block in string format
* @throws LauncherException
*/
- static void doOperations(Collection<String> supportedFileSystems, String prepareXML) throws LauncherException {
+ static void doOperations(String prepareXML, Configuration conf) throws LauncherException {
try {
Document doc = getDocumentFromXML(prepareXML);
doc.getDocumentElement().normalize();
// Get the list of child nodes, basically, each one corresponding to a separate action
NodeList nl = doc.getDocumentElement().getChildNodes();
- FileSystemActions fsActions = new FileSystemActions(supportedFileSystems);
+ URIHandlerService service = new URIHandlerService();
+ service.init(conf, false);
for (int i = 0; i < nl.getLength(); ++i) {
- String commandType = "";
- /* Logic to find the command type goes here
- commandType = ..........;
- */
- // As of now, the available prepare action is of type hdfs. Hence, assigning the value directly
- commandType = "hdfs";
- if (commandType.equalsIgnoreCase("hdfs")) {
- fsActions.execute(nl.item(i));
- } /*else if(commandType.equalsIgnoreCase("hcat")) { //Other command types go here
- hCatActions.execute(nl.item(i));
- }*/
+ Node n = nl.item(i);
+ String operation = n.getNodeName();
+ if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) {
+ continue;
+ }
+ String path = n.getAttributes().getNamedItem("path").getNodeValue().trim();
+ URI uri = new URI(path);
+ URIHandler handler = service.getURIHandler(uri, true);
+ execute(operation, uri, handler, conf);
}
} catch (IOException ioe) {
throw new LauncherException(ioe.getMessage(), ioe);
@@ -72,6 +80,29 @@ public class PrepareActionsDriver {
throw new LauncherException(saxe.getMessage(), saxe);
} catch (ParserConfigurationException pce) {
throw new LauncherException(pce.getMessage(), pce);
+ } catch (URISyntaxException use) {
+ throw new LauncherException(use.getMessage(), use);
+ } catch (URIAccessorException uae) {
+ throw new LauncherException(uae.getMessage(), uae);
+ } catch (ClassNotFoundException cnfe) {
+ throw new LauncherException(cnfe.getMessage(), cnfe);
+ }
+ }
+
+ /**
+ * Method to execute the prepare actions based on the command
+ *
+ * @param n Child node of the prepare XML
+ * @throws LauncherException
+ * @throws URIAccessorException
+ */
+ private static void execute(String operation, URI uri, URIHandler handler, Configuration conf)
+ throws URIAccessorException {
+ if (operation.equals(PREPARE_ACTION.delete.name())) {
+ handler.delete(uri, conf, null);
+ }
+ else if (operation.equals(PREPARE_ACTION.mkdir.name())) {
+ handler.create(uri, conf, null);
}
}
@@ -83,4 +114,5 @@ public class PrepareActionsDriver {
InputStream is = new ByteArrayInputStream(prepareXML.getBytes("UTF-8"));
return docBuilder.parse(is);
}
-}
+
+}
\ No newline at end of file
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java Thu Dec 27 19:46:06 2012
@@ -55,7 +55,7 @@ public class CoordCommandUtils {
public static int FUTURE = 2;
public static int OFFSET = 3;
public static int UNEXPECTED = -1;
- public static final String RESOLVED_UNRESOLVED_SEPARATOR = ";";
+ public static final String RESOLVED_UNRESOLVED_SEPARATOR = "!!";
public static final String UNRESOLVED_INST_TAG = "unresolved-instances";
/**
@@ -576,7 +576,7 @@ public class CoordCommandUtils {
}
else {
resolved.append(missDepList.substring(0, index));
- unresolved.append(missDepList.substring(index + 1));
+ unresolved.append(missDepList.substring(index + RESOLVED_UNRESOLVED_SEPARATOR.length()));
}
}
return resolved.toString();
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java Thu Dec 27 19:46:06 2012
@@ -335,6 +335,9 @@ public class CoordELFunctions {
checkedInstance++;
// DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
}
+ if (uriContext != null) {
+ uriContext.dispose();
+ }
if (!resolved) {
// return unchanged future function with variable 'is_resolved'
// to 'false'
@@ -1013,6 +1016,9 @@ public class CoordELFunctions {
nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
// DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
}
+ if (uriContext != null) {
+ uriContext.dispose();
+ }
if (!resolved) {
// return unchanged latest function with variable 'is_resolved'
// to 'false'
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java Thu Dec 27 19:46:06 2012
@@ -19,37 +19,68 @@ package org.apache.oozie.dependency;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIAccessorException;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.jdom.Element;
public class FSURIHandler extends URIHandler {
private static XLog LOG = XLog.getLog(FSURIHandler.class);
+ private boolean isFrontEnd;
private HadoopAccessorService service;
+ private Set<String> supportedSchemes;
+ private List<Class<?>> classesToShip;
public FSURIHandler() {
- service = Services.get().get(HadoopAccessorService.class);
+ this.classesToShip = new ArrayList<Class<?>>();
+ classesToShip.add(FSURIHandler.class);
+ classesToShip.add(FSURIContext.class);
+ classesToShip.add(HadoopAccessorService.class);
+ classesToShip.add(HadoopAccessorException.class);
+ classesToShip.add(XConfiguration.class); //Not sure why it fails in init with CNFE for this.
}
@Override
- public void init(Configuration conf) {
-
+ public void init(Configuration conf, boolean isFrontEnd) {
+ this.isFrontEnd = isFrontEnd;
+ if (isFrontEnd) {
+ service = Services.get().get(HadoopAccessorService.class);
+ supportedSchemes = service.getSupportedSchemes();
+ }
+ if (supportedSchemes == null) {
+ supportedSchemes = new HashSet<String>();
+ String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
+ + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX,
+ HadoopAccessorService.DEFAULT_SUPPORTED_SCHEMES);
+ supportedSchemes.addAll(Arrays.asList(schemes));
+ }
}
@Override
public Set<String> getSupportedSchemes() {
- return service.getSupportedSchemes();
+ return supportedSchemes;
+ }
+
+ public Collection<Class<?>> getClassesToShip() {
+ return classesToShip;
}
@Override
@@ -129,8 +160,25 @@ public class FSURIHandler extends URIHan
}
private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException {
- Configuration fsConf = service.createJobConf(uri.getAuthority());
- return service.createFileSystem(user, uri, fsConf);
+ if (isFrontEnd) {
+ if (user == null) {
+ throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
+ }
+ Configuration fsConf = service.createJobConf(uri.getAuthority());
+ return service.createFileSystem(user, uri, fsConf);
+ }
+ else {
+ try {
+ if (user != null && !user.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
+ throw new HadoopAccessorException(ErrorCode.E0902,
+ "Cannot access FileSystem as a different user in backend");
+ }
+ return FileSystem.get(uri, conf);
+ }
+ catch (IOException e) {
+ throw new HadoopAccessorException(ErrorCode.E0902, e);
+ }
+ }
}
private boolean create(FileSystem fs, URI uri) throws URIAccessorException {
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java Thu Dec 27 19:46:06 2012
@@ -33,4 +33,12 @@ public class HCatURIContext extends URIC
return hcatClient;
}
+ public void dispose() {
+ try {
+ hcatClient.close();
+ }
+ catch (Exception ignore) {
+ }
+ }
+
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java Thu Dec 27 19:46:06 2012
@@ -17,28 +17,31 @@
*/
package org.apache.oozie.dependency;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.HashMap;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hcatalog.api.ConnectionFailureException;
import org.apache.hcatalog.api.HCatClient;
import org.apache.hcatalog.api.HCatPartition;
-import org.apache.hcatalog.api.HCatTable;
import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.service.JMSAccessorService;
import org.apache.oozie.service.MetaDataAccessorException;
-import org.apache.oozie.service.MetaDataAccessorService;
-import org.apache.oozie.service.MetadataServiceException;
-import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIAccessorException;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.service.UserGroupInformationService;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;
import org.jdom.Element;
@@ -46,20 +49,39 @@ import org.jdom.Element;
public class HCatURIHandler extends URIHandler {
private static XLog LOG = XLog.getLog(HCatURIHandler.class);
- private MetaDataAccessorService service;
+ private boolean isFrontEnd;
+ private Set<String> supportedSchemes;
+ private UserGroupInformationService ugiService;
+ private List<Class<?>> classesToShip;
public HCatURIHandler() {
- service = Services.get().get(MetaDataAccessorService.class);
+ this.classesToShip = new ArrayList<Class<?>>();
+ classesToShip.add(HCatURIHandler.class);
+ classesToShip.add(HCatURIContext.class);
+ classesToShip.add(HCatURI.class);
+ classesToShip.add(MetaDataAccessorException.class);
+ classesToShip.add(UserGroupInformationService.class);
}
@Override
- public void init(Configuration conf) {
-
+ public void init(Configuration conf, boolean isFrontEnd) {
+ this.isFrontEnd = isFrontEnd;
+ if (isFrontEnd) {
+ ugiService = Services.get().get(UserGroupInformationService.class);
+ }
+ supportedSchemes = new HashSet<String>();
+ String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
+ + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat");
+ supportedSchemes.addAll(Arrays.asList(schemes));
}
@Override
public Set<String> getSupportedSchemes() {
- return service.getSupportedSchemes();
+ return supportedSchemes;
+ }
+
+ public Collection<Class<?>> getClassesToShip() {
+ return classesToShip;
}
@Override
@@ -76,7 +98,7 @@ public class HCatURIHandler extends URIH
@Override
public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIAccessorException {
- HCatClient client = service.getHCatClient(uri, conf, user);
+ HCatClient client = getHCatClient(uri, conf, user);
return new HCatURIContext(conf, user, client);
}
@@ -89,19 +111,19 @@ public class HCatURIHandler extends URIH
@Override
public boolean exists(URI uri, URIContext uriContext) throws URIAccessorException {
HCatClient client = ((HCatURIContext) uriContext).getHCatClient();
- return exists(uri, client);
+ return exists(uri, client, false);
}
@Override
public boolean exists(URI uri, Configuration conf, String user) throws URIAccessorException {
- HCatClient client = service.getHCatClient(uri, conf, user);
- return exists(uri, client);
+ HCatClient client = getHCatClient(uri, conf, user);
+ return exists(uri, client, true);
}
@Override
public boolean delete(URI uri, Configuration conf, String user) throws URIAccessorException {
- HCatClient hCatClient = service.getHCatClient(uri, conf, user);
- return delete(hCatClient, uri);
+ HCatClient hCatClient = getHCatClient(uri, conf, user);
+ return delete(hCatClient, uri, true);
}
@Override
@@ -130,11 +152,69 @@ public class HCatURIHandler extends URIH
}
- private boolean exists(URI uri, HCatClient client) throws MetaDataAccessorException {
+ private HCatClient getHCatClient(URI uri, Configuration conf, String user) throws MetaDataAccessorException {
+ final HiveConf hiveConf = new HiveConf(conf, this.getClass());
+ String serverURI = getMetastoreConnectURI(uri);
+ if (!serverURI.equals("")) {
+ hiveConf.set(HiveConf.ConfVars.METASTORE_MODE.varname, "false");
+ }
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
+ HCatClient client = null;
+ try {
+ LOG.info("Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
+ UserGroupInformation.getLoginUser(), serverURI);
+ if (isFrontEnd) {
+ if (user == null) {
+ throw new MetaDataAccessorException(ErrorCode.E0902,
+ "user has to be specified to access metastore server");
+ }
+ UserGroupInformation ugi = ugiService.getProxyUser(user);
+ client = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
+ public HCatClient run() throws Exception {
+ return HCatClient.create(hiveConf);
+ }
+ });
+ }
+ else {
+ if (user != null && !user.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
+ throw new MetaDataAccessorException(ErrorCode.E0902,
+ "Cannot access metastore server as a different user in backend");
+ }
+ // Delegation token fetched from metastore has new Text() as service and HiveMetastoreClient
+ // looks for the same if not overridden by hive.metastore.token.signature.
+ // We are good as long as HCatCredentialHelper does not change the service of the token.
+ client = HCatClient.create(hiveConf);
+ }
+ }
+ catch (HCatException e) {
+ throw new MetaDataAccessorException(ErrorCode.E1504, e);
+ }
+ catch (IOException e) {
+ throw new MetaDataAccessorException(ErrorCode.E1504, e);
+ }
+ catch (InterruptedException e) {
+ throw new MetaDataAccessorException(ErrorCode.E1504, e);
+ }
+
+ return client;
+ }
+
+ private String getMetastoreConnectURI(URI uri) {
+ // For unit tests
+ if (uri.getAuthority().equals("unittest-local")) {
+ return "";
+ }
+ // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
+ // is added
+ String metastoreURI = "thrift://" + uri.getAuthority();
+ return metastoreURI;
+ }
+
+ private boolean exists(URI uri, HCatClient client, boolean closeClient) throws MetaDataAccessorException {
try {
HCatURI hcatURI = new HCatURI(uri.toString());
- List<HCatPartition> partitions = client.listPartitionsByFilter(hcatURI.getDb(), hcatURI.getTable(),
- hcatURI.toFilter());
+ List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(),
+ hcatURI.getPartitionMap());
if (partitions == null || partitions.isEmpty()) {
return false;
}
@@ -149,35 +229,17 @@ public class HCatURIHandler extends URIH
catch (URISyntaxException e) {
throw new MetaDataAccessorException(ErrorCode.E0902, e);
}
+ finally {
+ closeQuietly(client, closeClient);
+ }
}
- private boolean delete(HCatClient client, URI uri) throws URIAccessorException {
+ private boolean delete(HCatClient client, URI uri, boolean closeClient) throws URIAccessorException {
try {
HCatURI hcatURI = new HCatURI(uri.toString());
- List<HCatPartition> partitions = client.listPartitionsByFilter(hcatURI.getDb(), hcatURI.getTable(),
- hcatURI.toFilter());
- if (partitions == null || partitions.isEmpty()) {
- return false;
- }
- else {
- // Only works if all partitions match. HCat team working on adding a dropPartitions API.
- // client.dropPartition(hcatURI.getDb(), hcatURI.getTable(),hcatURI.getPartitionMap(), true);
- // Tried an alternate way. But another bug. table.getPartCols() is empty.
- // TODO: Change this code and enable tests after fix from hcat team.
- HCatTable table = client.getTable(hcatURI.getDb(), hcatURI.getTable());
- List<HCatFieldSchema> partCols = table.getPartCols();
- Map<String, String> partKeyVals = new HashMap<String, String>();
- for (HCatPartition partition : partitions) {
- List<String> partVals = partition.getValues();
- partKeyVals.clear();
- for (int i = 0; i < partCols.size(); i++) {
- partKeyVals.put(partCols.get(i).getName(), partVals.get(i));
- }
- client.dropPartition(hcatURI.getDb(), hcatURI.getTable(), partKeyVals, true);
- }
- LOG.info("Dropped partitions for " + uri);
- return true;
- }
+ client.dropPartitions(hcatURI.getDb(), hcatURI.getTable(), hcatURI.getPartitionMap(), true);
+ LOG.info("Dropped partitions for " + uri);
+ return true;
}
catch (ConnectionFailureException e) {
throw new MetaDataAccessorException(ErrorCode.E1504, e);
@@ -188,6 +250,19 @@ public class HCatURIHandler extends URIH
catch (URISyntaxException e) {
throw new MetaDataAccessorException(ErrorCode.E0902, e);
}
+ finally {
+ closeQuietly(client, closeClient);
+ }
+ }
+
+ private void closeQuietly(HCatClient client, boolean close) {
+ if (close && client != null) {
+ try {
+ client.close();
+ }
+ catch (Exception ignore) {
+ }
+ }
}
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java Thu Dec 27 19:46:06 2012
@@ -37,4 +37,7 @@ public abstract class URIContext {
return user;
}
+ public void dispose() {
+ }
+
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java Thu Dec 27 19:46:06 2012
@@ -18,6 +18,7 @@
package org.apache.oozie.dependency;
import java.net.URI;
+import java.util.Collection;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -26,7 +27,7 @@ import org.jdom.Element;
public abstract class URIHandler {
- public abstract void init(Configuration conf);
+ public abstract void init(Configuration conf, boolean isFrontEnd);
/**
* Get the list of uri schemes supported by this URIHandler
@@ -35,6 +36,11 @@ public abstract class URIHandler {
*/
public abstract Set<String> getSupportedSchemes();
+ /** Get the list of dependent classes to ship to the hadoop launcher job for prepare actions
+ * @return dependent classes to ship to the hadoop job
+ */
+ public abstract Collection<Class<?>> getClassesToShip();
+
/**
* Get the type of dependency type of the URI. When the availability of the
* URI is to be determined by polling the type is DependencyType.PULL, and
@@ -73,7 +79,8 @@ public abstract class URIHandler {
*
* @param uri URI of the dependency
* @param conf Configuration to access the URI
- * @param user name of the user the URI should be accessed as
+ * @param user name of the user the URI should be accessed as. If null, the
+ * logged-in user is used.
*
* @return <code>true</code> if the URI did not exist and was successfully
* created; <code>false</code> if the URI already existed
@@ -100,6 +107,8 @@ public abstract class URIHandler {
*
* @param uri URI of the dependency
* @param conf Configuration to access the URI
+ * @param user name of the user the URI should be accessed as. If null, the
+ * logged-in user is used.
*
* @return <code>true</code> if the URI exists; <code>false</code> if the
* URI does not exist
@@ -113,11 +122,11 @@ public abstract class URIHandler {
*
* @param uri URI of the dependency
* @param conf Configuration to access the URI
- * @param user name of the user the URI should be accessed as
+ * @param user name of the user the URI should be accessed as. If null, the
+ * logged-in user is used.
*
* @return <code>true</code> if the URI exists and was successfully deleted;
* <code>false</code> if the URI does not exist
- *
* @throws URIAccessorException
*/
public abstract boolean delete(URI uri, Configuration conf, String user) throws URIAccessorException;
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java Thu Dec 27 19:46:06 2012
@@ -79,6 +79,7 @@ public class HadoopAccessorService imple
* Supported filesystem schemes for namespace federation
*/
public static final String SUPPORTED_FILESYSTEMS = CONF_PREFIX + "supported.filesystems";
+ public static final String[] DEFAULT_SUPPORTED_SCHEMES = new String[]{"hdfs","hftp","webhdfs"};
private Set<String> supportedSchemes;
private boolean allSchemesSupported;
@@ -126,7 +127,7 @@ public class HadoopAccessorService imple
preLoadActionConfigs(conf);
supportedSchemes = new HashSet<String>();
- String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, new String[]{"hdfs","hftp","webhdfs"});
+ String[] schemesFromConf = conf.getStrings(SUPPORTED_FILESYSTEMS, DEFAULT_SUPPORTED_SCHEMES);
if(schemesFromConf != null) {
for (String scheme: schemesFromConf) {
scheme = scheme.trim();
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorService.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorService.java Thu Dec 27 19:46:06 2012
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.service;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hcatalog.api.HCatClient;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.util.XLog;
-
-/**
- * This service provides a way of getting HCatClient instance
- */
-public class MetaDataAccessorService implements Service {
-
- public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
- private static XLog log;
- private Set<String> supportedSchemes;
- private UserGroupInformationService ugiService;
-
- @Override
- public void init(Services services) throws ServiceException {
- init(services.getConf());
- }
-
- private void init(Configuration conf) {
- log = XLog.getLog(getClass());
- supportedSchemes = new HashSet<String>();
- supportedSchemes.add("hcat");
- ugiService = Services.get().get(UserGroupInformationService.class);
- }
-
- public Set<String> getSupportedSchemes() {
- return supportedSchemes;
- }
-
- /**
- * Get an HCatClient object using doAs for end user
- *
- * @param uri : hcatalog access uri
- * @param user : end user id
- * @return : HCatClient
- * @throws Exception
- */
- public HCatClient getHCatClient(String uri, String user) throws MetaDataAccessorException {
- try {
- return getHCatClient(new URI(uri), new Configuration(), user);//TODO: Remove
- } catch (URISyntaxException e) {
- throw new MetaDataAccessorException(ErrorCode.E1504, e);
- }
- }
-
- /**
- * Get an HCatClient object using doAs for end user
- *
- * @param uri : hcatalog access uri
- * @param user : end user id
- * @return : HCatClient
- * @throws Exception
- */
- public HCatClient getHCatClient(URI uri, Configuration conf, String user) throws MetaDataAccessorException {
- final HiveConf hiveConf = new HiveConf(conf, this.getClass());
- String serverURI = getMetastoreConnectURI(uri);
- updateConf(conf, serverURI);
- HCatClient client = null;
- try {
- UserGroupInformation ugi = ugiService.getProxyUser(user);
- log.info("Create HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
- UserGroupInformation.getLoginUser(), uri);
- client = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
- public HCatClient run() throws Exception {
- return HCatClient.create(hiveConf);
- }
- });
- }
- catch (Exception e) {
- throw new MetaDataAccessorException(ErrorCode.E1504, e);
- }
- return client;
- }
-
- public String getMetastoreConnectURI(URI uri) {
- //Hardcoding hcat to thrift mapping till support for webhcat(templeton) is added
- String metastoreURI = "thrift://" + uri.getHost() + ":" + uri.getPort();
- return metastoreURI;
- }
-
- private void updateConf(Configuration conf, String serverURI) {
- conf.set(HIVE_METASTORE_URIS, serverURI);
- }
-
- @Override
- public void destroy() {
-
- }
-
- @Override
- public Class<? extends Service> getInterface() {
- return MetaDataAccessorService.class;
- }
-
-}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java Thu Dec 27 19:46:06 2012
@@ -19,16 +19,24 @@ package org.apache.oozie.service;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Stack;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
import org.apache.oozie.dependency.FSURIHandler;
+import org.apache.oozie.dependency.URIContext;
import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
public class URIHandlerService implements Service {
@@ -36,41 +44,115 @@ public class URIHandlerService implement
private static final String CONF_PREFIX = Service.CONF_PREFIX + "URIHandlerService.";
public static final String URI_HANDLERS = CONF_PREFIX + "uri.handlers";
public static final String URI_HANDLER_DEFAULT = CONF_PREFIX + "uri.handler.default";
+ public static final String URI_HANDLER_SUPPORTED_SCHEMES_PREFIX = CONF_PREFIX + "uri.handler.";
+ public static final String URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX = ".supported.schemes";
private static XLog LOG = XLog.getLog(URIHandlerService.class);
private Configuration conf;
+ private Configuration backendConf;
private Map<String, URIHandler> cache;
+ private Set<Class<?>> classesToShip;
private URIHandler defaultHandler;
@Override
public void init(Services services) throws ServiceException {
- conf = services.getConf();
+ try {
+ init(services.getConf(), true);
+ }
+ catch (Exception e) {
+ throw new ServiceException(ErrorCode.E0902, e);
+ }
+ }
+
+ public void init(Configuration conf, boolean isFrontEnd) throws ClassNotFoundException {
+ this.conf = conf;
cache = new HashMap<String, URIHandler>();
+
String[] classes = conf.getStrings(URI_HANDLERS, FSURIHandler.class.getName());
- try {
- for (String classname : classes) {
- Class<?> clazz = Class.forName(classname);
- URIHandler uriHandler = (URIHandler) ReflectionUtils.newInstance(clazz, null);
- uriHandler.init(conf);
- for (String scheme : uriHandler.getSupportedSchemes()) {
- cache.put(scheme, uriHandler);
- }
+ for (String classname : classes) {
+ Class<?> clazz = Class.forName(classname);
+ URIHandler uriHandler = (URIHandler) ReflectionUtils.newInstance(clazz, null);
+ uriHandler.init(conf, isFrontEnd);
+ for (String scheme : uriHandler.getSupportedSchemes()) {
+ cache.put(scheme, uriHandler);
}
- Class<?> defaultClass = conf.getClass(URI_HANDLER_DEFAULT, null);
- defaultHandler = (defaultClass == null) ? new FSURIHandler() : (URIHandler) ReflectionUtils.newInstance(
- defaultClass, null);
- defaultHandler.init(conf);
- for (String scheme : defaultHandler.getSupportedSchemes()) {
- cache.put(scheme, defaultHandler);
- }
- LOG.info("Loaded urihandlers {0}", (Object[])classes);
- LOG.info("Loaded default urihandler {0}", defaultHandler.getClass().getName());
}
- catch (ClassNotFoundException e) {
- throw new ServiceException(ErrorCode.E0902, e);
+
+ Class<?> defaultClass = conf.getClass(URI_HANDLER_DEFAULT, null);
+ defaultHandler = (defaultClass == null) ? new FSURIHandler() : (URIHandler) ReflectionUtils.newInstance(
+ defaultClass, null);
+ defaultHandler.init(conf, isFrontEnd);
+ for (String scheme : defaultHandler.getSupportedSchemes()) {
+ cache.put(scheme, defaultHandler);
+ }
+
+ if (isFrontEnd) {
+ initClassesToShip();
+ initURIServiceBackendConf();
+ }
+
+ LOG.info("Loaded urihandlers {0}", Arrays.toString(classes));
+ LOG.info("Loaded default urihandler {0}", defaultHandler.getClass().getName());
+ }
+
+ /**
+ * Initialize classes that need to be shipped for using URIHandlerService in the launcher job
+ */
+ private void initClassesToShip(){
+ classesToShip = new HashSet<Class<?>>();
+ classesToShip.add(Service.class);
+ classesToShip.add(ServiceException.class);
+ classesToShip.add(URIHandlerService.class);
+ classesToShip.add(URIHandler.class);
+ classesToShip.add(URIContext.class);
+ classesToShip.add(ErrorCode.class);
+ classesToShip.add(XException.class);
+ classesToShip.add(URIAccessorException.class);
+ // Ship XLog, XLog$Level, XLog$Info and XLog$Info$InfoThreadLocal. Anonymous inner classes are
+ // not returned by getDeclaredClasses(), so the named InfoThreadLocal class was created instead.
+ classesToShip.addAll(getDeclaredClasses(XLog.class));
+ classesToShip.add(ParamChecker.class);
+
+ for (URIHandler handler : cache.values()) {
+ classesToShip.add(handler.getClass());
+ classesToShip.addAll(handler.getClassesToShip());
}
}
+ /**
+ * Initialize configuration required for using URIHandlerService in the launcher job
+ */
+ private void initURIServiceBackendConf() {
+ backendConf = new Configuration(false);
+ String handlersConf = conf.get(URI_HANDLERS);
+ if (handlersConf != null) {
+ backendConf.set(URI_HANDLERS, handlersConf);
+ }
+ String defaultHandlerConf = conf.get(URI_HANDLER_DEFAULT);
+ if (defaultHandlerConf != null) {
+ backendConf.set(URI_HANDLER_DEFAULT, defaultHandlerConf);
+ }
+
+ for (URIHandler handler : cache.values()) {
+ String schemeConf = URI_HANDLER_SUPPORTED_SCHEMES_PREFIX + handler.getClass().getSimpleName()
+ + URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX;
+ backendConf.set(schemeConf, collectionToString(handler.getSupportedSchemes()));
+ }
+ }
+
+ private String collectionToString(Collection<String> strs) {
+ if (strs.size() == 0) {
+ return "";
+ }
+ StringBuilder sbuf = new StringBuilder();
+ for (String str : strs) {
+ sbuf.append(str);
+ sbuf.append(",");
+ }
+ sbuf.setLength(sbuf.length() - 1);
+ return sbuf.toString();
+ }
+
@Override
public void destroy() {
Set<URIHandler> handlers = new HashSet<URIHandler>();
@@ -86,6 +168,22 @@ public class URIHandlerService implement
return URIHandlerService.class;
}
+ /**
+ * Return the classes to be shipped to the launcher
+ * @return the set of classes to be shipped to the launcher
+ */
+ public Set<Class<?>> getURIHandlerClassesToShip() {
+ return classesToShip;
+ }
+
+ /**
+ * Return the configuration required to instantiate URIHandlerService in the launcher
+ * @return configuration
+ */
+ public Configuration getURIHandlerServiceConfig() {
+ return backendConf;
+ }
+
public URIHandler getURIHandler(String uri) throws URIAccessorException {
try {
return getURIHandler(new URI(uri));
@@ -148,4 +246,18 @@ public class URIHandlerService implement
}
}
+ private Collection<Class<?>> getDeclaredClasses(Class<?> clazz) {
+ List<Class<?>> classes = new ArrayList<Class<?>>();
+ Stack<Class<?>> stack = new Stack<Class<?>>();
+ stack.push(clazz);
+ do {
+ clazz = stack.pop();
+ classes.add(clazz);
+ for (Class<?> dclazz : clazz.getDeclaredClasses()) {
+ stack.push(dclazz);
+ }
+ } while (!stack.isEmpty());
+ return classes;
+ }
+
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java Thu Dec 27 19:46:06 2012
@@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configurat
*/
public class HCatURI {
- public static final String PARTITION_SEPARATOR = "&";
+ public static final String PARTITION_SEPARATOR = ";";
public static final String PARTITION_KEYVAL_SEPARATOR = "=";
public static final String PATH_SEPARATOR = "/";
public static final String PARTITION_VALUE_QUOTE = "'";
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/XLog.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/XLog.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/XLog.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/XLog.java Thu Dec 27 19:46:06 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -43,13 +43,15 @@ public class XLog implements Log {
private static String template = "";
private static List<String> parameterNames = new ArrayList<String>();
- private static ThreadLocal<Info> tlLogInfo = new ThreadLocal<Info>() {
+ private static ThreadLocal<Info> tlLogInfo = new InfoThreadLocal();
+
+ private static class InfoThreadLocal extends ThreadLocal<Info> {
@Override
protected Info initialValue() {
return new Info();
}
- };
+ }
/**
* Define a <code>LogInfo</code> context parameter. <p/> The parameter name and its contextual value will be
Modified: oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml (original)
+++ oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml Thu Dec 27 19:46:06 2012
@@ -92,7 +92,6 @@
org.apache.oozie.service.AuthorizationService,
org.apache.oozie.service.UserGroupInformationService,
org.apache.oozie.service.HadoopAccessorService,
- org.apache.oozie.service.MetaDataAccessorService,
org.apache.oozie.service.URIHandlerService,
org.apache.oozie.service.MemoryLocksService,
org.apache.oozie.service.DagXLogInfoService,
Added: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java?rev=1426301&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java (added)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java Thu Dec 27 19:46:06 2012
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.oozie.test.XFsTestCase;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+
+public class TestFSPrepareActions extends XFsTestCase {
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ Services services = new Services();
+ services.getConf().set(HadoopAccessorService.SUPPORTED_FILESYSTEMS, "hdfs");
+ services.init();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ Services.get().destroy();
+ super.tearDown();
+ }
+
+ // Test for delete as prepare action
+ @Test
+ public void testDelete() throws Exception {
+ Path actionDir = getFsTestCaseDir();
+ FileSystem fs = getFileSystem();
+ Path newDir = new Path(actionDir, "newDir");
+ // Delete the file if it is already there
+ if (fs.exists(newDir)) {
+ fs.delete(newDir, true);
+ }
+ fs.mkdirs(newDir);
+ // Prepare block that contains delete action
+ String prepareXML = "<prepare>" + "<delete path='" + newDir + "'/>" + "</prepare>";
+
+ JobConf conf = createJobConf();
+ LauncherMapper.setupURIServiceConf(conf);
+ PrepareActionsDriver.doOperations(prepareXML, conf);
+ assertFalse(fs.exists(newDir));
+ }
+
+ // Test for mkdir as prepare action
+ @Test
+ public void testMkdir() throws Exception {
+ Path actionDir = getFsTestCaseDir();
+ FileSystem fs = getFileSystem();
+ Path newDir = new Path(actionDir, "newDir");
+ // Delete the file if it is already there
+ if (fs.exists(newDir)) {
+ fs.delete(newDir, true);
+ }
+ // Prepare block that contains mkdir action
+ String prepareXML = "<prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>";
+
+ JobConf conf = createJobConf();
+ LauncherMapper.setupURIServiceConf(conf);
+ PrepareActionsDriver.doOperations(prepareXML, conf);
+ assertTrue(fs.exists(newDir));
+ }
+
+ // Test for invalid scheme value in the path for action
+ @Test
+ public void testForInvalidScheme() throws Exception {
+ Path actionDir = getFsTestCaseDir();
+ // Construct a path with invalid scheme
+ Path newDir = new Path("hftp:/" + actionDir.toString().substring(5) + "/delete");
+ // Construct prepare XML block with the path
+ String prepareXML = "<prepare>" + "<delete path='" + newDir + "'/>" + "</prepare>";
+ // Parse the XML to get the node
+ Document doc = PrepareActionsDriver.getDocumentFromXML(prepareXML);
+ Node n = doc.getDocumentElement().getChildNodes().item(0);
+
+ try {
+ JobConf conf = createJobConf();
+ LauncherMapper.setupURIServiceConf(conf);
+ PrepareActionsDriver.doOperations(prepareXML, conf);
+ fail("Expected to catch an exception but did not encounter any");
+ } catch (LauncherException le) {
+ Path path = new Path(n.getAttributes().getNamedItem("path").getNodeValue().trim());
+ assertEquals("E0904: Scheme [hftp] not supported in uri [" + path + "]" , le.getMessage());
+ } catch(Exception ex){
+ fail("Expected a LauncherException but received an Exception");
+ }
+ }
+
+ // Test for null scheme value in the path for action
+ @Test
+ public void testForNullScheme() throws Exception {
+ // Construct a path without scheme
+ Path newDir = new Path("test/oozietests/testDelete/delete");
+ FileSystem fs = getFileSystem();
+ // Delete the file if it is already there
+ if (fs.exists(newDir)) {
+ fs.delete(newDir, true);
+ }
+ // Construct prepare XML block with the path
+ String prepareXML = "<prepare>" + "<delete path='" + newDir + "'/>" + "</prepare>";
+
+ // Parse the XML to get the node
+ Document doc = PrepareActionsDriver.getDocumentFromXML(prepareXML);
+ Node n = doc.getDocumentElement().getChildNodes().item(0);
+
+ try {
+ JobConf conf = createJobConf();
+ LauncherMapper.setupURIServiceConf(conf);
+ PrepareActionsDriver.doOperations(prepareXML, conf);
+ fail("Expected to catch an exception but did not encounter any");
+ } catch (LauncherException le) {
+ Path path = new Path(n.getAttributes().getNamedItem("path").getNodeValue().trim());
+ assertEquals("E0905: Scheme not present in uri [" + path + "]", le.getMessage());
+ } catch(Exception ex) {
+ fail("Expected a LauncherException but received an Exception");
+ }
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFileSystemActions.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFileSystemActions.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFileSystemActions.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFileSystemActions.java Thu Dec 27 19:46:06 2012
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.action.hadoop;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.test.XFsTestCase;
-import org.apache.oozie.service.Services;
-import org.w3c.dom.Document;
-import org.w3c.dom.Node;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.util.Arrays;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
-public class TestFileSystemActions extends XFsTestCase {
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- new Services().init();
- }
-
- @Override
- protected void tearDown() throws Exception {
- Services.get().destroy();
- super.tearDown();
- }
-
- // Test for delete as prepare action
- public void testDelete() throws Exception {
- Path actionDir = getFsTestCaseDir();
- FileSystem fs = getFileSystem();
- Path newDir = new Path(actionDir, "newDir");
- // Delete the file if it is already there
- if (fs.exists(newDir)) {
- fs.delete(newDir, true);
- }
- fs.mkdirs(newDir);
- // Prepare block that contains delete action
- String prepareXML = "<prepare>" + "<delete path='" + newDir + "'/>" + "</prepare>";
-
- // Parse the XML to get the node
- Document doc = PrepareActionsDriver.getDocumentFromXML(prepareXML);
- Node n = doc.getDocumentElement().getChildNodes().item(0);
-
- new FileSystemActions(Arrays.asList("hdfs")).execute(n);
- assertFalse(fs.exists(newDir));
- }
-
- // Test for mkdir as prepare action
- public void testMkdir() throws Exception {
- Path actionDir = getFsTestCaseDir();
- FileSystem fs = getFileSystem();
- Path newDir = new Path(actionDir, "newDir");
- // Delete the file if it is already there
- if (fs.exists(newDir)) {
- fs.delete(newDir, true);
- }
-
- // Prepare block that contains mkdir action
- String prepareXML = "<prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>";
-
- // Parse the XML to get the node
- Document doc = PrepareActionsDriver.getDocumentFromXML(prepareXML);
- Node n = doc.getDocumentElement().getChildNodes().item(0);
-
- new FileSystemActions(Arrays.asList("hdfs")).execute(n);
- assertTrue(fs.exists(newDir));
- }
-
- // Test for invalid scheme value in the path for action
- public void testForInvalidScheme() throws Exception {
- Path actionDir = getFsTestCaseDir();
- // Construct a path with invalid scheme
- Path newDir = new Path("file:/" + actionDir.toString().substring(5) + "/delete");
- // Construct prepare XML block with the path
- String prepareXML = "<prepare>" + "<delete path='" + newDir + "'/>" + "</prepare>";
- // Parse the XML to get the node
- Document doc = PrepareActionsDriver.getDocumentFromXML(prepareXML);
- Node n = doc.getDocumentElement().getChildNodes().item(0);
-
- try {
- new FileSystemActions(Arrays.asList("hdfs")).execute(n);
- fail("Expected to catch an exception but did not encounter any");
- } catch (LauncherException le) {
- Path path = new Path(n.getAttributes().getNamedItem("path").getNodeValue().trim());
- assertEquals("Scheme of '" + path + "' is not supported.", le.getMessage());
- } catch(Exception ex){
- fail("Expected a LauncherException but received an Exception");
- }
- }
-
- // Test for null scheme value in the path for action
- public void testForNullScheme() throws Exception {
- // Construct a path without scheme
- Path newDir = new Path("test/oozietests/testDelete/delete");
- FileSystem fs = getFileSystem();
- // Delete the file if it is already there
- if (fs.exists(newDir)) {
- fs.delete(newDir, true);
- }
- // Construct prepare XML block with the path
- String prepareXML = "<prepare>" + "<delete path='" + newDir + "'/>" + "</prepare>";
-
- // Parse the XML to get the node
- Document doc = PrepareActionsDriver.getDocumentFromXML(prepareXML);
- Node n = doc.getDocumentElement().getChildNodes().item(0);
-
- try {
- new FileSystemActions(Arrays.asList("hdfs")).execute(n);
- fail("Expected to catch an exception but did not encounter any");
- } catch (LauncherException le) {
- Path path = new Path(n.getAttributes().getNamedItem("path").getNodeValue().trim());
- assertEquals("Scheme of the path " + path + " is null", le.getMessage());
- } catch(Exception ex) {
- fail("Expected a LauncherException but received an Exception");
- }
- }
-
-}
Added: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java?rev=1426301&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java (added)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java Thu Dec 27 19:46:06 2012
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.oozie.test.XHCatTestCase;
+import org.apache.oozie.dependency.FSURIHandler;
+import org.apache.oozie.dependency.HCatURIHandler;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.junit.Test;
+
+ public class TestHCatPrepareActions extends XHCatTestCase {
+ // Exercises prepare-block delete actions that target hcat:// partition URIs via PrepareActionsDriver.
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ Services services = new Services(); // the URI handler list is set on the conf before init() is called
+ services.getConf().set(URIHandlerService.URI_HANDLERS,
+ FSURIHandler.class.getName() + "," + HCatURIHandler.class.getName());
+ services.init();
+ }
+ // Destroys the current Services instance first, then runs the base-class teardown.
+ @Override
+ protected void tearDown() throws Exception {
+ Services.get().destroy();
+ super.tearDown();
+ }
+ // Adds three partitions, deletes them through two partial-spec hcat URIs, then checks nothing is left.
+ @Test
+ public void testDelete() throws Exception {
+ dropTable("db1", "table1", true); // drop leftovers first so the create calls below start clean
+ dropDatabase("db1", true);
+ createDatabase("db1");
+ createTable("db1", "table1", "year,month,dt,country");
+ String part1 = addPartition("db1", "table1", "year=2012;month=12;dt=02;country=us");
+ String part2 = addPartition("db1", "table1", "year=2012;month=12;dt=03;country=us");
+ String part3 = addPartition("db1", "table1", "year=2013;month=1;dt=01;country=us");
+ // Partial partition specs: uri1 is meant to cover part1 and part2, uri2 is meant to cover part3.
+ String uri1 = "hcat://" + getMetastoreAuthority() + "/db1/table1/year=2012;month=12";
+ String uri2 = "hcat://" + getMetastoreAuthority() + "/db1/table1/year=2013;dt=01";
+
+ // Prepare block that contains one delete action per URI
+ String prepareXML = "<prepare>"
+ + "<delete path='" + uri1 + "'/>"
+ + "<delete path='" + uri2 + "'/>"
+ + "</prepare>";
+ // NOTE(review): setupURIServiceConf presumably copies URI handler settings into conf - confirm
+ JobConf conf = createJobConf();
+ LauncherMapper.setupURIServiceConf(conf);
+ PrepareActionsDriver.doOperations(prepareXML, conf);
+ FileSystem fs = getFileSystem();
+ assertFalse(fs.exists(new Path(part1))); // addPartition's return value is used here as a file system path
+ assertFalse(fs.exists(new Path(part2)));
+ assertFalse(fs.exists(new Path(part3)));
+ assertEquals(0, getPartitions("db1", "table1", "country=us").size()); // no country=us partition may remain
+ dropTable("db1", "table1", true); // remove the test table and database again
+ dropDatabase("db1", true);
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java?rev=1426301&r1=1426300&r2=1426301&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java Thu Dec 27 19:46:06 2012
@@ -19,6 +19,7 @@ package org.apache.oozie.action.hadoop;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.oozie.test.MiniHCatServer;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import java.io.File;
@@ -126,6 +127,8 @@ public class TestHiveMain extends MainTe
os = new FileOutputStream(hiveSite);
jobConf.writeXml(os);
os.close();
+ MiniHCatServer.resetDefaultDBCreation();
+ MiniHCatServer.resetHiveConfStaticVariables();
HiveMain.main(null);
}
catch (SecurityException ex) {
@@ -143,6 +146,7 @@ public class TestHiveMain extends MainTe
finally {
System.setProperty("user.name", user);
hiveSite.delete();
+ MiniHCatServer.resetHiveConfStaticVariables();
}
assertTrue(outputDataFile.exists());