You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/02/07 21:10:57 UTC

svn commit: r1443694 [1/3] - in /oozie/branches/hcat-intre: ./ core/src/main/java/org/apache/oozie/action/hadoop/ core/src/main/java/org/apache/oozie/client/rest/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/co...

Author: virag
Date: Thu Feb  7 20:10:55 2013
New Revision: 1443694

URL: http://svn.apache.org/r1443694
Log:
OOZIE-1210 Rework uri handling for Prepare actions and jms server mapping (rohini via virag)

Added:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherFSURIHandler.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherHCatURIHandler.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestHCatAccessorService.java
Removed:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyType.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIAccessorException.java
Modified:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorException.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/UserGroupInformationService.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/XLog.java
    oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/dependency/TestFSURIHandler.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/dependency/TestURIHandlerService.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XTestCase.java
    oozie/branches/hcat-intre/release-log.txt
    oozie/branches/hcat-intre/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java Thu Feb  7 20:10:55 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FSLauncherURIHandler implements LauncherURIHandler {
+
+    @Override
+    public boolean create(URI uri, Configuration conf) throws LauncherException {
+        boolean status = false;
+        try {
+            FileSystem fs = FileSystem.get(uri, conf);
+            Path path = getNormalizedPath(uri);
+            if (!fs.exists(path)) {
+                status = fs.mkdirs(path);
+                if (status) {
+                    System.out.println("Creating directory at " + path + " succeeded.");
+                }
+                else {
+                    System.out.println("Creating directory at " + path + " failed.");
+                }
+            }
+        }
+        catch (IOException e) {
+            throw new LauncherException("Creating directory at " + uri + " failed.", e);
+        }
+        return status;
+    }
+
+    @Override
+    public boolean delete(URI uri, Configuration conf) throws LauncherException {
+        boolean status = false;
+        try {
+            FileSystem fs = FileSystem.get(uri, conf);
+            Path path = getNormalizedPath(uri);
+            if (fs.exists(path)) {
+                status = fs.delete(path, true);
+                if (status) {
+                    System.out.println("Deletion of path " + path + " succeeded.");
+                }
+                else {
+                    System.out.println("Deletion of path " + path + " failed.");
+                }
+            }
+        }
+        catch (IOException e) {
+            throw new LauncherException("Deletion of path " + uri + " failed.", e);
+        }
+        return status;
+    }
+
+    private Path getNormalizedPath(URI uri) {
+        // Normalizes uri path replacing // with / in the path which users specify by mistake
+        return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
+    }
+
+    @Override
+    public List<Class<?>> getClassesForLauncher() {
+        return null;
+    }
+
+}

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java Thu Feb  7 20:10:55 2013
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hcatalog.api.ConnectionFailureException;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.oozie.util.HCatURI;
+
+public class HCatLauncherURIHandler implements LauncherURIHandler {
+
+    private static List<Class<?>> classesToShip = new ArrayList<Class<?>>();
+
+    static {
+        classesToShip.add(HCatURI.class);
+    }
+
+    @Override
+    public boolean create(URI uri, Configuration conf) throws LauncherException {
+        throw new UnsupportedOperationException("Creation of partition is not supported for " + uri);
+    }
+
+    @Override
+    public boolean delete(URI uri, Configuration conf) throws LauncherException {
+        HCatClient client = getHCatClient(uri, conf);
+        try {
+            HCatURI hcatURI = new HCatURI(uri.toString());
+            client.dropPartitions(hcatURI.getDb(), hcatURI.getTable(), hcatURI.getPartitionMap(), true);
+            System.out.println("Dropped partitions for " + uri);
+            return true;
+        }
+        catch (ConnectionFailureException e) {
+            throw new LauncherException("Error trying to drop " + uri, e);
+        }
+        catch (HCatException e) {
+            throw new LauncherException("Error trying to drop " + uri, e);
+        }
+        catch (URISyntaxException e) {
+            throw new LauncherException("Error trying to drop " + uri, e);
+        }
+        finally {
+            closeQuietly(client);
+        }
+    }
+
+    private HCatClient getHCatClient(URI uri, Configuration conf) throws LauncherException {
+        final HiveConf hiveConf = new HiveConf(conf, this.getClass());
+        String serverURI = getMetastoreConnectURI(uri);
+        if (!serverURI.equals("")) {
+            hiveConf.set("hive.metastore.local", "false");
+        }
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
+        try {
+            System.out.println("Creating HCatClient for user=" + UserGroupInformation.getLoginUser() + " and server="
+                    + serverURI);
+            // Delegation token fetched from metastore has new Text() as service and
+            // HiveMetastoreClient looks for the same if not overriden by hive.metastore.token.signature
+            // We are good as long as HCatCredentialHelper does not change the service of the token.
+            return HCatClient.create(hiveConf);
+        }
+        catch (HCatException e) {
+            throw new LauncherException("Error trying to connect to " + serverURI, e);
+        }
+        catch (IOException e) {
+            throw new LauncherException("Error trying to connect to " + serverURI, e);
+        }
+    }
+
+    private String getMetastoreConnectURI(URI uri) {
+        String metastoreURI;
+        // For unit tests
+        if (uri.getAuthority().equals("unittest-local")) {
+            metastoreURI = "";
+        }
+        else {
+            // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
+            // is added
+            metastoreURI = "thrift://" + uri.getAuthority();
+        }
+        return metastoreURI;
+    }
+
+    private void closeQuietly(HCatClient client) {
+        if (client != null) {
+            try {
+                client.close();
+            }
+            catch (Exception ignore) {
+                System.err.println("Error closing hcat client");
+                ignore.printStackTrace(System.err);
+            }
+        }
+    }
+
+    @Override
+    public List<Class<?>> getClassesForLauncher() {
+        return classesToShip;
+    }
+
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java Thu Feb  7 20:10:55 2013
@@ -129,7 +129,7 @@ public class JavaActionExecutor extends 
         classes.add(LauncherException.class);
         classes.add(LauncherMainException.class);
         classes.add(PrepareActionsDriver.class);
-        classes.addAll(Services.get().get(URIHandlerService.class).getURIHandlerClassesToShip());
+        classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
         classes.add(ActionStats.class);
         classes.add(ActionType.class);
         return classes;
@@ -586,7 +586,7 @@ public class JavaActionExecutor extends 
                     prepareXML);
 
             LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
-            LauncherMapper.setupURIServiceConf(launcherJobConf);
+            LauncherMapper.setupLauncherURIHandlerConf(launcherJobConf);
             LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
             LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
 

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java Thu Feb  7 20:10:55 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+public class LauncherException extends Exception {
+
+    /**
+     * Constructs an <code>LauncherException</code> with the
+     * specified detail message.
+     *
+     * @param message   the detail message.
+     */
+    LauncherException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     *
+     * @param message the detail message
+     * @param cause the cause exception
+     */
+    LauncherException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java Thu Feb  7 20:10:55 2013
@@ -150,8 +150,8 @@ public class LauncherMapper<K1, V1, K2, 
         launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
     }
 
-    public static void setupURIServiceConf(Configuration launcherConf) {
-        for(Entry<String, String> entry : Services.get().get(URIHandlerService.class).getURIHandlerServiceConfig()) {
+    public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
+        for(Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
             launcherConf.set(entry.getKey(), entry.getValue());
         }
     }
@@ -802,14 +802,3 @@ class LauncherSecurityManager extends Se
         exitCode = 0;
     }
 }
-
-class LauncherException extends Exception {
-
-    LauncherException(String message) {
-        super(message);
-    }
-
-    LauncherException(String message, Throwable cause) {
-        super(message, cause);
-    }
-}

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java Thu Feb  7 20:10:55 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+public interface LauncherURIHandler {
+
+    /**
+     * Create the resource identified by the URI
+     *
+     * @param uri URI of the dependency
+     * @param conf Configuration to access the URI
+     *
+     * @return <code>true</code> if the URI did not exist and was successfully
+     *         created; <code>false</code> if the URI already existed
+     *
+     * @throws LauncherException
+     */
+    public boolean create(URI uri, Configuration conf) throws LauncherException;
+
+    /**
+     * Delete the resource identified by the URI
+     *
+     * @param uri URI of the dependency
+     * @param conf Configuration to access the URI
+     *
+     * @return <code>true</code> if the URI exists and was successfully deleted;
+     *         <code>false</code> if the URI does not exist
+     * @throws LauncherException
+     */
+    public boolean delete(URI uri, Configuration conf) throws LauncherException;
+
+
+    /**
+     * Get list of classes to ship to launcher for LauncherURIHandler
+     *
+     * @return list of classes to ship to launcher
+     */
+    public List<Class<?>> getClassesForLauncher();
+
+}

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java Thu Feb  7 20:10:55 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class LauncherURIHandlerFactory {
+
+    public static final String CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX = "oozie.launcher.action.urihandler.scheme.";
+    private Configuration conf;
+
+    public LauncherURIHandlerFactory(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Get LauncherURIHandler to perform operations on a URI in the launcher
+     * @param uri
+     * @return LauncherURIHandler to perform operations on the URI
+     * @throws LauncherException
+     */
+    public LauncherURIHandler getURIHandler(URI uri) throws LauncherException {
+        if (uri.getScheme() == null) {
+            throw new LauncherException("Scheme not present in uri " + uri);
+        }
+        else {
+            String className = conf.get(CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + uri.getScheme());
+            if (className == null) {
+                className = conf.get(CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + "*");
+            }
+            if (className == null) {
+                throw new LauncherException("Scheme " + uri.getScheme() + " not supported in uri " + uri.toString());
+            }
+            Class<?> clazz;
+            try {
+                clazz = Class.forName(className);
+            }
+            catch (ClassNotFoundException e) {
+                throw new LauncherException("Error instantiating LauncherURIHandler", e);
+            }
+            return (LauncherURIHandler) ReflectionUtils.newInstance(clazz, null);
+        }
+    }
+
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java Thu Feb  7 20:10:55 2013
@@ -24,9 +24,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.dependency.URIHandler;
-import org.apache.oozie.service.URIAccessorException;
-import org.apache.oozie.service.URIHandlerService;
 import org.xml.sax.SAXException;
 import org.w3c.dom.Document;
 import org.w3c.dom.Node;
@@ -55,8 +52,7 @@ public class PrepareActionsDriver {
 
             // Get the list of child nodes, basically, each one corresponding to a separate action
             NodeList nl = doc.getDocumentElement().getChildNodes();
-            URIHandlerService service = new URIHandlerService();
-            service.init(conf, false);
+            LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
 
             for (int i = 0; i < nl.getLength(); ++i) {
                 Node n = nl.item(i);
@@ -66,7 +62,7 @@ public class PrepareActionsDriver {
                 }
                 String path = n.getAttributes().getNamedItem("path").getNodeValue().trim();
                 URI uri = new URI(path);
-                URIHandler handler = service.getURIHandler(uri, true);
+                LauncherURIHandler handler = factory.getURIHandler(uri);
                 execute(operation, uri, handler, conf);
             }
         } catch (IOException ioe) {
@@ -77,10 +73,6 @@ public class PrepareActionsDriver {
             throw new LauncherException(pce.getMessage(), pce);
         } catch (URISyntaxException use) {
             throw new LauncherException(use.getMessage(), use);
-        } catch (URIAccessorException uae) {
-            throw new LauncherException(uae.getMessage(), uae);
-        } catch (ClassNotFoundException cnfe) {
-            throw new LauncherException(cnfe.getMessage(), cnfe);
         }
     }
 
@@ -89,15 +81,14 @@ public class PrepareActionsDriver {
      *
      * @param n Child node of the prepare XML
      * @throws LauncherException
-     * @throws URIAccessorException
      */
-    private static void execute(String operation, URI uri, URIHandler handler, Configuration conf)
-            throws URIAccessorException {
+    private static void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf)
+            throws LauncherException {
         if (operation.equals("delete")) {
-            handler.delete(uri, conf, null);
+            handler.delete(uri, conf);
         }
         else if (operation.equals("mkdir")) {
-            handler.create(uri, conf, null);
+            handler.create(uri, conf);
         }
     }
 

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java Thu Feb  7 20:10:55 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -139,7 +139,7 @@ public class JsonCoordinatorJob implemen
     public JSONObject toJSONObject() {
         return toJSONObject("GMT");
     }
-    
+
     @SuppressWarnings("unchecked")
     public JSONObject toJSONObject(String timeZoneId) {
         JSONObject json = new JSONObject();
@@ -156,7 +156,7 @@ public class JsonCoordinatorJob implemen
         json.put(JsonTags.COORDINATOR_JOB_CONCURRENCY, getConcurrency());
         json.put(JsonTags.COORDINATOR_JOB_TIMEOUT, getTimeout());
         json.put(JsonTags.COORDINATOR_JOB_LAST_ACTION_TIME, JsonUtils.formatDateRfc822(getLastActionTime(), timeZoneId));
-        json.put(JsonTags.COORDINATOR_JOB_NEXT_MATERIALIZED_TIME, 
+        json.put(JsonTags.COORDINATOR_JOB_NEXT_MATERIALIZED_TIME,
                 JsonUtils.formatDateRfc822(getNextMaterializedTime(), timeZoneId));
         json.put(JsonTags.COORDINATOR_JOB_START_TIME, JsonUtils.formatDateRfc822(getStartTime(), timeZoneId));
         json.put(JsonTags.COORDINATOR_JOB_END_TIME, JsonUtils.formatDateRfc822(getEndTime(), timeZoneId));
@@ -365,7 +365,7 @@ public class JsonCoordinatorJob implemen
 
     @Override
     public String toString() {
-        return MessageFormat.format("Coornidator application id[{0}] status[{1}]", getId(), getStatus());
+        return MessageFormat.format("Coordinator application id[{0}] status[{1}]", getId(), getStatus());
     }
 
     public void setActions(List<? extends JsonCoordinatorAction> nodes) {
@@ -405,8 +405,6 @@ public class JsonCoordinatorJob implemen
 
     /**
      * Set pending to true
-     *
-     * @param pending set pending to true
      */
     public void setPending() {
         this.pending = 1;
@@ -414,8 +412,6 @@ public class JsonCoordinatorJob implemen
 
     /**
      * Set pending to false
-     *
-     * @param pending set pending to false
      */
     public void resetPending() {
         this.pending = 0;

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Thu Feb  7 20:10:55 2013
@@ -36,6 +36,7 @@ import org.apache.oozie.command.Precondi
 import org.apache.oozie.coord.CoordELEvaluator;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandlerException;
 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
@@ -44,7 +45,6 @@ import org.apache.oozie.executor.jpa.JPA
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIAccessorException;
 import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluator;
@@ -134,8 +134,8 @@ public class CoordActionInputCheckXComma
                 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
             }
             String nonExistListStr = nonExistList.toString();
-            if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
-                // missingDeps empty means action should become READY
+            if (!nonExistListStr.equals(missingDeps)) {
+                // missingDeps null means action should become READY
                 isChangeInDependency = true;
                 coordAction.setMissingDependencies(nonExistListStr);
             }
@@ -155,7 +155,13 @@ public class CoordActionInputCheckXComma
                 }
             }
             else {
-                queue(new CoordActionTimeOutXCommand(coordAction), 100);
+                if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) {
+                    queue(new CoordActionTimeOutXCommand(coordAction), 100);
+                }
+                else {
+                    // Let CoordPushDependencyCheckXCommand queue the timeout
+                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
+                }
             }
         }
         catch (Exception e) {
@@ -174,10 +180,7 @@ public class CoordActionInputCheckXComma
                 .getCreatedTime().getTime()))
                 / (60 * 1000);
         int timeOut = coordAction.getTimeOut();
-        if ((timeOut >= 0) && (waitingTime > timeOut)) {
-            return true;
-        }
-        return false;
+        return (timeOut >= 0) && (waitingTime > timeOut);
     }
 
     private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
@@ -456,7 +459,7 @@ public class CoordActionInputCheckXComma
             URIHandler handler = service.getURIHandler(uri);
             return handler.exists(uri, actionConf, user);
         }
-        catch (URIAccessorException e) {
+        catch (URIHandlerException e) {
             coordAction.setErrorCode(e.getErrorCode().toString());
             coordAction.setErrorMessage(e.getMessage());
             throw new IOException(e);

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java Thu Feb  7 20:10:55 2013
@@ -32,12 +32,13 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.coord.CoordELEvaluator;
 import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.coord.CoordUtils;
 import org.apache.oozie.coord.CoordinatorJobException;
 import org.apache.oozie.coord.SyncCoordAction;
 import org.apache.oozie.coord.TimeUnit;
 import org.apache.oozie.dependency.DependencyChecker;
-import org.apache.oozie.dependency.DependencyType;
 import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandler.DependencyType;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.service.UUIDService;
@@ -353,7 +354,7 @@ public class CoordCommandUtils {
             URIHandler uriHandler = uriService.getURIHandler(uriPath);
             uriHandler.validate(uriPath);
             uris.append(uriPath);
-            urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, doneFlagElement));
+            urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, CoordUtils.getDoneFlag(doneFlagElement)));
         }
         return uris.toString();
     }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Thu Feb  7 20:10:55 2013
@@ -22,8 +22,6 @@ import java.io.StringReader;
 import java.net.URI;
 import java.util.Date;
 import java.util.List;
-import java.util.Map.Entry;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
@@ -35,7 +33,7 @@ import org.apache.oozie.client.OozieClie
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.dependency.DependencyChecker;
-import org.apache.oozie.dependency.DependencyChecker.ActionDependency;
+import org.apache.oozie.dependency.ActionDependency;
 import org.apache.oozie.dependency.URIHandler;
 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
@@ -110,6 +108,7 @@ public class CoordPushDependencyCheckXCo
                 !registerForNotification);
 
         boolean isChangeInDependency = true;
+        boolean timeout = false;
         if (actionDep.getMissingDependencies().size() == 0) { // All push-based dependencies are available
             onAllPushDependenciesAvailable();
         }
@@ -122,11 +121,12 @@ public class CoordPushDependencyCheckXCo
                 coordAction.setPushMissingDependencies(stillMissingDeps);
             }
             // Checking for timeout
-            if (!isTimeout()) {
-                queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), getCoordPushCheckRequeueInterval());
+            timeout = isTimeout();
+            if (timeout) {
+                queue(new CoordActionTimeOutXCommand(coordAction), 100);
             }
             else {
-                queue(new CoordActionTimeOutXCommand(coordAction), 100);
+                queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), getCoordPushCheckRequeueInterval());
             }
         }
 
@@ -137,6 +137,9 @@ public class CoordPushDependencyCheckXCo
         else {
             unregisterAvailableDependencies(actionDep);
         }
+        if (timeout) {
+            unregisterMissingDependencies(actionDep.getMissingDependencies());
+        }
         return null;
     }
 
@@ -156,10 +159,7 @@ public class CoordPushDependencyCheckXCo
                 .getCreatedTime().getTime()))
                 / (60 * 1000);
         int timeOut = coordAction.getTimeOut();
-        if ((timeOut >= 0) && (waitingTime > timeOut)) {
-            return true;
-        }
-        return false;
+        return (timeOut >= 0) && (waitingTime > timeOut);
     }
 
     protected void onAllPushDependenciesAvailable() {
@@ -207,52 +207,62 @@ public class CoordPushDependencyCheckXCo
     }
 
     private void unregisterAvailableDependencies(ActionDependency actionDependency) {
-        for (Entry<URIHandler, List<URI>> entry : actionDependency.getAvailableDependencies().entrySet()) {
-            URIHandler handler = entry.getKey();
-            for (URI availableURI : entry.getValue()) {
+        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+        for (String availableDep : actionDependency.getAvailableDependencies()) {
+            try {
+                URI availableURI = new URI(availableDep);
+                URIHandler handler = uriService.getURIHandler(availableURI);
                 if (handler.unregisterFromNotification(availableURI, actionId)) {
-                    LOG.debug("Succesfully unregistered uri [{0}] for actionId: [{1}] from notifications",
+                    LOG.debug("Successfully unregistered uri [{0}] for actionId: [{1}] from notifications",
                             availableURI, actionId);
                 }
                 else {
-                    LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications",
-                            availableURI, actionId);
+                    LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", availableURI,
+                            actionId);
                 }
             }
+            catch (Exception e) {
+                LOG.warn("Exception while unregistering uri for actionId: [{0}] for notifications", actionId, e);
+            }
+        }
+    }
+
+    private void unregisterMissingDependencies(List<String> missingDeps) {
+        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+        for (String missingDep : missingDeps) {
+            try {
+                URI missingURI = new URI(missingDep);
+                URIHandler handler = uriService.getURIHandler(missingURI);
+                if (handler.unregisterFromNotification(missingURI, actionId)) {
+                    LOG.debug("Successfully unregistered uri [{0}] for actionId: [{1}] from notifications", missingURI,
+                            actionId);
+                }
+                else {
+                    LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", missingURI,
+                            actionId);
+                }
+            }
+            catch (Exception e) {
+                LOG.warn("Exception while registering uri for actionId: [{0}] for notifications", actionId, e);
+            }
         }
     }
 
-    /*
-     * (non-Javadoc)
-     *
-     * @see org.apache.oozie.command.XCommand#getEntityKey()
-     */
     @Override
     public String getEntityKey() {
         return coordAction.getJobId();
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#getKey()
-     */
     @Override
     public String getKey(){
         return getName() + "_" + actionId;
     }
 
-    /*
-     * (non-Javadoc)
-     *
-     * @see org.apache.oozie.command.XCommand#isLockRequired()
-     */
     @Override
     protected boolean isLockRequired() {
         return true;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#eagerLoadState()
-     */
     @Override
     protected void eagerLoadState() throws CommandException {
         try {
@@ -272,9 +282,6 @@ public class CoordPushDependencyCheckXCo
         }
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
-     */
     @Override
     protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
         if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
@@ -298,20 +305,10 @@ public class CoordPushDependencyCheckXCo
         }
     }
 
-    /*
-     * (non-Javadoc)
-     *
-     * @see org.apache.oozie.command.XCommand#loadState()
-     */
     @Override
     protected void loadState() throws CommandException {
     }
 
-    /*
-     * (non-Javadoc)
-     *
-     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
-     */
     @Override
     protected void verifyPrecondition() throws CommandException, PreconditionException {
     }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java Thu Feb  7 20:10:55 2013
@@ -336,7 +336,7 @@ public class CoordELFunctions {
                 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
             }
             if (uriContext != null) {
-                uriContext.dispose();
+                uriContext.destroy();
             }
             if (!resolved) {
                 // return unchanged future function with variable 'is_resolved'
@@ -1017,7 +1017,7 @@ public class CoordELFunctions {
                 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
             }
             if (uriContext != null) {
-                uriContext.dispose();
+                uriContext.destroy();
             }
             if (!resolved) {
                 // return unchanged latest function with variable 'is_resolved'

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java Thu Feb  7 20:10:55 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency;
+
+import java.util.List;
+
+public class ActionDependency {
+
+    private List<String> missingDependencies;
+    private List<String> availableDependencies;
+
+    ActionDependency(List<String> missingDependencies, List<String> availableDependencies) {
+        this.missingDependencies = missingDependencies;
+        this.availableDependencies = availableDependencies;
+    }
+
+    public List<String> getMissingDependencies() {
+        return missingDependencies;
+    }
+
+    public List<String> getAvailableDependencies() {
+        return availableDependencies;
+    }
+
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java Thu Feb  7 20:10:55 2013
@@ -20,10 +20,7 @@ package org.apache.oozie.dependency;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
@@ -31,7 +28,6 @@ import org.apache.oozie.client.OozieClie
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIAccessorException;
 import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
@@ -92,63 +88,40 @@ public class DependencyChecker {
             boolean stopOnFirstMissing) throws CommandException {
         String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
         List<String> missingDeps = new ArrayList<String>();
-        Map<URIHandler, List<URI>> availableDeps = new HashMap<URIHandler, List<URI>>();
+        List<String> availableDeps = new ArrayList<String>();
         URIHandlerService uriService = Services.get().get(URIHandlerService.class);
-        int index = 0;
-        for (;index < missingDependencies.length; index++) {
-            String dependency = missingDependencies[index];
-            try {
-                URI uri = new URI(dependency);
-                URIHandler uriHandler = uriService.getURIHandler(uri);
-                LOG.debug("Checking for the availability of {0} ", dependency);
-                if (uriHandler.exists(uri, actionConf, user)) {
-                    List<URI> availableURIs = availableDeps.get(uriHandler);
-                    if (availableURIs == null) {
-                        availableURIs = new ArrayList<URI>();
-                        availableDeps.put(uriHandler, availableURIs);
+        boolean continueChecking = true;
+        try {
+            for (int index = 0; index < missingDependencies.length; index++) {
+                if (continueChecking) {
+                    String dependency = missingDependencies[index];
+
+                    URI uri = new URI(dependency);
+                    URIHandler uriHandler = uriService.getURIHandler(uri);
+                    LOG.debug("Checking for the availability of [{0}] ", dependency);
+                    if (uriHandler.exists(uri, actionConf, user)) {
+                        LOG.debug("Dependency [{0}] is available", dependency);
+                        availableDeps.add(dependency);
+                    }
+                    else {
+                        missingDeps.add(dependency);
+                        if (stopOnFirstMissing) {
+                            continueChecking = false;
+                        }
                     }
-                    availableURIs.add(uri);
+
                 }
                 else {
-                    missingDeps.add(dependency);
-                    if (stopOnFirstMissing) {
-                        index++;
-                        break;
-                    }
+                    missingDeps.add(missingDependencies[index]);
                 }
             }
-            catch (URISyntaxException e) {
-                throw new CommandException(ErrorCode.E0906, e.getMessage(), e);
-            }
-            catch (URIAccessorException e) {
-                throw new CommandException(e);
-            }
         }
-        if (stopOnFirstMissing) {
-            for (;index < missingDependencies.length; index++) {
-                missingDeps.add(missingDependencies[index]);
-            }
+        catch (URISyntaxException e) {
+            throw new CommandException(ErrorCode.E0906, e.getMessage(), e);
         }
-        return new ActionDependency(missingDeps, availableDeps);
-    }
-
-    public static class ActionDependency {
-        private List<String> missingDependencies;
-        private Map<URIHandler, List<URI>> availableDependencies;
-
-        public ActionDependency(List<String> missingDependencies, Map<URIHandler, List<URI>> availableDependencies) {
-            this.missingDependencies = missingDependencies;
-            this.availableDependencies = availableDependencies;
-        }
-
-        public List<String> getMissingDependencies() {
-            return missingDependencies;
-        }
-
-        public Map<URIHandler, List<URI>> getAvailableDependencies() {
-            return availableDependencies;
+        catch (URIHandlerException e) {
+            throw new CommandException(e);
         }
-
+        return new ActionDependency(missingDeps, availableDeps);
     }
-
 }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java Thu Feb  7 20:10:55 2013
@@ -24,11 +24,22 @@ public class FSURIContext extends URICon
 
     private FileSystem fs;
 
+    /**
+     * Create a FSURIContext that can be used to access a filesystem URI
+     *
+     * @param conf Configuration to access the URI
+     * @param user name of the user the URI should be accessed as
+     * @param fs FileSystem to access
+     */
     public FSURIContext(Configuration conf, String user, FileSystem fs) {
         super(conf, user);
         this.fs = fs;
     }
 
+    /**
+     * Get the FileSystem to access the URI
+     * @return FileSystem to access the URI
+     */
     public FileSystem getFileSystem() {
         return fs;
     }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java Thu Feb  7 20:10:55 2013
@@ -19,59 +19,30 @@ package org.apache.oozie.dependency;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.oozie.ErrorCode;
-import org.apache.oozie.coord.CoordUtils;
+import org.apache.oozie.action.hadoop.FSLauncherURIHandler;
+import org.apache.oozie.action.hadoop.LauncherURIHandler;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIAccessorException;
-import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XLog;
-import org.jdom.Element;
 
-public class FSURIHandler extends URIHandler {
+public class FSURIHandler implements URIHandler {
 
-    private static XLog LOG = XLog.getLog(FSURIHandler.class);
-    private boolean isFrontEnd;
     private HadoopAccessorService service;
     private Set<String> supportedSchemes;
     private List<Class<?>> classesToShip;
 
-    public FSURIHandler() {
-        this.classesToShip = new ArrayList<Class<?>>();
-        classesToShip.add(FSURIHandler.class);
-        classesToShip.add(FSURIContext.class);
-        classesToShip.add(HadoopAccessorService.class);
-        classesToShip.add(HadoopAccessorException.class);
-        classesToShip.add(XConfiguration.class); //Not sure why it fails in init with CNFE for this.
-    }
-
     @Override
-    public void init(Configuration conf, boolean isFrontEnd) {
-        this.isFrontEnd = isFrontEnd;
-        if (isFrontEnd) {
-            service = Services.get().get(HadoopAccessorService.class);
-            supportedSchemes = service.getSupportedSchemes();
-        }
-        if (supportedSchemes == null) {
-            supportedSchemes = new HashSet<String>();
-            String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
-                    + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX,
-                    HadoopAccessorService.DEFAULT_SUPPORTED_SCHEMES);
-            supportedSchemes.addAll(Arrays.asList(schemes));
-        }
+    public void init(Configuration conf) {
+        service = Services.get().get(HadoopAccessorService.class);
+        supportedSchemes = service.getSupportedSchemes();
+        classesToShip = new FSLauncherURIHandler().getClassesForLauncher();
     }
 
     @Override
@@ -79,18 +50,24 @@ public class FSURIHandler extends URIHan
         return supportedSchemes;
     }
 
-    public Collection<Class<?>> getClassesToShip() {
+    @Override
+    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
+        return FSLauncherURIHandler.class;
+    }
+
+    @Override
+    public List<Class<?>> getClassesForLauncher() {
         return classesToShip;
     }
 
     @Override
-    public DependencyType getDependencyType(URI uri) throws URIAccessorException {
+    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
         return DependencyType.PULL;
     }
 
     @Override
     public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
-            throws URIAccessorException {
+            throws URIHandlerException {
         throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
     }
 
@@ -100,19 +77,13 @@ public class FSURIHandler extends URIHan
     }
 
     @Override
-    public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIAccessorException {
+    public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIHandlerException {
         FileSystem fs = getFileSystem(uri, conf, user);
         return new FSURIContext(conf, user, fs);
     }
 
     @Override
-    public boolean create(URI uri, Configuration conf, String user) throws URIAccessorException {
-        FileSystem fs = getFileSystem(uri, conf, user);
-        return create(fs, uri);
-    }
-
-    @Override
-    public boolean exists(URI uri, URIContext uriContext) throws URIAccessorException {
+    public boolean exists(URI uri, URIContext uriContext) throws URIHandlerException {
         try {
             FileSystem fs = ((FSURIContext) uriContext).getFileSystem();
             return fs.exists(getNormalizedPath(uri));
@@ -123,7 +94,7 @@ public class FSURIHandler extends URIHan
     }
 
     @Override
-    public boolean exists(URI uri, Configuration conf, String user) throws URIAccessorException {
+    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
         try {
             FileSystem fs = getFileSystem(uri, conf, user);
             return fs.exists(getNormalizedPath(uri));
@@ -134,22 +105,7 @@ public class FSURIHandler extends URIHan
     }
 
     @Override
-    public boolean delete(URI uri, Configuration conf, String user) throws URIAccessorException {
-        FileSystem fs = getFileSystem(uri, conf, user);
-        return delete(fs, uri);
-    }
-
-    @Override
-    public String getURIWithDoneFlag(String uri, Element doneFlagElement) throws URIAccessorException {
-        String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
-        if (doneFlag.length() > 0) {
-            uri += "/" + doneFlag;
-        }
-        return uri;
-    }
-
-    @Override
-    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIAccessorException {
+    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
         if (doneFlag.length() > 0) {
             uri += "/" + doneFlag;
         }
@@ -157,7 +113,7 @@ public class FSURIHandler extends URIHan
     }
 
     @Override
-    public void validate(String uri) throws URIAccessorException {
+    public void validate(String uri) throws URIHandlerException {
     }
 
     @Override
@@ -171,65 +127,11 @@ public class FSURIHandler extends URIHan
     }
 
     private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException {
-        if (isFrontEnd) {
-            if (user == null) {
-                throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
-            }
-            Configuration fsConf = service.createJobConf(uri.getAuthority());
-            return service.createFileSystem(user, uri, fsConf);
-        }
-        else {
-            try {
-                if (user != null && !user.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
-                    throw new HadoopAccessorException(ErrorCode.E0902,
-                            "Cannot access FileSystem as a different user in backend");
-                }
-                return FileSystem.get(uri, conf);
-            }
-            catch (IOException e) {
-                throw new HadoopAccessorException(ErrorCode.E0902, e);
-            }
-        }
-    }
-
-    private boolean create(FileSystem fs, URI uri) throws URIAccessorException {
-        Path path = getNormalizedPath(uri);
-        try {
-            if (!fs.exists(path)) {
-                boolean status = fs.mkdirs(path);
-                if (status) {
-                    LOG.info("Creating directory at {0} succeeded.", path);
-                }
-                else {
-                    LOG.info("Creating directory at {0} failed.", path);
-                }
-                return status;
-            }
-        }
-        catch (IOException e) {
-            throw new HadoopAccessorException(ErrorCode.E0902, e);
-        }
-        return false;
-    }
-
-    private boolean delete(FileSystem fs, URI uri) throws URIAccessorException {
-        Path path = getNormalizedPath(uri);
-        try {
-            if (fs.exists(path)) {
-                boolean status = fs.delete(path, true);
-                if (status) {
-                    LOG.info("Deletion of path {0} succeeded.", path);
-                }
-                else {
-                    LOG.info("Deletion of path {0} failed.", path);
-                }
-                return status;
-            }
-        }
-        catch (IOException e) {
-            throw new HadoopAccessorException(ErrorCode.E0902, e);
+        if (user == null) {
+            throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
         }
-        return false;
+        Configuration fsConf = service.createJobConf(uri.getAuthority());
+        return service.createFileSystem(user, uri, fsConf);
     }
 
 }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java Thu Feb  7 20:10:55 2013
@@ -19,25 +19,41 @@ package org.apache.oozie.dependency;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hcatalog.api.HCatClient;
+import org.apache.oozie.util.XLog;
 
 public class HCatURIContext extends URIContext {
 
+    private static XLog LOG = XLog.getLog(HCatURIContext.class);
     private HCatClient hcatClient;
 
+    /**
+     * Create a HCatURIContext that can be used to access a hcat URI
+     *
+     * @param conf Configuration to access the URI
+     * @param user name of the user the URI should be accessed as
+     * @param hcatClient HCatClient to talk to hcatalog server
+     */
     public HCatURIContext(Configuration conf, String user, HCatClient hcatClient) {
         super(conf, user);
         this.hcatClient = hcatClient;
     }
 
+    /**
+     * Get the HCatClient to talk to hcatalog server
+     *
+     * @return HCatClient to talk to hcatalog server
+     */
     public HCatClient getHCatClient() {
         return hcatClient;
     }
 
-    public void dispose() {
+    @Override
+    public void destroy() {
         try {
             hcatClient.close();
         }
         catch (Exception ignore) {
+            LOG.warn("Error closing hcat client", ignore);
         }
     }
 

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java Thu Feb  7 20:10:55 2013
@@ -20,10 +20,7 @@ package org.apache.oozie.dependency;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -38,53 +35,32 @@ import org.apache.hcatalog.api.HCatClien
 import org.apache.hcatalog.api.HCatPartition;
 import org.apache.hcatalog.common.HCatException;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.hadoop.HCatLauncherURIHandler;
+import org.apache.oozie.action.hadoop.LauncherURIHandler;
 import org.apache.oozie.jms.HCatMessageHandler;
-import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.HCatAccessorService;
 import org.apache.oozie.service.MetaDataAccessorException;
 import org.apache.oozie.service.PartitionDependencyManagerService;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIAccessorException;
 import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.service.UserGroupInformationService;
 import org.apache.oozie.util.HCatURI;
 import org.apache.oozie.util.XLog;
-import org.jdom.Element;
 
-public class HCatURIHandler extends URIHandler {
+public class HCatURIHandler implements URIHandler {
 
     private static XLog LOG = XLog.getLog(HCatURIHandler.class);
-    private boolean isFrontEnd;
     private Set<String> supportedSchemes;
-    private UserGroupInformationService ugiService;
-    private List<Class<?>> classesToShip;
     private Map<String, DependencyType> dependencyTypes;
-    private Set<String> registeredTopicURIs;
-
-    public HCatURIHandler() {
-        this.classesToShip = new ArrayList<Class<?>>();
-        classesToShip.add(HCatURIHandler.class);
-        classesToShip.add(HCatURIContext.class);
-        classesToShip.add(HCatURI.class);
-        classesToShip.add(MetaDataAccessorException.class);
-        classesToShip.add(UserGroupInformationService.class);
-        classesToShip.add(JMSAccessorService.class);
-        classesToShip.add(PartitionDependencyManagerService.class);
-        classesToShip.add(HCatMessageHandler.class);
-        classesToShip.add(DependencyType.class);
-    }
+    private List<Class<?>> classesToShip;
 
     @Override
-    public void init(Configuration conf, boolean isFrontEnd) {
-        this.isFrontEnd = isFrontEnd;
-        if (isFrontEnd) {
-            ugiService = Services.get().get(UserGroupInformationService.class);
-            dependencyTypes = new HashMap<String, DependencyType>();
-            registeredTopicURIs = new HashSet<String>();
-        }
+    public void init(Configuration conf) {
+        dependencyTypes = new HashMap<String, DependencyType>();
         supportedSchemes = new HashSet<String>();
         String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
                 + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat");
         supportedSchemes.addAll(Arrays.asList(schemes));
+        classesToShip = new HCatLauncherURIHandler().getClassesForLauncher();
     }
 
     @Override
@@ -92,46 +68,51 @@ public class HCatURIHandler extends URIH
         return supportedSchemes;
     }
 
-    public Collection<Class<?>> getClassesToShip() {
+    @Override
+    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
+        return HCatLauncherURIHandler.class;
+    }
+
+    @Override
+    public List<Class<?>> getClassesForLauncher() {
         return classesToShip;
     }
 
     @Override
-    public DependencyType getDependencyType(URI uri) throws URIAccessorException {
-        JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
-        if (jmsService == null) {
-            return DependencyType.PULL;
-        }
-        DependencyType depType = dependencyTypes.get(uri.getAuthority());
-        if (depType == null) {
-             depType = jmsService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL;
-             dependencyTypes.put(uri.getAuthority(), depType);
+    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
+        DependencyType depType = DependencyType.PULL;
+        // Not initializing in constructor as this will be part of oozie.services.ext
+        // and will be initialized after URIHandlerService
+        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+        if (hcatService != null) {
+            depType = dependencyTypes.get(uri.getAuthority());
+            if (depType == null) {
+                depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL;
+                dependencyTypes.put(uri.getAuthority(), depType);
+            }
         }
         return depType;
- }
+    }
 
     @Override
-    public void registerForNotification(URI uri, Configuration conf, String user, String actionID) throws URIAccessorException {
-        String uriString = uri.toString();
-        String uriMinusPartition = uriString.substring(0, uriString.lastIndexOf("/"));
+    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
+            throws URIHandlerException {
         HCatURI hcatURI;
         try {
             hcatURI = new HCatURI(uri);
         }
         catch (URISyntaxException e) {
-            throw new URIAccessorException(ErrorCode.E0906, uri, e);
+            throw new URIHandlerException(ErrorCode.E0906, uri, e);
         }
-        // Check cache to avoid a call to hcat server to get the topic
-        if (!registeredTopicURIs.contains(uriMinusPartition)) {
+        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+        if (!hcatService.isRegisteredForNotification(hcatURI)) {
             HCatClient client = getHCatClient(uri, conf, user);
             try {
                 String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable());
                 if (topic == null) {
                     return;
                 }
-                JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
-                jmsService.registerForNotification(uri, topic, new HCatMessageHandler(uri.getAuthority()));
-                registeredTopicURIs.add(uriMinusPartition);
+                hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority()));
             }
             catch (HCatException e) {
                 throw new MetaDataAccessorException(ErrorCode.E1504, e);
@@ -151,59 +132,42 @@ public class HCatURIHandler extends URIH
             hcatURI = new HCatURI(uri);
         }
         catch (URISyntaxException e) {
-            throw new RuntimeException(e); //Unexpected at this point
+            throw new RuntimeException(e); // Unexpected at this point
         }
         PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
         return pdmService.removeMissingDependency(hcatURI, actionID);
     }
 
     @Override
-    public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIAccessorException {
+    public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIHandlerException {
         HCatClient client = getHCatClient(uri, conf, user);
         return new HCatURIContext(conf, user, client);
     }
 
     @Override
-    public boolean create(URI uri, Configuration conf, String user) throws URIAccessorException {
-        throw new MetaDataAccessorException(ErrorCode.E0902, new UnsupportedOperationException(
-                "Add partition not supported"));
-    }
-
-    @Override
-    public boolean exists(URI uri, URIContext uriContext) throws URIAccessorException {
+    public boolean exists(URI uri, URIContext uriContext) throws URIHandlerException {
         HCatClient client = ((HCatURIContext) uriContext).getHCatClient();
         return exists(uri, client, false);
     }
 
     @Override
-    public boolean exists(URI uri, Configuration conf, String user) throws URIAccessorException {
+    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
         HCatClient client = getHCatClient(uri, conf, user);
         return exists(uri, client, true);
     }
 
     @Override
-    public boolean delete(URI uri, Configuration conf, String user) throws URIAccessorException {
-        HCatClient hCatClient = getHCatClient(uri, conf, user);
-        return delete(hCatClient, uri, true);
-    }
-
-    @Override
-    public String getURIWithDoneFlag(String uri, Element doneFlagElement) throws URIAccessorException {
-        return uri;
-    }
-
-    @Override
-    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIAccessorException {
+    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
         return uri;
     }
 
     @Override
-    public void validate(String uri) throws URIAccessorException {
+    public void validate(String uri) throws URIHandlerException {
         try {
-            new HCatURI(uri);  //will fail if uri syntax is incorrect
+            new HCatURI(uri); // will fail if uri syntax is incorrect
         }
         catch (URISyntaxException e) {
-            throw new URIAccessorException(ErrorCode.E0906, uri, e);
+            throw new URIHandlerException(ErrorCode.E0906, uri, e);
         }
 
     }
@@ -220,32 +184,23 @@ public class HCatURIHandler extends URIH
             hiveConf.set("hive.metastore.local", "false");
         }
         hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
-        HCatClient client = null;
         try {
             LOG.info("Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
                     UserGroupInformation.getLoginUser(), serverURI);
-            if (isFrontEnd) {
-                if (user == null) {
-                    throw new MetaDataAccessorException(ErrorCode.E0902,
-                            "user has to be specified to access metastore server");
-                }
-                UserGroupInformation ugi = ugiService.getProxyUser(user);
-                client = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
-                    public HCatClient run() throws Exception {
-                        return HCatClient.create(hiveConf);
-                    }
-                });
-            }
-            else {
-                if (user != null && !user.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
-                    throw new MetaDataAccessorException(ErrorCode.E0902,
-                            "Cannot access metastore server as a different user in backend");
+
+            // HiveMetastoreClient (hive 0.9) currently does not work if UGI has doAs
+            // We are good to connect as the oozie user since listPartitions does not require
+            // authorization
+            /*
+            UserGroupInformation ugi = ugiService.getProxyUser(user);
+            return ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
+                public HCatClient run() throws Exception {
+                    return HCatClient.create(hiveConf);
                 }
-                // Delegation token fetched from metastore has new Text() as service and HiveMetastoreClient
-                // looks for the same if not overriden by hive.metastore.token.signature
-                // We are good as long as HCatCredentialHelper does not change the service of the token.
-                client = HCatClient.create(hiveConf);
-            }
+            });
+            */
+
+            return HCatClient.create(hiveConf);
         }
         catch (HCatException e) {
             throw new MetaDataAccessorException(ErrorCode.E1504, e);
@@ -253,21 +208,20 @@ public class HCatURIHandler extends URIH
         catch (IOException e) {
             throw new MetaDataAccessorException(ErrorCode.E1504, e);
         }
-        catch (InterruptedException e) {
-            throw new MetaDataAccessorException(ErrorCode.E1504, e);
-        }
 
-        return client;
     }
 
     private String getMetastoreConnectURI(URI uri) {
+        String metastoreURI;
         // For unit tests
         if (uri.getAuthority().equals("unittest-local")) {
-            return "";
+            metastoreURI = "";
+        }
+        else {
+            // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
+            // is added
+            metastoreURI = "thrift://" + uri.getAuthority();
         }
-        // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
-        // is added
-        String metastoreURI = "thrift://" + uri.getAuthority();
         return metastoreURI;
     }
 
@@ -276,31 +230,7 @@ public class HCatURIHandler extends URIH
             HCatURI hcatURI = new HCatURI(uri.toString());
             List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(),
                     hcatURI.getPartitionMap());
-            if (partitions == null || partitions.isEmpty()) {
-                return false;
-            }
-            return true;
-        }
-        catch (ConnectionFailureException e) {
-            throw new MetaDataAccessorException(ErrorCode.E1504, e);
-        }
-        catch (HCatException e) {
-            throw new MetaDataAccessorException(ErrorCode.E0902, e);
-        }
-        catch (URISyntaxException e) {
-            throw new MetaDataAccessorException(ErrorCode.E0902, e);
-        }
-        finally {
-            closeQuietly(client, closeClient);
-        }
-    }
-
-    private boolean delete(HCatClient client, URI uri, boolean closeClient) throws URIAccessorException {
-        try {
-            HCatURI hcatURI = new HCatURI(uri.toString());
-            client.dropPartitions(hcatURI.getDb(), hcatURI.getTable(), hcatURI.getPartitionMap(), true);
-            LOG.info("Dropped partitions for " + uri);
-            return true;
+            return (partitions != null && !partitions.isEmpty());
         }
         catch (ConnectionFailureException e) {
             throw new MetaDataAccessorException(ErrorCode.E1504, e);
@@ -322,6 +252,7 @@ public class HCatURIHandler extends URIH
                 client.close();
             }
             catch (Exception ignore) {
+                LOG.warn("Error closing hcat client", ignore);
             }
         }
     }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java Thu Feb  7 20:10:55 2013
@@ -24,20 +24,36 @@ public abstract class URIContext {
     private Configuration conf;
     private String user;
 
+    /**
+     * Create a URIContext that can be used to access a URI
+     * @param conf Configuration to access the URI
+     * @param user name of the user the URI should be accessed as
+     */
     public URIContext(Configuration conf, String user) {
         this.conf = conf;
         this.user = user;
     }
 
+    /**
+     * Get the Configuration to access the URI
+     * @return Configuration to access the URI
+     */
     public Configuration getConfiguration() {
         return conf;
     }
 
+    /**
+     * Get the name of the user the URI will be accessed as
+     * @return the user name the URI will be accessed as
+     */
     public String getUser() {
         return user;
     }
 
-    public void dispose() {
+    /**
+     * Destroy the URIContext
+     */
+    public void destroy() {
     }
 
 }