You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/02/07 21:10:57 UTC
svn commit: r1443694 [1/3] - in /oozie/branches/hcat-intre: ./
core/src/main/java/org/apache/oozie/action/hadoop/
core/src/main/java/org/apache/oozie/client/rest/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/co...
Author: virag
Date: Thu Feb 7 20:10:55 2013
New Revision: 1443694
URL: http://svn.apache.org/r1443694
Log:
OOZIE-1210 Rework uri handling for Prepare actions and jms server mapping (rohini via virag)
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherFSURIHandler.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherHCatURIHandler.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestHCatAccessorService.java
Removed:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyType.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIAccessorException.java
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorException.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/URIHandlerService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/UserGroupInformationService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/XLog.java
oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/dependency/TestFSURIHandler.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/dependency/TestURIHandlerService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XTestCase.java
oozie/branches/hcat-intre/release-log.txt
oozie/branches/hcat-intre/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java Thu Feb 7 20:10:55 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FSLauncherURIHandler implements LauncherURIHandler {
+
+ @Override
+ public boolean create(URI uri, Configuration conf) throws LauncherException {
+ boolean status = false;
+ try {
+ FileSystem fs = FileSystem.get(uri, conf);
+ Path path = getNormalizedPath(uri);
+ if (!fs.exists(path)) {
+ status = fs.mkdirs(path);
+ if (status) {
+ System.out.println("Creating directory at " + path + " succeeded.");
+ }
+ else {
+ System.out.println("Creating directory at " + path + " failed.");
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new LauncherException("Creating directory at " + uri + " failed.", e);
+ }
+ return status;
+ }
+
+ @Override
+ public boolean delete(URI uri, Configuration conf) throws LauncherException {
+ boolean status = false;
+ try {
+ FileSystem fs = FileSystem.get(uri, conf);
+ Path path = getNormalizedPath(uri);
+ if (fs.exists(path)) {
+ status = fs.delete(path, true);
+ if (status) {
+ System.out.println("Deletion of path " + path + " succeeded.");
+ }
+ else {
+ System.out.println("Deletion of path " + path + " failed.");
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new LauncherException("Deletion of path " + uri + " failed.", e);
+ }
+ return status;
+ }
+
+ private Path getNormalizedPath(URI uri) {
+ // Normalizes uri path replacing // with / in the path which users specify by mistake
+ return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
+ }
+
+ @Override
+ public List<Class<?>> getClassesForLauncher() {
+ return null;
+ }
+
+}
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java Thu Feb 7 20:10:55 2013
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hcatalog.api.ConnectionFailureException;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.oozie.util.HCatURI;
+
+public class HCatLauncherURIHandler implements LauncherURIHandler {
+
+ private static List<Class<?>> classesToShip = new ArrayList<Class<?>>();
+
+ static {
+ classesToShip.add(HCatURI.class);
+ }
+
+ @Override
+ public boolean create(URI uri, Configuration conf) throws LauncherException {
+ throw new UnsupportedOperationException("Creation of partition is not supported for " + uri);
+ }
+
+ @Override
+ public boolean delete(URI uri, Configuration conf) throws LauncherException {
+ HCatClient client = getHCatClient(uri, conf);
+ try {
+ HCatURI hcatURI = new HCatURI(uri.toString());
+ client.dropPartitions(hcatURI.getDb(), hcatURI.getTable(), hcatURI.getPartitionMap(), true);
+ System.out.println("Dropped partitions for " + uri);
+ return true;
+ }
+ catch (ConnectionFailureException e) {
+ throw new LauncherException("Error trying to drop " + uri, e);
+ }
+ catch (HCatException e) {
+ throw new LauncherException("Error trying to drop " + uri, e);
+ }
+ catch (URISyntaxException e) {
+ throw new LauncherException("Error trying to drop " + uri, e);
+ }
+ finally {
+ closeQuietly(client);
+ }
+ }
+
+ private HCatClient getHCatClient(URI uri, Configuration conf) throws LauncherException {
+ final HiveConf hiveConf = new HiveConf(conf, this.getClass());
+ String serverURI = getMetastoreConnectURI(uri);
+ if (!serverURI.equals("")) {
+ hiveConf.set("hive.metastore.local", "false");
+ }
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
+ try {
+ System.out.println("Creating HCatClient for user=" + UserGroupInformation.getLoginUser() + " and server="
+ + serverURI);
+ // Delegation token fetched from metastore has new Text() as service and
+ // HiveMetastoreClient looks for the same if not overriden by hive.metastore.token.signature
+ // We are good as long as HCatCredentialHelper does not change the service of the token.
+ return HCatClient.create(hiveConf);
+ }
+ catch (HCatException e) {
+ throw new LauncherException("Error trying to connect to " + serverURI, e);
+ }
+ catch (IOException e) {
+ throw new LauncherException("Error trying to connect to " + serverURI, e);
+ }
+ }
+
+ private String getMetastoreConnectURI(URI uri) {
+ String metastoreURI;
+ // For unit tests
+ if (uri.getAuthority().equals("unittest-local")) {
+ metastoreURI = "";
+ }
+ else {
+ // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
+ // is added
+ metastoreURI = "thrift://" + uri.getAuthority();
+ }
+ return metastoreURI;
+ }
+
+ private void closeQuietly(HCatClient client) {
+ if (client != null) {
+ try {
+ client.close();
+ }
+ catch (Exception ignore) {
+ System.err.println("Error closing hcat client");
+ ignore.printStackTrace(System.err);
+ }
+ }
+ }
+
+ @Override
+ public List<Class<?>> getClassesForLauncher() {
+ return classesToShip;
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java Thu Feb 7 20:10:55 2013
@@ -129,7 +129,7 @@ public class JavaActionExecutor extends
classes.add(LauncherException.class);
classes.add(LauncherMainException.class);
classes.add(PrepareActionsDriver.class);
- classes.addAll(Services.get().get(URIHandlerService.class).getURIHandlerClassesToShip());
+ classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
classes.add(ActionStats.class);
classes.add(ActionType.class);
return classes;
@@ -586,7 +586,7 @@ public class JavaActionExecutor extends
prepareXML);
LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
- LauncherMapper.setupURIServiceConf(launcherJobConf);
+ LauncherMapper.setupLauncherURIHandlerConf(launcherJobConf);
LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java Thu Feb 7 20:10:55 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+public class LauncherException extends Exception {
+
+ /**
+ * Constructs an <code>LauncherException</code> with the
+ * specified detail message.
+ *
+ * @param message the detail message.
+ */
+ LauncherException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and cause.
+ *
+ * @param message the detail message
+ * @param cause the cause exception
+ */
+ LauncherException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java Thu Feb 7 20:10:55 2013
@@ -150,8 +150,8 @@ public class LauncherMapper<K1, V1, K2,
launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
}
- public static void setupURIServiceConf(Configuration launcherConf) {
- for(Entry<String, String> entry : Services.get().get(URIHandlerService.class).getURIHandlerServiceConfig()) {
+ public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
+ for(Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
launcherConf.set(entry.getKey(), entry.getValue());
}
}
@@ -802,14 +802,3 @@ class LauncherSecurityManager extends Se
exitCode = 0;
}
}
-
-class LauncherException extends Exception {
-
- LauncherException(String message) {
- super(message);
- }
-
- LauncherException(String message, Throwable cause) {
- super(message, cause);
- }
-}
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java Thu Feb 7 20:10:55 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+public interface LauncherURIHandler {
+
+ /**
+ * Create the resource identified by the URI
+ *
+ * @param uri URI of the dependency
+ * @param conf Configuration to access the URI
+ *
+ * @return <code>true</code> if the URI did not exist and was successfully
+ * created; <code>false</code> if the URI already existed
+ *
+ * @throws LauncherException
+ */
+ public boolean create(URI uri, Configuration conf) throws LauncherException;
+
+ /**
+ * Delete the resource identified by the URI
+ *
+ * @param uri URI of the dependency
+ * @param conf Configuration to access the URI
+ *
+ * @return <code>true</code> if the URI exists and was successfully deleted;
+ * <code>false</code> if the URI does not exist
+ * @throws LauncherException
+ */
+ public boolean delete(URI uri, Configuration conf) throws LauncherException;
+
+
+ /**
+ * Get list of classes to ship to launcher for LauncherURIHandler
+ *
+ * @return list of classes to ship to launcher
+ */
+ public List<Class<?>> getClassesForLauncher();
+
+}
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java Thu Feb 7 20:10:55 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class LauncherURIHandlerFactory {
+
+ public static final String CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX = "oozie.launcher.action.urihandler.scheme.";
+ private Configuration conf;
+
+ public LauncherURIHandlerFactory(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Get LauncherURIHandler to perform operations on a URI in the launcher
+ * @param uri
+ * @return LauncherURIHandler to perform operations on the URI
+ * @throws LauncherException
+ */
+ public LauncherURIHandler getURIHandler(URI uri) throws LauncherException {
+ if (uri.getScheme() == null) {
+ throw new LauncherException("Scheme not present in uri " + uri);
+ }
+ else {
+ String className = conf.get(CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + uri.getScheme());
+ if (className == null) {
+ className = conf.get(CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + "*");
+ }
+ if (className == null) {
+ throw new LauncherException("Scheme " + uri.getScheme() + " not supported in uri " + uri.toString());
+ }
+ Class<?> clazz;
+ try {
+ clazz = Class.forName(className);
+ }
+ catch (ClassNotFoundException e) {
+ throw new LauncherException("Error instantiating LauncherURIHandler", e);
+ }
+ return (LauncherURIHandler) ReflectionUtils.newInstance(clazz, null);
+ }
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java Thu Feb 7 20:10:55 2013
@@ -24,9 +24,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.dependency.URIHandler;
-import org.apache.oozie.service.URIAccessorException;
-import org.apache.oozie.service.URIHandlerService;
import org.xml.sax.SAXException;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
@@ -55,8 +52,7 @@ public class PrepareActionsDriver {
// Get the list of child nodes, basically, each one corresponding to a separate action
NodeList nl = doc.getDocumentElement().getChildNodes();
- URIHandlerService service = new URIHandlerService();
- service.init(conf, false);
+ LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
for (int i = 0; i < nl.getLength(); ++i) {
Node n = nl.item(i);
@@ -66,7 +62,7 @@ public class PrepareActionsDriver {
}
String path = n.getAttributes().getNamedItem("path").getNodeValue().trim();
URI uri = new URI(path);
- URIHandler handler = service.getURIHandler(uri, true);
+ LauncherURIHandler handler = factory.getURIHandler(uri);
execute(operation, uri, handler, conf);
}
} catch (IOException ioe) {
@@ -77,10 +73,6 @@ public class PrepareActionsDriver {
throw new LauncherException(pce.getMessage(), pce);
} catch (URISyntaxException use) {
throw new LauncherException(use.getMessage(), use);
- } catch (URIAccessorException uae) {
- throw new LauncherException(uae.getMessage(), uae);
- } catch (ClassNotFoundException cnfe) {
- throw new LauncherException(cnfe.getMessage(), cnfe);
}
}
@@ -89,15 +81,14 @@ public class PrepareActionsDriver {
*
* @param n Child node of the prepare XML
* @throws LauncherException
- * @throws URIAccessorException
*/
- private static void execute(String operation, URI uri, URIHandler handler, Configuration conf)
- throws URIAccessorException {
+ private static void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf)
+ throws LauncherException {
if (operation.equals("delete")) {
- handler.delete(uri, conf, null);
+ handler.delete(uri, conf);
}
else if (operation.equals("mkdir")) {
- handler.create(uri, conf, null);
+ handler.create(uri, conf);
}
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java Thu Feb 7 20:10:55 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -139,7 +139,7 @@ public class JsonCoordinatorJob implemen
public JSONObject toJSONObject() {
return toJSONObject("GMT");
}
-
+
@SuppressWarnings("unchecked")
public JSONObject toJSONObject(String timeZoneId) {
JSONObject json = new JSONObject();
@@ -156,7 +156,7 @@ public class JsonCoordinatorJob implemen
json.put(JsonTags.COORDINATOR_JOB_CONCURRENCY, getConcurrency());
json.put(JsonTags.COORDINATOR_JOB_TIMEOUT, getTimeout());
json.put(JsonTags.COORDINATOR_JOB_LAST_ACTION_TIME, JsonUtils.formatDateRfc822(getLastActionTime(), timeZoneId));
- json.put(JsonTags.COORDINATOR_JOB_NEXT_MATERIALIZED_TIME,
+ json.put(JsonTags.COORDINATOR_JOB_NEXT_MATERIALIZED_TIME,
JsonUtils.formatDateRfc822(getNextMaterializedTime(), timeZoneId));
json.put(JsonTags.COORDINATOR_JOB_START_TIME, JsonUtils.formatDateRfc822(getStartTime(), timeZoneId));
json.put(JsonTags.COORDINATOR_JOB_END_TIME, JsonUtils.formatDateRfc822(getEndTime(), timeZoneId));
@@ -365,7 +365,7 @@ public class JsonCoordinatorJob implemen
@Override
public String toString() {
- return MessageFormat.format("Coornidator application id[{0}] status[{1}]", getId(), getStatus());
+ return MessageFormat.format("Coordinator application id[{0}] status[{1}]", getId(), getStatus());
}
public void setActions(List<? extends JsonCoordinatorAction> nodes) {
@@ -405,8 +405,6 @@ public class JsonCoordinatorJob implemen
/**
* Set pending to true
- *
- * @param pending set pending to true
*/
public void setPending() {
this.pending = 1;
@@ -414,8 +412,6 @@ public class JsonCoordinatorJob implemen
/**
* Set pending to false
- *
- * @param pending set pending to false
*/
public void resetPending() {
this.pending = 0;
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Thu Feb 7 20:10:55 2013
@@ -36,6 +36,7 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandlerException;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
@@ -44,7 +45,6 @@ import org.apache.oozie.executor.jpa.JPA
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIAccessorException;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
@@ -134,8 +134,8 @@ public class CoordActionInputCheckXComma
nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
}
String nonExistListStr = nonExistList.toString();
- if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
- // missingDeps empty means action should become READY
+ if (!nonExistListStr.equals(missingDeps)) {
+ // missingDeps null means action should become READY
isChangeInDependency = true;
coordAction.setMissingDependencies(nonExistListStr);
}
@@ -155,7 +155,13 @@ public class CoordActionInputCheckXComma
}
}
else {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) {
+ queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ }
+ else {
+ // Let CoordPushDependencyCheckXCommand queue the timeout
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
+ }
}
}
catch (Exception e) {
@@ -174,10 +180,7 @@ public class CoordActionInputCheckXComma
.getCreatedTime().getTime()))
/ (60 * 1000);
int timeOut = coordAction.getTimeOut();
- if ((timeOut >= 0) && (waitingTime > timeOut)) {
- return true;
- }
- return false;
+ return (timeOut >= 0) && (waitingTime > timeOut);
}
private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
@@ -456,7 +459,7 @@ public class CoordActionInputCheckXComma
URIHandler handler = service.getURIHandler(uri);
return handler.exists(uri, actionConf, user);
}
- catch (URIAccessorException e) {
+ catch (URIHandlerException e) {
coordAction.setErrorCode(e.getErrorCode().toString());
coordAction.setErrorMessage(e.getMessage());
throw new IOException(e);
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java Thu Feb 7 20:10:55 2013
@@ -32,12 +32,13 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.coord.SyncCoordAction;
import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.dependency.DependencyChecker;
-import org.apache.oozie.dependency.DependencyType;
import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandler.DependencyType;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.UUIDService;
@@ -353,7 +354,7 @@ public class CoordCommandUtils {
URIHandler uriHandler = uriService.getURIHandler(uriPath);
uriHandler.validate(uriPath);
uris.append(uriPath);
- urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, doneFlagElement));
+ urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, CoordUtils.getDoneFlag(doneFlagElement)));
}
return uris.toString();
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Thu Feb 7 20:10:55 2013
@@ -22,8 +22,6 @@ import java.io.StringReader;
import java.net.URI;
import java.util.Date;
import java.util.List;
-import java.util.Map.Entry;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
@@ -35,7 +33,7 @@ import org.apache.oozie.client.OozieClie
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.dependency.DependencyChecker;
-import org.apache.oozie.dependency.DependencyChecker.ActionDependency;
+import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
@@ -110,6 +108,7 @@ public class CoordPushDependencyCheckXCo
!registerForNotification);
boolean isChangeInDependency = true;
+ boolean timeout = false;
if (actionDep.getMissingDependencies().size() == 0) { // All push-based dependencies are available
onAllPushDependenciesAvailable();
}
@@ -122,11 +121,12 @@ public class CoordPushDependencyCheckXCo
coordAction.setPushMissingDependencies(stillMissingDeps);
}
// Checking for timeout
- if (!isTimeout()) {
- queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), getCoordPushCheckRequeueInterval());
+ timeout = isTimeout();
+ if (timeout) {
+ queue(new CoordActionTimeOutXCommand(coordAction), 100);
}
else {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), getCoordPushCheckRequeueInterval());
}
}
@@ -137,6 +137,9 @@ public class CoordPushDependencyCheckXCo
else {
unregisterAvailableDependencies(actionDep);
}
+ if (timeout) {
+ unregisterMissingDependencies(actionDep.getMissingDependencies());
+ }
return null;
}
@@ -156,10 +159,7 @@ public class CoordPushDependencyCheckXCo
.getCreatedTime().getTime()))
/ (60 * 1000);
int timeOut = coordAction.getTimeOut();
- if ((timeOut >= 0) && (waitingTime > timeOut)) {
- return true;
- }
- return false;
+ return (timeOut >= 0) && (waitingTime > timeOut);
}
protected void onAllPushDependenciesAvailable() {
@@ -207,52 +207,62 @@ public class CoordPushDependencyCheckXCo
}
private void unregisterAvailableDependencies(ActionDependency actionDependency) {
- for (Entry<URIHandler, List<URI>> entry : actionDependency.getAvailableDependencies().entrySet()) {
- URIHandler handler = entry.getKey();
- for (URI availableURI : entry.getValue()) {
+ URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+ for (String availableDep : actionDependency.getAvailableDependencies()) {
+ try {
+ URI availableURI = new URI(availableDep);
+ URIHandler handler = uriService.getURIHandler(availableURI);
if (handler.unregisterFromNotification(availableURI, actionId)) {
- LOG.debug("Succesfully unregistered uri [{0}] for actionId: [{1}] from notifications",
+ LOG.debug("Successfully unregistered uri [{0}] for actionId: [{1}] from notifications",
availableURI, actionId);
}
else {
- LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications",
- availableURI, actionId);
+ LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", availableURI,
+ actionId);
}
}
+ catch (Exception e) {
+ LOG.warn("Exception while unregistering uri for actionId: [{0}] for notifications", actionId, e);
+ }
+ }
+ }
+
+ private void unregisterMissingDependencies(List<String> missingDeps) {
+ URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+ for (String missingDep : missingDeps) {
+ try {
+ URI missingURI = new URI(missingDep);
+ URIHandler handler = uriService.getURIHandler(missingURI);
+ if (handler.unregisterFromNotification(missingURI, actionId)) {
+ LOG.debug("Successfully unregistered uri [{0}] for actionId: [{1}] from notifications", missingURI,
+ actionId);
+ }
+ else {
+ LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", missingURI,
+ actionId);
+ }
+ }
+ catch (Exception e) {
+ LOG.warn("Exception while registering uri for actionId: [{0}] for notifications", actionId, e);
+ }
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.oozie.command.XCommand#getEntityKey()
- */
@Override
public String getEntityKey() {
return coordAction.getJobId();
}
- /* (non-Javadoc)
- * @see org.apache.oozie.command.XCommand#getKey()
- */
@Override
public String getKey(){
return getName() + "_" + actionId;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.oozie.command.XCommand#isLockRequired()
- */
@Override
protected boolean isLockRequired() {
return true;
}
- /* (non-Javadoc)
- * @see org.apache.oozie.command.XCommand#eagerLoadState()
- */
@Override
protected void eagerLoadState() throws CommandException {
try {
@@ -272,9 +282,6 @@ public class CoordPushDependencyCheckXCo
}
}
- /* (non-Javadoc)
- * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
- */
@Override
protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
@@ -298,20 +305,10 @@ public class CoordPushDependencyCheckXCo
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.oozie.command.XCommand#loadState()
- */
@Override
protected void loadState() throws CommandException {
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.oozie.command.XCommand#verifyPrecondition()
- */
@Override
protected void verifyPrecondition() throws CommandException, PreconditionException {
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java Thu Feb 7 20:10:55 2013
@@ -336,7 +336,7 @@ public class CoordELFunctions {
// DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
}
if (uriContext != null) {
- uriContext.dispose();
+ uriContext.destroy();
}
if (!resolved) {
// return unchanged future function with variable 'is_resolved'
@@ -1017,7 +1017,7 @@ public class CoordELFunctions {
// DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
}
if (uriContext != null) {
- uriContext.dispose();
+ uriContext.destroy();
}
if (!resolved) {
// return unchanged latest function with variable 'is_resolved'
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java Thu Feb 7 20:10:55 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency;
+
+import java.util.List;
+
+public class ActionDependency {
+
+ private List<String> missingDependencies;
+ private List<String> availableDependencies;
+
+ ActionDependency(List<String> missingDependencies, List<String> availableDependencies) {
+ this.missingDependencies = missingDependencies;
+ this.availableDependencies = availableDependencies;
+ }
+
+ public List<String> getMissingDependencies() {
+ return missingDependencies;
+ }
+
+ public List<String> getAvailableDependencies() {
+ return availableDependencies;
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java Thu Feb 7 20:10:55 2013
@@ -20,10 +20,7 @@ package org.apache.oozie.dependency;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
@@ -31,7 +28,6 @@ import org.apache.oozie.client.OozieClie
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIAccessorException;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
@@ -92,63 +88,40 @@ public class DependencyChecker {
boolean stopOnFirstMissing) throws CommandException {
String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
List<String> missingDeps = new ArrayList<String>();
- Map<URIHandler, List<URI>> availableDeps = new HashMap<URIHandler, List<URI>>();
+ List<String> availableDeps = new ArrayList<String>();
URIHandlerService uriService = Services.get().get(URIHandlerService.class);
- int index = 0;
- for (;index < missingDependencies.length; index++) {
- String dependency = missingDependencies[index];
- try {
- URI uri = new URI(dependency);
- URIHandler uriHandler = uriService.getURIHandler(uri);
- LOG.debug("Checking for the availability of {0} ", dependency);
- if (uriHandler.exists(uri, actionConf, user)) {
- List<URI> availableURIs = availableDeps.get(uriHandler);
- if (availableURIs == null) {
- availableURIs = new ArrayList<URI>();
- availableDeps.put(uriHandler, availableURIs);
+ boolean continueChecking = true;
+ try {
+ for (int index = 0; index < missingDependencies.length; index++) {
+ if (continueChecking) {
+ String dependency = missingDependencies[index];
+
+ URI uri = new URI(dependency);
+ URIHandler uriHandler = uriService.getURIHandler(uri);
+ LOG.debug("Checking for the availability of [{0}] ", dependency);
+ if (uriHandler.exists(uri, actionConf, user)) {
+ LOG.debug("Dependency [{0}] is available", dependency);
+ availableDeps.add(dependency);
+ }
+ else {
+ missingDeps.add(dependency);
+ if (stopOnFirstMissing) {
+ continueChecking = false;
+ }
}
- availableURIs.add(uri);
+
}
else {
- missingDeps.add(dependency);
- if (stopOnFirstMissing) {
- index++;
- break;
- }
+ missingDeps.add(missingDependencies[index]);
}
}
- catch (URISyntaxException e) {
- throw new CommandException(ErrorCode.E0906, e.getMessage(), e);
- }
- catch (URIAccessorException e) {
- throw new CommandException(e);
- }
}
- if (stopOnFirstMissing) {
- for (;index < missingDependencies.length; index++) {
- missingDeps.add(missingDependencies[index]);
- }
+ catch (URISyntaxException e) {
+ throw new CommandException(ErrorCode.E0906, e.getMessage(), e);
}
- return new ActionDependency(missingDeps, availableDeps);
- }
-
- public static class ActionDependency {
- private List<String> missingDependencies;
- private Map<URIHandler, List<URI>> availableDependencies;
-
- public ActionDependency(List<String> missingDependencies, Map<URIHandler, List<URI>> availableDependencies) {
- this.missingDependencies = missingDependencies;
- this.availableDependencies = availableDependencies;
- }
-
- public List<String> getMissingDependencies() {
- return missingDependencies;
- }
-
- public Map<URIHandler, List<URI>> getAvailableDependencies() {
- return availableDependencies;
+ catch (URIHandlerException e) {
+ throw new CommandException(e);
}
-
+ return new ActionDependency(missingDeps, availableDeps);
}
-
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIContext.java Thu Feb 7 20:10:55 2013
@@ -24,11 +24,22 @@ public class FSURIContext extends URICon
private FileSystem fs;
+ /**
+ * Create a FSURIContext that can be used to access a filesystem URI
+ *
+ * @param conf Configuration to access the URI
+ * @param user name of the user the URI should be accessed as
+ * @param fs FileSystem to access
+ */
public FSURIContext(Configuration conf, String user, FileSystem fs) {
super(conf, user);
this.fs = fs;
}
+ /**
+ * Get the FileSystem to access the URI
+ * @return FileSystem to access the URI
+ */
public FileSystem getFileSystem() {
return fs;
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java Thu Feb 7 20:10:55 2013
@@ -19,59 +19,30 @@ package org.apache.oozie.dependency;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.oozie.ErrorCode;
-import org.apache.oozie.coord.CoordUtils;
+import org.apache.oozie.action.hadoop.FSLauncherURIHandler;
+import org.apache.oozie.action.hadoop.LauncherURIHandler;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIAccessorException;
-import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XLog;
-import org.jdom.Element;
-public class FSURIHandler extends URIHandler {
+public class FSURIHandler implements URIHandler {
- private static XLog LOG = XLog.getLog(FSURIHandler.class);
- private boolean isFrontEnd;
private HadoopAccessorService service;
private Set<String> supportedSchemes;
private List<Class<?>> classesToShip;
- public FSURIHandler() {
- this.classesToShip = new ArrayList<Class<?>>();
- classesToShip.add(FSURIHandler.class);
- classesToShip.add(FSURIContext.class);
- classesToShip.add(HadoopAccessorService.class);
- classesToShip.add(HadoopAccessorException.class);
- classesToShip.add(XConfiguration.class); //Not sure why it fails in init with CNFE for this.
- }
-
@Override
- public void init(Configuration conf, boolean isFrontEnd) {
- this.isFrontEnd = isFrontEnd;
- if (isFrontEnd) {
- service = Services.get().get(HadoopAccessorService.class);
- supportedSchemes = service.getSupportedSchemes();
- }
- if (supportedSchemes == null) {
- supportedSchemes = new HashSet<String>();
- String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
- + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX,
- HadoopAccessorService.DEFAULT_SUPPORTED_SCHEMES);
- supportedSchemes.addAll(Arrays.asList(schemes));
- }
+ public void init(Configuration conf) {
+ service = Services.get().get(HadoopAccessorService.class);
+ supportedSchemes = service.getSupportedSchemes();
+ classesToShip = new FSLauncherURIHandler().getClassesForLauncher();
}
@Override
@@ -79,18 +50,24 @@ public class FSURIHandler extends URIHan
return supportedSchemes;
}
- public Collection<Class<?>> getClassesToShip() {
+ @Override
+ public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
+ return FSLauncherURIHandler.class;
+ }
+
+ @Override
+ public List<Class<?>> getClassesForLauncher() {
return classesToShip;
}
@Override
- public DependencyType getDependencyType(URI uri) throws URIAccessorException {
+ public DependencyType getDependencyType(URI uri) throws URIHandlerException {
return DependencyType.PULL;
}
@Override
public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
- throws URIAccessorException {
+ throws URIHandlerException {
throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
}
@@ -100,19 +77,13 @@ public class FSURIHandler extends URIHan
}
@Override
- public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIAccessorException {
+ public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIHandlerException {
FileSystem fs = getFileSystem(uri, conf, user);
return new FSURIContext(conf, user, fs);
}
@Override
- public boolean create(URI uri, Configuration conf, String user) throws URIAccessorException {
- FileSystem fs = getFileSystem(uri, conf, user);
- return create(fs, uri);
- }
-
- @Override
- public boolean exists(URI uri, URIContext uriContext) throws URIAccessorException {
+ public boolean exists(URI uri, URIContext uriContext) throws URIHandlerException {
try {
FileSystem fs = ((FSURIContext) uriContext).getFileSystem();
return fs.exists(getNormalizedPath(uri));
@@ -123,7 +94,7 @@ public class FSURIHandler extends URIHan
}
@Override
- public boolean exists(URI uri, Configuration conf, String user) throws URIAccessorException {
+ public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
try {
FileSystem fs = getFileSystem(uri, conf, user);
return fs.exists(getNormalizedPath(uri));
@@ -134,22 +105,7 @@ public class FSURIHandler extends URIHan
}
@Override
- public boolean delete(URI uri, Configuration conf, String user) throws URIAccessorException {
- FileSystem fs = getFileSystem(uri, conf, user);
- return delete(fs, uri);
- }
-
- @Override
- public String getURIWithDoneFlag(String uri, Element doneFlagElement) throws URIAccessorException {
- String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
- if (doneFlag.length() > 0) {
- uri += "/" + doneFlag;
- }
- return uri;
- }
-
- @Override
- public String getURIWithDoneFlag(String uri, String doneFlag) throws URIAccessorException {
+ public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
if (doneFlag.length() > 0) {
uri += "/" + doneFlag;
}
@@ -157,7 +113,7 @@ public class FSURIHandler extends URIHan
}
@Override
- public void validate(String uri) throws URIAccessorException {
+ public void validate(String uri) throws URIHandlerException {
}
@Override
@@ -171,65 +127,11 @@ public class FSURIHandler extends URIHan
}
private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException {
- if (isFrontEnd) {
- if (user == null) {
- throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
- }
- Configuration fsConf = service.createJobConf(uri.getAuthority());
- return service.createFileSystem(user, uri, fsConf);
- }
- else {
- try {
- if (user != null && !user.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
- throw new HadoopAccessorException(ErrorCode.E0902,
- "Cannot access FileSystem as a different user in backend");
- }
- return FileSystem.get(uri, conf);
- }
- catch (IOException e) {
- throw new HadoopAccessorException(ErrorCode.E0902, e);
- }
- }
- }
-
- private boolean create(FileSystem fs, URI uri) throws URIAccessorException {
- Path path = getNormalizedPath(uri);
- try {
- if (!fs.exists(path)) {
- boolean status = fs.mkdirs(path);
- if (status) {
- LOG.info("Creating directory at {0} succeeded.", path);
- }
- else {
- LOG.info("Creating directory at {0} failed.", path);
- }
- return status;
- }
- }
- catch (IOException e) {
- throw new HadoopAccessorException(ErrorCode.E0902, e);
- }
- return false;
- }
-
- private boolean delete(FileSystem fs, URI uri) throws URIAccessorException {
- Path path = getNormalizedPath(uri);
- try {
- if (fs.exists(path)) {
- boolean status = fs.delete(path, true);
- if (status) {
- LOG.info("Deletion of path {0} succeeded.", path);
- }
- else {
- LOG.info("Deletion of path {0} failed.", path);
- }
- return status;
- }
- }
- catch (IOException e) {
- throw new HadoopAccessorException(ErrorCode.E0902, e);
+ if (user == null) {
+ throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
}
- return false;
+ Configuration fsConf = service.createJobConf(uri.getAuthority());
+ return service.createFileSystem(user, uri, fsConf);
}
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIContext.java Thu Feb 7 20:10:55 2013
@@ -19,25 +19,41 @@ package org.apache.oozie.dependency;
import org.apache.hadoop.conf.Configuration;
import org.apache.hcatalog.api.HCatClient;
+import org.apache.oozie.util.XLog;
public class HCatURIContext extends URIContext {
+ private static XLog LOG = XLog.getLog(HCatURIContext.class);
private HCatClient hcatClient;
+ /**
+ * Create a HCatURIContext that can be used to access a hcat URI
+ *
+ * @param conf Configuration to access the URI
+ * @param user name of the user the URI should be accessed as
+ * @param hcatClient HCatClient to talk to hcatalog server
+ */
public HCatURIContext(Configuration conf, String user, HCatClient hcatClient) {
super(conf, user);
this.hcatClient = hcatClient;
}
+ /**
+ * Get the HCatClient to talk to hcatalog server
+ *
+ * @return HCatClient to talk to hcatalog server
+ */
public HCatClient getHCatClient() {
return hcatClient;
}
- public void dispose() {
+ @Override
+ public void destroy() {
try {
hcatClient.close();
}
catch (Exception ignore) {
+ LOG.warn("Error closing hcat client", ignore);
}
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java Thu Feb 7 20:10:55 2013
@@ -20,10 +20,7 @@ package org.apache.oozie.dependency;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -38,53 +35,32 @@ import org.apache.hcatalog.api.HCatClien
import org.apache.hcatalog.api.HCatPartition;
import org.apache.hcatalog.common.HCatException;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.hadoop.HCatLauncherURIHandler;
+import org.apache.oozie.action.hadoop.LauncherURIHandler;
import org.apache.oozie.jms.HCatMessageHandler;
-import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.HCatAccessorService;
import org.apache.oozie.service.MetaDataAccessorException;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIAccessorException;
import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.service.UserGroupInformationService;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;
-import org.jdom.Element;
-public class HCatURIHandler extends URIHandler {
+public class HCatURIHandler implements URIHandler {
private static XLog LOG = XLog.getLog(HCatURIHandler.class);
- private boolean isFrontEnd;
private Set<String> supportedSchemes;
- private UserGroupInformationService ugiService;
- private List<Class<?>> classesToShip;
private Map<String, DependencyType> dependencyTypes;
- private Set<String> registeredTopicURIs;
-
- public HCatURIHandler() {
- this.classesToShip = new ArrayList<Class<?>>();
- classesToShip.add(HCatURIHandler.class);
- classesToShip.add(HCatURIContext.class);
- classesToShip.add(HCatURI.class);
- classesToShip.add(MetaDataAccessorException.class);
- classesToShip.add(UserGroupInformationService.class);
- classesToShip.add(JMSAccessorService.class);
- classesToShip.add(PartitionDependencyManagerService.class);
- classesToShip.add(HCatMessageHandler.class);
- classesToShip.add(DependencyType.class);
- }
+ private List<Class<?>> classesToShip;
@Override
- public void init(Configuration conf, boolean isFrontEnd) {
- this.isFrontEnd = isFrontEnd;
- if (isFrontEnd) {
- ugiService = Services.get().get(UserGroupInformationService.class);
- dependencyTypes = new HashMap<String, DependencyType>();
- registeredTopicURIs = new HashSet<String>();
- }
+ public void init(Configuration conf) {
+ dependencyTypes = new HashMap<String, DependencyType>();
supportedSchemes = new HashSet<String>();
String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
+ this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat");
supportedSchemes.addAll(Arrays.asList(schemes));
+ classesToShip = new HCatLauncherURIHandler().getClassesForLauncher();
}
@Override
@@ -92,46 +68,51 @@ public class HCatURIHandler extends URIH
return supportedSchemes;
}
- public Collection<Class<?>> getClassesToShip() {
+ @Override
+ public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
+ return HCatLauncherURIHandler.class;
+ }
+
+ @Override
+ public List<Class<?>> getClassesForLauncher() {
return classesToShip;
}
@Override
- public DependencyType getDependencyType(URI uri) throws URIAccessorException {
- JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
- if (jmsService == null) {
- return DependencyType.PULL;
- }
- DependencyType depType = dependencyTypes.get(uri.getAuthority());
- if (depType == null) {
- depType = jmsService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL;
- dependencyTypes.put(uri.getAuthority(), depType);
+ public DependencyType getDependencyType(URI uri) throws URIHandlerException {
+ DependencyType depType = DependencyType.PULL;
+ // Not initializing in constructor as this will be part of oozie.services.ext
+ // and will be initialized after URIHandlerService
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ if (hcatService != null) {
+ depType = dependencyTypes.get(uri.getAuthority());
+ if (depType == null) {
+ depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL;
+ dependencyTypes.put(uri.getAuthority(), depType);
+ }
}
return depType;
- }
+ }
@Override
- public void registerForNotification(URI uri, Configuration conf, String user, String actionID) throws URIAccessorException {
- String uriString = uri.toString();
- String uriMinusPartition = uriString.substring(0, uriString.lastIndexOf("/"));
+ public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
+ throws URIHandlerException {
HCatURI hcatURI;
try {
hcatURI = new HCatURI(uri);
}
catch (URISyntaxException e) {
- throw new URIAccessorException(ErrorCode.E0906, uri, e);
+ throw new URIHandlerException(ErrorCode.E0906, uri, e);
}
- // Check cache to avoid a call to hcat server to get the topic
- if (!registeredTopicURIs.contains(uriMinusPartition)) {
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ if (!hcatService.isRegisteredForNotification(hcatURI)) {
HCatClient client = getHCatClient(uri, conf, user);
try {
String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable());
if (topic == null) {
return;
}
- JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
- jmsService.registerForNotification(uri, topic, new HCatMessageHandler(uri.getAuthority()));
- registeredTopicURIs.add(uriMinusPartition);
+ hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority()));
}
catch (HCatException e) {
throw new MetaDataAccessorException(ErrorCode.E1504, e);
@@ -151,59 +132,42 @@ public class HCatURIHandler extends URIH
hcatURI = new HCatURI(uri);
}
catch (URISyntaxException e) {
- throw new RuntimeException(e); //Unexpected at this point
+ throw new RuntimeException(e); // Unexpected at this point
}
PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
return pdmService.removeMissingDependency(hcatURI, actionID);
}
@Override
- public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIAccessorException {
+ public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIHandlerException {
HCatClient client = getHCatClient(uri, conf, user);
return new HCatURIContext(conf, user, client);
}
@Override
- public boolean create(URI uri, Configuration conf, String user) throws URIAccessorException {
- throw new MetaDataAccessorException(ErrorCode.E0902, new UnsupportedOperationException(
- "Add partition not supported"));
- }
-
- @Override
- public boolean exists(URI uri, URIContext uriContext) throws URIAccessorException {
+ public boolean exists(URI uri, URIContext uriContext) throws URIHandlerException {
HCatClient client = ((HCatURIContext) uriContext).getHCatClient();
return exists(uri, client, false);
}
@Override
- public boolean exists(URI uri, Configuration conf, String user) throws URIAccessorException {
+ public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
HCatClient client = getHCatClient(uri, conf, user);
return exists(uri, client, true);
}
@Override
- public boolean delete(URI uri, Configuration conf, String user) throws URIAccessorException {
- HCatClient hCatClient = getHCatClient(uri, conf, user);
- return delete(hCatClient, uri, true);
- }
-
- @Override
- public String getURIWithDoneFlag(String uri, Element doneFlagElement) throws URIAccessorException {
- return uri;
- }
-
- @Override
- public String getURIWithDoneFlag(String uri, String doneFlag) throws URIAccessorException {
+ public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
return uri;
}
@Override
- public void validate(String uri) throws URIAccessorException {
+ public void validate(String uri) throws URIHandlerException {
try {
- new HCatURI(uri); //will fail if uri syntax is incorrect
+ new HCatURI(uri); // will fail if uri syntax is incorrect
}
catch (URISyntaxException e) {
- throw new URIAccessorException(ErrorCode.E0906, uri, e);
+ throw new URIHandlerException(ErrorCode.E0906, uri, e);
}
}
@@ -220,32 +184,23 @@ public class HCatURIHandler extends URIH
hiveConf.set("hive.metastore.local", "false");
}
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
- HCatClient client = null;
try {
LOG.info("Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
UserGroupInformation.getLoginUser(), serverURI);
- if (isFrontEnd) {
- if (user == null) {
- throw new MetaDataAccessorException(ErrorCode.E0902,
- "user has to be specified to access metastore server");
- }
- UserGroupInformation ugi = ugiService.getProxyUser(user);
- client = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
- public HCatClient run() throws Exception {
- return HCatClient.create(hiveConf);
- }
- });
- }
- else {
- if (user != null && !user.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
- throw new MetaDataAccessorException(ErrorCode.E0902,
- "Cannot access metastore server as a different user in backend");
+
+ // HiveMetastoreClient (hive 0.9) currently does not work if UGI has doAs
+ // We are good to connect as the oozie user since listPartitions does not require
+ // authorization
+ /*
+ UserGroupInformation ugi = ugiService.getProxyUser(user);
+ return ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
+ public HCatClient run() throws Exception {
+ return HCatClient.create(hiveConf);
}
- // Delegation token fetched from metastore has new Text() as service and HiveMetastoreClient
- // looks for the same if not overriden by hive.metastore.token.signature
- // We are good as long as HCatCredentialHelper does not change the service of the token.
- client = HCatClient.create(hiveConf);
- }
+ });
+ */
+
+ return HCatClient.create(hiveConf);
}
catch (HCatException e) {
throw new MetaDataAccessorException(ErrorCode.E1504, e);
@@ -253,21 +208,20 @@ public class HCatURIHandler extends URIH
catch (IOException e) {
throw new MetaDataAccessorException(ErrorCode.E1504, e);
}
- catch (InterruptedException e) {
- throw new MetaDataAccessorException(ErrorCode.E1504, e);
- }
- return client;
}
private String getMetastoreConnectURI(URI uri) {
+ String metastoreURI;
// For unit tests
if (uri.getAuthority().equals("unittest-local")) {
- return "";
+ metastoreURI = "";
+ }
+ else {
+ // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
+ // is added
+ metastoreURI = "thrift://" + uri.getAuthority();
}
- // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
- // is added
- String metastoreURI = "thrift://" + uri.getAuthority();
return metastoreURI;
}
@@ -276,31 +230,7 @@ public class HCatURIHandler extends URIH
HCatURI hcatURI = new HCatURI(uri.toString());
List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(),
hcatURI.getPartitionMap());
- if (partitions == null || partitions.isEmpty()) {
- return false;
- }
- return true;
- }
- catch (ConnectionFailureException e) {
- throw new MetaDataAccessorException(ErrorCode.E1504, e);
- }
- catch (HCatException e) {
- throw new MetaDataAccessorException(ErrorCode.E0902, e);
- }
- catch (URISyntaxException e) {
- throw new MetaDataAccessorException(ErrorCode.E0902, e);
- }
- finally {
- closeQuietly(client, closeClient);
- }
- }
-
- private boolean delete(HCatClient client, URI uri, boolean closeClient) throws URIAccessorException {
- try {
- HCatURI hcatURI = new HCatURI(uri.toString());
- client.dropPartitions(hcatURI.getDb(), hcatURI.getTable(), hcatURI.getPartitionMap(), true);
- LOG.info("Dropped partitions for " + uri);
- return true;
+ return (partitions != null && !partitions.isEmpty());
}
catch (ConnectionFailureException e) {
throw new MetaDataAccessorException(ErrorCode.E1504, e);
@@ -322,6 +252,7 @@ public class HCatURIHandler extends URIH
client.close();
}
catch (Exception ignore) {
+ LOG.warn("Error closing hcat client", ignore);
}
}
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIContext.java Thu Feb 7 20:10:55 2013
@@ -24,20 +24,36 @@ public abstract class URIContext {
private Configuration conf;
private String user;
+ /**
+ * Create a URIContext that can be used to access a URI
+ * @param conf Configuration to access the URI
+ * @param user name of the user the URI should be accessed as
+ */
public URIContext(Configuration conf, String user) {
this.conf = conf;
this.user = user;
}
+ /**
+ * Get the Configuration to access the URI
+ * @return Configuration to access the URI
+ */
public Configuration getConfiguration() {
return conf;
}
+ /**
+ * Get the name of the user the URI will be accessed as
+ * @return the user name the URI will be accessed as
+ */
public String getUser() {
return user;
}
- public void dispose() {
+ /**
+ * Destroy the URIContext
+ */
+ public void destroy() {
}
}