You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by di...@apache.org on 2022/12/01 16:37:50 UTC
[oozie] branch master updated: OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos)
This is an automated email from the ASF dual-hosted git repository.
dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push:
new 4f4e9ee43 OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos)
4f4e9ee43 is described below
commit 4f4e9ee43daf2c8e86b56cabf674a3a4e606007f
Author: Denes Bodo <di...@apache.org>
AuthorDate: Thu Dec 1 15:39:26 2022 +0100
OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos)
---
core/src/main/java/org/apache/oozie/ErrorCode.java | 1 +
.../apache/oozie/action/hadoop/FsELFunctions.java | 53 +++++++++++
core/src/main/resources/oozie-default.xml | 56 +++++++++++
.../oozie/action/hadoop/TestFsELFunctions.java | 102 +++++++++++++++++----
docs/src/site/markdown/WorkflowFunctionalSpec.md | 23 +++++
release-log.txt | 1 +
6 files changed, 220 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java
index 8f4e21d03..a8053004e 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -158,6 +158,7 @@ public enum ErrorCode {
E0756(XLog.STD, "Exception parsing Kill node message [{0}]"),
E0757(XLog.STD, "Fork node [{0}] has multiple joins: [{1}]"),
E0758(XLog.STD, "Join node [{0}] has multiple forks: [{1}]"),
+ E0759(XLog.STD, "Could not read the workflow configuration"),
E0800(XLog.STD, "Action it is not running its in [{1}] state, action [{0}]"),
E0801(XLog.STD, "Workflow already running, workflow [{0}]"),
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java b/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java
index 0f81d7633..d877e785b 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java
@@ -19,6 +19,7 @@
package org.apache.oozie.action.hadoop;
import java.io.IOException;
+import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
@@ -28,26 +29,78 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.oozie.DagELFunctions;
+import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.util.XConfiguration;
/**
* EL function for fs action executor.
*/
public class FsELFunctions {
+ /**
+ * Prefix of the property names that carry extra file system configuration for EL functions.
+ * The URI scheme is appended to it: {@code FsELFunctions.conf.fs.<scheme>} (oozie-site) holds a comma
+ * separated list of key=value pairs, while {@code FsELFunctions.conf.fs.<scheme>.<key>} (workflow
+ * configuration) sets a single key.
+ */
+ static final String FS_EL_FUNCTIONS_CONF = "FsELFunctions.conf.fs.";
+ /**
+ * Creates the file system for {@code uri} as the user of the current workflow.
+ * <p>
+ * The Hadoop configuration created by {@link HadoopAccessorService} for the URI authority is extended
+ * with the EL-function file system properties configured for the URI scheme before the file system
+ * is created.
+ *
+ * @param uri the file system URI to access
+ * @return the file system returned by {@link HadoopAccessorService#createFileSystem}
+ * @throws HadoopAccessorException if the extra configuration cannot be read, or as thrown by
+ * {@link HadoopAccessorService}
+ */
private static FileSystem getFileSystem(URI uri) throws HadoopAccessorException {
WorkflowJob workflow = DagELFunctions.getWorkflow();
String user = workflow.getUser();
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
Configuration conf = has.createConfiguration(uri.getAuthority());
+
+ // Apply scheme-specific properties from oozie-site, then from the workflow configuration.
+ extractExtraFsConfiguration(workflow, conf, uri);
+
return has.createFileSystem(user, uri, conf);
}
+    /**
+     * Copies the file system properties configured for the scheme of {@code uri} into {@code conf}.
+     * <p>
+     * Global properties from oozie-site ({@code FsELFunctions.conf.fs.<scheme>}) are applied first.
+     * Per-workflow properties ({@code FsELFunctions.conf.fs.<scheme>.<key>}) are applied afterwards, so they
+     * override the global ones. Keys contained in {@link JavaActionExecutor#DISALLOWED_PROPERTIES} are
+     * never copied.
+     *
+     * @param workflow the workflow whose configuration may carry per-workflow properties
+     * @param conf the file system configuration to extend
+     * @param uri the file system URI; its scheme selects which properties apply
+     * @throws HadoopAccessorException with {@link ErrorCode#E0759} if the configuration cannot be read
+     */
+    static void extractExtraFsConfiguration(WorkflowJob workflow, Configuration conf, URI uri)
+            throws HadoopAccessorException {
+        try {
+            // Global settings must apply even when the workflow carries no configuration of its own;
+            // readFsConfigFromWorkflow already skips a null workflow configuration.
+            readFsConfigFromOozieSite(conf, uri);
+            readFsConfigFromWorkflow(workflow, conf, uri);
+        } catch (Exception e) {
+            throw new HadoopAccessorException(ErrorCode.E0759, e);
+        }
+    }
+
+    /**
+     * Applies the global properties configured in oozie-site under {@code FsELFunctions.conf.fs.<scheme>}.
+     * <p>
+     * The value is a comma separated list of {@code key=value} pairs. Whitespace around each key and value
+     * is stripped, which covers line breaks in a multi-line XML value. Entries without {@code =} or with an
+     * empty key are ignored.
+     *
+     * @param conf the file system configuration to extend
+     * @param uri the file system URI whose scheme selects the oozie-site property
+     */
+    private static void readFsConfigFromOozieSite(Configuration conf, URI uri) {
+        final String customELFsProperties = ConfigurationService.get(FS_EL_FUNCTIONS_CONF + uri.getScheme());
+        // Defensive: a missing or blank setting simply means "nothing configured".
+        if (customELFsProperties == null || customELFsProperties.trim().isEmpty()) {
+            return;
+        }
+        for (final String entry : customELFsProperties.split(",")) {
+            final String[] nameAndValue = entry.split("=", 2);
+            if (nameAndValue.length < 2) {
+                continue;
+            }
+            // Trim the key itself so that e.g. "user.name = x" is still caught by the disallowed check.
+            final String name = nameAndValue[0].trim();
+            if (name.isEmpty()) {
+                continue;
+            }
+            putKeyToConfIfAllowed(conf, name, nameAndValue[1].trim());
+        }
+    }
+
+    /**
+     * Applies the per-workflow properties named {@code FsELFunctions.conf.fs.<scheme>.<key>} as {@code <key>}.
+     * <p>
+     * Properties of other schemes are ignored, as is a bare prefix entry that names no key.
+     *
+     * @param workflow the workflow whose XML configuration is scanned; nothing happens if it has none
+     * @param conf the file system configuration to extend
+     * @param uri the file system URI whose scheme selects the relevant properties
+     * @throws Exception if the workflow configuration cannot be parsed
+     */
+    private static void readFsConfigFromWorkflow(WorkflowJob workflow, Configuration conf, URI uri) throws Exception {
+        final String workflowConfXml = workflow.getConf();
+        if (workflowConfXml == null) {
+            return;
+        }
+        final String schemePrefix = FS_EL_FUNCTIONS_CONF + uri.getScheme() + ".";
+        final XConfiguration workflowConf = new XConfiguration(new StringReader(workflowConfXml));
+        for (final String key : workflowConf.toProperties().stringPropertyNames()) {
+            if (!key.startsWith(schemePrefix)) {
+                continue;
+            }
+            final String fsKey = key.substring(schemePrefix.length());
+            // "<prefix>" alone carries no property name; do not set an empty key.
+            if (fsKey.isEmpty()) {
+                continue;
+            }
+            putKeyToConfIfAllowed(conf, fsKey, workflowConf.get(key));
+        }
+    }
+
+ private static void putKeyToConfIfAllowed(Configuration conf, String key, String value) {
+ if (!JavaActionExecutor.DISALLOWED_PROPERTIES.contains(key)) {
+ conf.set(key, value);
+ }
+ }
+
/**
* Get file status.
*
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index a11c2e753..01c1095fe 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -826,6 +826,62 @@ will be the requeue interval for the actions which are waiting for a long time w
</description>
</property>
+ <!-- Provide an empty default value for every file system listed in oozie.service.HadoopAccessorService.supported.filesystems -->
+
+ <property>
+ <name>FsELFunctions.conf.fs.hdfs</name>
+ <value></value>
+ <description>
+ You can configure custom hdfs file system properties for EL functions globally.
+ Value shall be a comma separated list of key=value pairs.
+ </description>
+ </property>
+
+ <property>
+ <name>FsELFunctions.conf.fs.hftp</name>
+ <value></value>
+ <description>
+ You can configure custom hftp file system properties for EL functions globally.
+ Value shall be a comma separated list of key=value pairs.
+ </description>
+ </property>
+
+ <property>
+ <name>FsELFunctions.conf.fs.abfs</name>
+ <value></value>
+ <description>
+ You can configure custom abfs file system properties for EL functions globally.
+ Value shall be a comma separated list of key=value pairs.
+ </description>
+ </property>
+
+ <property>
+ <name>FsELFunctions.conf.fs.abfss</name>
+ <value></value>
+ <description>
+ You can configure custom abfss file system properties for EL functions globally.
+ Value shall be a comma separated list of key=value pairs.
+ </description>
+ </property>
+
+ <property>
+ <name>FsELFunctions.conf.fs.s3</name>
+ <value></value>
+ <description>
+ You can configure custom s3 file system properties for EL functions globally.
+ Value shall be a comma separated list of key=value pairs.
+ </description>
+ </property>
+
+ <property>
+ <name>FsELFunctions.conf.fs.webhdfs</name>
+ <value></value>
+ <description>
+ You can configure custom webhdfs file system properties for EL functions globally.
+ Value shall be a comma separated list of key=value pairs.
+ </description>
+ </property>
+
<property>
<name>oozie.service.WorkflowAppService.WorkflowDefinitionMaxLength</name>
<value>100000</value>
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java
index 7b8187e0f..bab5243e6 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java
@@ -30,6 +30,7 @@ import org.apache.oozie.client.OozieClient;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
@@ -43,28 +44,33 @@ import org.apache.oozie.util.XConfiguration;
public class TestFsELFunctions extends XFsTestCase {
+ // Hadoop property keys used as sample "extra" file system properties.
+ public static final String PASSWORD_FILE_KEY = "hadoop.security.credstore.java-keystore-provider.password-file";
+ public static final String CREDENTIAL_PATH_KEY = "hadoop.security.credential.provider.path";
+ public static final String HDFS_FILE = "hdfs://path/to/file";
+ public static final String JCEKS_FILE = "jceks://path/to/file";
+ public static final String PASSWORD_FILE = "password.file";
+ // Configured for a different scheme ("hdfsx") and expected NOT to be applied to hdfs URIs.
+ public static final String KEY_WE_DONOT_WANT = "key.shall.not.used";
+ // Used as the oozie-site key for hdfs; with a trailing "." it is the per-workflow property prefix.
+ public static final String HDFS_ELF_FS_CONF_PREFIX = FsELFunctions.FS_EL_FUNCTIONS_CONF + "hdfs";
+ // Per-test workflow configuration; the remaining fields are populated in setUp().
+ private Configuration jobConf;
+ private Configuration protoConf;
+ private Configuration conf;
+ private LiteWorkflowInstance job;
+ private WorkflowJobBean wf;
+ private FileSystem fs;
+
@Override
protected void setUp() throws Exception {
super.setUp();
new Services().init();
- }
- @Override
- protected void tearDown() throws Exception {
- Services.get().destroy();
- super.tearDown();
- }
-
- public void testFunctions() throws Exception {
String file1 = new Path(getFsTestCaseDir(), "file1").toString();
String file2 = new Path(getFsTestCaseDir(), "file2").toString();
String dir = new Path(getFsTestCaseDir(), "dir").toString();
- Configuration protoConf = new Configuration();
+ protoConf = new Configuration();
protoConf.set(OozieClient.USER_NAME, getTestUser());
protoConf.set("hadoop.job.ugi", getTestUser() + "," + "group");
-
- FileSystem fs = getFileSystem();
+ fs = getFileSystem();
fs.mkdirs(new Path(dir));
fs.create(new Path(file1)).close();
OutputStream os = fs.create(new Path(dir, "a"));
@@ -83,7 +89,7 @@ public class TestFsELFunctions extends XFsTestCase {
String dirExtraSlashInPath = new URI(dirURI.getScheme(), dirURI.getAuthority(),
"/" + dirURI.getPath(), null, null).toString();
- Configuration conf = new XConfiguration();
+ conf = new XConfiguration();
conf.set(OozieClient.APP_PATH, "appPath");
conf.set(OozieClient.USER_NAME, getTestUser());
@@ -100,11 +106,11 @@ public class TestFsELFunctions extends XFsTestCase {
LiteWorkflowApp def =
new LiteWorkflowApp("name", "<workflow-app/>",
- new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).
- addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
- LiteWorkflowInstance job = new LiteWorkflowInstance(def, conf, "wfId");
+ new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).
+ addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
+ job = new LiteWorkflowInstance(def, conf, "wfId");
- WorkflowJobBean wf = new WorkflowJobBean();
+ wf = new WorkflowJobBean();
wf.setId(job.getId());
wf.setAppName("name");
wf.setAppPath("appPath");
@@ -114,7 +120,15 @@ public class TestFsELFunctions extends XFsTestCase {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
protoConf.writeXml(baos);
wf.setProtoActionConf(baos.toString(StandardCharsets.UTF_8.name()));
+ }
+    /**
+     * Shuts down the Oozie services started in setUp() and always runs the superclass tear-down, even if
+     * destroying the services fails.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            Services.get().destroy();
+        } finally {
+            super.tearDown();
+        }
+    }
+
+ public void testFunctions() throws Exception {
WorkflowActionBean action = new WorkflowActionBean();
action.setId("actionId");
action.setName("actionName");
@@ -140,4 +154,60 @@ public class TestFsELFunctions extends XFsTestCase {
3, (int) eval.evaluate("${fs:dirSize(wf:conf('dirExtraSlashInPath'))}", Integer.class));
}
+    /**
+     * Per-workflow properties with the hdfs prefix are copied without the prefix. Unprefixed properties
+     * and disallowed keys are not copied.
+     */
+    public void testCustomFileSystemPropertiesCanBeSet() throws Exception {
+        jobConf = new Configuration();
+        jobConf.set(HDFS_ELF_FS_CONF_PREFIX + "." + CREDENTIAL_PATH_KEY, JCEKS_FILE);
+        jobConf.set(HDFS_ELF_FS_CONF_PREFIX + "." + PASSWORD_FILE_KEY, PASSWORD_FILE);
+        jobConf.set("hadoop.irrelevant.configuration", "value");
+        jobConf.set("user.name", "Malice");
+        // Must use the "." separator so the key really matches the prefix and reaches the disallowed check;
+        // without it the assertion below would pass vacuously.
+        jobConf.set(HDFS_ELF_FS_CONF_PREFIX + ".user.name", "Malice");
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        jobConf.writeXml(baos);
+        wf.setConf(baos.toString(StandardCharsets.UTF_8.name()));
+
+        Configuration resultFsConf = new Configuration(false);
+        FsELFunctions.extractExtraFsConfiguration(wf, resultFsConf, fs.getUri());
+        assertEquals(JCEKS_FILE, resultFsConf.get(CREDENTIAL_PATH_KEY));
+        assertEquals(PASSWORD_FILE, resultFsConf.get(PASSWORD_FILE_KEY));
+        assertNull("Irrelevant property shall not be set.", resultFsConf.get("hadoop.irrelevant.configuration"));
+        assertNull("Disallowed property shall not be set.", resultFsConf.get("user.name"));
+    }
+
+    /**
+     * Global (oozie-site) properties for the hdfs scheme are applied to an hdfs URI, while properties
+     * configured for another scheme ("hdfsx") are not.
+     */
+    public void testOozieSiteConfigRead() throws Exception {
+        ConfigurationService.set(HDFS_ELF_FS_CONF_PREFIX,
+                String.join(",", CREDENTIAL_PATH_KEY + "=" + JCEKS_FILE, PASSWORD_FILE_KEY + "=" + PASSWORD_FILE));
+        ConfigurationService.set(FsELFunctions.FS_EL_FUNCTIONS_CONF + "hdfsx", KEY_WE_DONOT_WANT + "=value");
+
+        jobConf = new Configuration();
+        final ByteArrayOutputStream serializedJobConf = new ByteArrayOutputStream();
+        jobConf.writeXml(serializedJobConf);
+        wf.setConf(serializedJobConf.toString(StandardCharsets.UTF_8.name()));
+
+        final Configuration fsConf = new Configuration(false);
+        FsELFunctions.extractExtraFsConfiguration(wf, fsConf, new URI(HDFS_FILE));
+
+        assertEquals(JCEKS_FILE, fsConf.get(CREDENTIAL_PATH_KEY));
+        assertEquals(PASSWORD_FILE, fsConf.get(PASSWORD_FILE_KEY));
+        assertNull(fsConf.get(KEY_WE_DONOT_WANT));
+    }
+
+    /**
+     * A per-workflow property overrides the global (oozie-site) value configured for the same key.
+     */
+    public void testIfWorkflowConfOverwritesSiteConf() throws Exception {
+        final String keyToOverride = CREDENTIAL_PATH_KEY;
+        final String desiredValue = "Desired value";
+        ConfigurationService.set(HDFS_ELF_FS_CONF_PREFIX, keyToOverride + "=" + JCEKS_FILE);
+
+        jobConf = new Configuration();
+        jobConf.set(HDFS_ELF_FS_CONF_PREFIX + "." + keyToOverride, desiredValue);
+        final ByteArrayOutputStream serializedJobConf = new ByteArrayOutputStream();
+        jobConf.writeXml(serializedJobConf);
+        wf.setConf(serializedJobConf.toString(StandardCharsets.UTF_8.name()));
+
+        final Configuration fsConf = new Configuration(false);
+        FsELFunctions.extractExtraFsConfiguration(wf, fsConf, new URI(HDFS_FILE));
+
+        assertEquals(desiredValue, fsConf.get(CREDENTIAL_PATH_KEY));
+    }
}
diff --git a/docs/src/site/markdown/WorkflowFunctionalSpec.md b/docs/src/site/markdown/WorkflowFunctionalSpec.md
index 136f1d77c..3e63d662e 100644
--- a/docs/src/site/markdown/WorkflowFunctionalSpec.md
+++ b/docs/src/site/markdown/WorkflowFunctionalSpec.md
@@ -2288,6 +2288,29 @@ It returns the size in bytes of specified file. If the path is not a file, or if
It returns the block size in bytes of specified file. If the path is not a file, or if it does not exist it returns -1.
+#### 4.2.7.1 File system configuration for EL functions
+
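+Some file systems need extra properties (for example a credential provider path and a keystore password file)
+before the file system EL functions can access them to check data availability. Such properties can be configured
+globally in oozie-site.xml through `FsELFunctions.conf.fs.<scheme>` (a comma separated list of `key=value` pairs),
+or per workflow run in the job.properties file as `FsELFunctions.conf.fs.<scheme>.<key>=<value>`. Per-workflow
+values override global ones for the same key, and properties that Oozie disallows in action configurations are ignored.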
+
+Example of a global configuration in oozie-site.xml that applies to all `abfss://` access from EL functions:
+```
+<property>
+ <name>FsELFunctions.conf.fs.abfss</name>
+ <value>
+ hadoop.security.credstore.java-keystore-provider.password-file=password_file_name,
+ hadoop.security.credential.provider.path=jceks:///path/to/file
+ </value>
+ </property>
+```
+
+Example of per workflow configuration via job.properties:
+```
+FsELFunctions.conf.fs.abfss.hadoop.security.credstore.java-keystore-provider.password-file=password_file_name
+FsELFunctions.conf.fs.abfss.hadoop.security.credential.provider.path=jceks:///path/to/file
+```
+
#### 4.2.8 HCatalog EL Functions
For all the functions in this section the URI must be a hcatalog URI identifying a table or set of partitions in a table.
diff --git a/release-log.txt b/release-log.txt
index 8c658ef7b..372837af3 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.3.0 release (trunk - unreleased)
+OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos)
OOZIE-3677 Oozie should accept a keyStoreType and trustStoreType property in oozie-site.xml (jmakai via dionusos)
OOZIE-3678 Reduce the number of NameNode access when starting the Yarn job (jmakai via dionusos)
OOZIE-3670 Actions can stuck while running in a Fork-Join workflow (jmakai via dionusos)