Posted to commits@oozie.apache.org by di...@apache.org on 2022/12/01 16:37:50 UTC

[oozie] branch master updated: OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos)

This is an automated email from the ASF dual-hosted git repository.

dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f4e9ee43 OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos)
4f4e9ee43 is described below

commit 4f4e9ee43daf2c8e86b56cabf674a3a4e606007f
Author: Denes Bodo <di...@apache.org>
AuthorDate: Thu Dec 1 15:39:26 2022 +0100

    OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos)
---
 core/src/main/java/org/apache/oozie/ErrorCode.java |   1 +
 .../apache/oozie/action/hadoop/FsELFunctions.java  |  53 +++++++++++
 core/src/main/resources/oozie-default.xml          |  56 +++++++++++
 .../oozie/action/hadoop/TestFsELFunctions.java     | 102 +++++++++++++++++----
 docs/src/site/markdown/WorkflowFunctionalSpec.md   |  23 +++++
 release-log.txt                                    |   1 +
 6 files changed, 220 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java
index 8f4e21d03..a8053004e 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -158,6 +158,7 @@ public enum ErrorCode {
     E0756(XLog.STD, "Exception parsing Kill node message [{0}]"),
     E0757(XLog.STD, "Fork node [{0}] has multiple joins: [{1}]"),
     E0758(XLog.STD, "Join node [{0}] has multiple forks: [{1}]"),
+    E0759(XLog.STD, "Could not read the workflow configuration"),
 
     E0800(XLog.STD, "Action it is not running its in [{1}] state, action [{0}]"),
     E0801(XLog.STD, "Workflow already running, workflow [{0}]"),
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java b/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java
index 0f81d7633..d877e785b 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java
@@ -19,6 +19,7 @@
 package org.apache.oozie.action.hadoop;
 
 import java.io.IOException;
+import java.io.StringReader;
 import java.net.URI;
 import java.net.URISyntaxException;
 
@@ -28,26 +29,78 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.oozie.DagELFunctions;
+import org.apache.oozie.ErrorCode;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.util.XConfiguration;
 
 /**
  * EL function for fs action executor.
  */
 public class FsELFunctions {
+    static final String FS_EL_FUNCTIONS_CONF = "FsELFunctions.conf.fs.";
 
     private static FileSystem getFileSystem(URI uri) throws HadoopAccessorException {
         WorkflowJob workflow = DagELFunctions.getWorkflow();
         String user = workflow.getUser();
         HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
         Configuration conf = has.createConfiguration(uri.getAuthority());
+
+        extractExtraFsConfiguration(workflow, conf, uri);
+
         return has.createFileSystem(user, uri, conf);
     }
 
+    static void extractExtraFsConfiguration(WorkflowJob workflow, Configuration conf, URI uri)
+            throws HadoopAccessorException {
+        if (workflow.getConf() != null) {
+            try {
+                readFsConfigFromOozieSite(conf, uri);
+                readFsConfigFromWorkflow(workflow, conf, uri);
+            } catch (Exception e) {
+                throw new HadoopAccessorException(ErrorCode.E0759, e);
+            }
+        }
+    }
+
+    private static void readFsConfigFromOozieSite(Configuration conf, URI uri) {
+        final String fsElFunctionsConfWithScheme = FS_EL_FUNCTIONS_CONF + uri.getScheme();
+        final String customELFsProperties = ConfigurationService.get(fsElFunctionsConfWithScheme);
+        // Guard against schemes that have no value configured at all, which would otherwise NPE on split().
+        if (customELFsProperties == null) {
+            return;
+        }
+
+        for (final String entry : customELFsProperties.split(",")) {
+            final String[] nameAndValue = entry.trim().split("=", 2);
+            if (nameAndValue.length < 2) {
+                continue;
+            }
+            putKeyToConfIfAllowed(conf, nameAndValue[0], nameAndValue[1]);
+        }
+    }
+
+    private static void readFsConfigFromWorkflow(WorkflowJob workflow, Configuration conf, URI uri) throws Exception {
+        if (workflow.getConf() == null) {
+            return;
+        }
+        final String FS_EL_FUNCTIONS_CONF_WITH_SCHEME = FS_EL_FUNCTIONS_CONF + uri.getScheme() + ".";
+        final XConfiguration workflowConf = new XConfiguration(new StringReader(workflow.getConf()));
+        for (Object _key : workflowConf.toProperties().keySet()) {
+            String key = (String) _key;
+            if (!key.startsWith(FS_EL_FUNCTIONS_CONF_WITH_SCHEME)) {
+                continue;
+            }
+            putKeyToConfIfAllowed(conf, key.substring(FS_EL_FUNCTIONS_CONF_WITH_SCHEME.length()), workflowConf.get(key));
+        }
+    }
+
+    private static void putKeyToConfIfAllowed(Configuration conf, String key, String value) {
+        if (!JavaActionExecutor.DISALLOWED_PROPERTIES.contains(key)) {
+            conf.set(key, value);
+        }
+    }
+
     /**
      * Get file status.
      *
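For illustration, the key=value parsing that readFsConfigFromOozieSite() performs can be sketched in isolation. This is a minimal standalone sketch, not Oozie code: the class name, the main() driver and the inlined disallowed set are invented for the example, while the logic mirrors the method above (entries are comma separated, each entry is split on the first '=', and malformed or disallowed entries are skipped).

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    public class FsConfParseSketch {
        // Stand-in for JavaActionExecutor.DISALLOWED_PROPERTIES; the real list lives in Oozie.
        private static final Set<String> DISALLOWED = Collections.singleton("user.name");

        static Map<String, String> parse(String commaSeparatedPairs) {
            Map<String, String> result = new HashMap<>();
            for (String entry : commaSeparatedPairs.split(",")) {
                // Split on the first '=' only, so values may themselves contain '='.
                String[] nameAndValue = entry.trim().split("=", 2);
                if (nameAndValue.length < 2 || DISALLOWED.contains(nameAndValue[0])) {
                    continue; // skip malformed or disallowed entries
                }
                result.put(nameAndValue[0], nameAndValue[1]);
            }
            return result;
        }

        public static void main(String[] args) {
            // Only the credential provider entry survives: "user.name" is disallowed
            // and "oops" has no '='.
            System.out.println(parse(
                    "hadoop.security.credential.provider.path=jceks:///path/to/file, user.name=Malice, oops"));
        }
    }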
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index a11c2e753..01c1095fe 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -826,6 +826,62 @@ will be the requeue interval for the actions which are waiting for a long time w
         </description>
     </property>
 
+    <!-- Default empty values for every file system enumerated in oozie.service.HadoopAccessorService.supported.filesystems -->
+
+    <property>
+        <name>FsELFunctions.conf.fs.hdfs</name>
+        <value></value>
+        <description>
+            You can configure custom hdfs file system properties for EL functions globally.
+            The value shall be a comma-separated list of key=value pairs.
+        </description>
+    </property>
+
+    <property>
+        <name>FsELFunctions.conf.fs.hftp</name>
+        <value></value>
+        <description>
+            You can configure custom hftp file system properties for EL functions globally.
+            The value shall be a comma-separated list of key=value pairs.
+        </description>
+    </property>
+
+    <property>
+        <name>FsELFunctions.conf.fs.abfs</name>
+        <value></value>
+        <description>
+            You can configure custom abfs file system properties for EL functions globally.
+            The value shall be a comma-separated list of key=value pairs.
+        </description>
+    </property>
+
+    <property>
+        <name>FsELFunctions.conf.fs.abfss</name>
+        <value></value>
+        <description>
+            You can configure custom abfss file system properties for EL functions globally.
+            The value shall be a comma-separated list of key=value pairs.
+        </description>
+    </property>
+
+    <property>
+        <name>FsELFunctions.conf.fs.s3</name>
+        <value></value>
+        <description>
+            You can configure custom s3 file system properties for EL functions globally.
+            The value shall be a comma-separated list of key=value pairs.
+        </description>
+    </property>
+
+    <property>
+        <name>FsELFunctions.conf.fs.webhdfs</name>
+        <value></value>
+        <description>
+            You can configure custom webhdfs file system properties for EL functions globally.
+            The value shall be a comma-separated list of key=value pairs.
+        </description>
+    </property>
+
     <property>
         <name>oozie.service.WorkflowAppService.WorkflowDefinitionMaxLength</name>
         <value>100000</value>
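As a concrete illustration of the value format the descriptions above call for, a populated variant of one of these properties could look like this (the jceks path and password file name are placeholders, not shipped defaults):

    <property>
        <name>FsELFunctions.conf.fs.hdfs</name>
        <value>
            hadoop.security.credential.provider.path=jceks:///path/to/creds.jceks,
            hadoop.security.credstore.java-keystore-provider.password-file=password.file
        </value>
    </property>

Because each entry is trimmed before the key=value split, the value may be spread over multiple lines as shown.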
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java
index 7b8187e0f..bab5243e6 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java
@@ -30,6 +30,7 @@ import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.DagELFunctions;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.workflow.lite.EndNodeDef;
 import org.apache.oozie.workflow.lite.LiteWorkflowApp;
@@ -43,28 +44,33 @@ import org.apache.oozie.util.XConfiguration;
 
 public class TestFsELFunctions extends XFsTestCase {
 
+    public static final String PASSWORD_FILE_KEY = "hadoop.security.credstore.java-keystore-provider.password-file";
+    public static final String CREDENTIAL_PATH_KEY = "hadoop.security.credential.provider.path";
+    public static final String HDFS_FILE = "hdfs://path/to/file";
+    public static final String JCEKS_FILE = "jceks://path/to/file";
+    public static final String PASSWORD_FILE = "password.file";
+    public static final String KEY_WE_DONOT_WANT = "key.shall.not.used";
+    public static final String HDFS_ELF_FS_CONF_PREFIX = FsELFunctions.FS_EL_FUNCTIONS_CONF + "hdfs";
+    private Configuration jobConf;
+    private Configuration protoConf;
+    private Configuration conf;
+    private LiteWorkflowInstance job;
+    private WorkflowJobBean wf;
+    private FileSystem fs;
+
     @Override
     protected void setUp() throws Exception {
         super.setUp();
         new Services().init();
-    }
 
-    @Override
-    protected void tearDown() throws Exception {
-        Services.get().destroy();
-        super.tearDown();
-    }
-
-    public void testFunctions() throws Exception {
         String file1 = new Path(getFsTestCaseDir(), "file1").toString();
         String file2 = new Path(getFsTestCaseDir(), "file2").toString();
         String dir = new Path(getFsTestCaseDir(), "dir").toString();
-        Configuration protoConf = new Configuration();
+        protoConf = new Configuration();
         protoConf.set(OozieClient.USER_NAME, getTestUser());
         protoConf.set("hadoop.job.ugi", getTestUser() + "," + "group");
 
-
-        FileSystem fs = getFileSystem();
+        fs = getFileSystem();
         fs.mkdirs(new Path(dir));
         fs.create(new Path(file1)).close();
         OutputStream os = fs.create(new Path(dir, "a"));
@@ -83,7 +89,7 @@ public class TestFsELFunctions extends XFsTestCase {
         String dirExtraSlashInPath = new URI(dirURI.getScheme(), dirURI.getAuthority(),
                 "/" + dirURI.getPath(), null, null).toString();
 
-        Configuration conf = new XConfiguration();
+        conf = new XConfiguration();
         conf.set(OozieClient.APP_PATH, "appPath");
         conf.set(OozieClient.USER_NAME, getTestUser());
 
@@ -100,11 +106,11 @@ public class TestFsELFunctions extends XFsTestCase {
 
         LiteWorkflowApp def =
                 new LiteWorkflowApp("name", "<workflow-app/>",
-                                    new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).
-                    addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
-        LiteWorkflowInstance job = new LiteWorkflowInstance(def, conf, "wfId");
+                        new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).
+                        addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
+        job = new LiteWorkflowInstance(def, conf, "wfId");
 
-        WorkflowJobBean wf = new WorkflowJobBean();
+        wf = new WorkflowJobBean();
         wf.setId(job.getId());
         wf.setAppName("name");
         wf.setAppPath("appPath");
@@ -114,7 +120,15 @@ public class TestFsELFunctions extends XFsTestCase {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         protoConf.writeXml(baos);
         wf.setProtoActionConf(baos.toString(StandardCharsets.UTF_8.name()));
+    }
 
+    @Override
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    public void testFunctions() throws Exception {
         WorkflowActionBean action = new WorkflowActionBean();
         action.setId("actionId");
         action.setName("actionName");
@@ -140,4 +154,60 @@ public class TestFsELFunctions extends XFsTestCase {
                 3, (int) eval.evaluate("${fs:dirSize(wf:conf('dirExtraSlashInPath'))}", Integer.class));
     }
 
+    public void testCustomFileSystemPropertiesCanBeSet() throws Exception {
+        jobConf = new Configuration();
+        jobConf.set(HDFS_ELF_FS_CONF_PREFIX + "." + CREDENTIAL_PATH_KEY, JCEKS_FILE);
+        jobConf.set(HDFS_ELF_FS_CONF_PREFIX + "." + PASSWORD_FILE_KEY, PASSWORD_FILE);
+        jobConf.set("hadoop.irrelevant.configuration", "value");
+        jobConf.set("user.name", "Malice");
+        jobConf.set(HDFS_ELF_FS_CONF_PREFIX + "." + "user.name", "Malice");
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        jobConf.writeXml(baos);
+        wf.setConf(baos.toString(StandardCharsets.UTF_8.name()));
+
+        Configuration resultFsConf = new Configuration(false);
+        FsELFunctions.extractExtraFsConfiguration(wf, resultFsConf, fs.getUri());
+        assertEquals(JCEKS_FILE, resultFsConf.get(CREDENTIAL_PATH_KEY));
+        assertEquals(PASSWORD_FILE, resultFsConf.get(PASSWORD_FILE_KEY));
+        assertNull("Irrelevant property shall not be set.", resultFsConf.get("hadoop.irrelevant.configuration"));
+        assertNull("Disallowed property shall not be set.", resultFsConf.get("user.name"));
+    }
+
+    public void testOozieSiteConfigRead() throws Exception {
+        Configuration cnf = new Configuration(false);
+        URI uri = new URI(HDFS_FILE);
+        ConfigurationService.set(HDFS_ELF_FS_CONF_PREFIX,
+                CREDENTIAL_PATH_KEY + "=" + JCEKS_FILE + "," + PASSWORD_FILE_KEY + "=" + PASSWORD_FILE);
+        ConfigurationService.set(FsELFunctions.FS_EL_FUNCTIONS_CONF + "hdfsx", KEY_WE_DONOT_WANT + "=value");
+
+        jobConf = new Configuration();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        jobConf.writeXml(baos);
+        wf.setConf(baos.toString(StandardCharsets.UTF_8.name()));
+
+        FsELFunctions.extractExtraFsConfiguration(wf, cnf, uri);
+
+        assertEquals(JCEKS_FILE, cnf.get(CREDENTIAL_PATH_KEY));
+        assertEquals(PASSWORD_FILE, cnf.get(PASSWORD_FILE_KEY));
+        assertNull(cnf.get(KEY_WE_DONOT_WANT));
+    }
+
+    public void testIfWorkflowConfOverwritesSiteConf() throws Exception {
+        Configuration cnf = new Configuration(false);
+        URI uri = new URI(HDFS_FILE);
+        String KEY_TO_OVERRIDE = CREDENTIAL_PATH_KEY;
+        ConfigurationService.set(HDFS_ELF_FS_CONF_PREFIX, KEY_TO_OVERRIDE + "=" + JCEKS_FILE);
+
+        jobConf = new Configuration();
+        jobConf.set(HDFS_ELF_FS_CONF_PREFIX + "." + KEY_TO_OVERRIDE, "Desired value");
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        jobConf.writeXml(baos);
+        wf.setConf(baos.toString(StandardCharsets.UTF_8.name()));
+
+        FsELFunctions.extractExtraFsConfiguration(wf, cnf, uri);
+
+        assertEquals("Desired value", cnf.get(CREDENTIAL_PATH_KEY));
+    }
 }
diff --git a/docs/src/site/markdown/WorkflowFunctionalSpec.md b/docs/src/site/markdown/WorkflowFunctionalSpec.md
index 136f1d77c..3e63d662e 100644
--- a/docs/src/site/markdown/WorkflowFunctionalSpec.md
+++ b/docs/src/site/markdown/WorkflowFunctionalSpec.md
@@ -2288,6 +2288,29 @@ It returns the size in bytes of specified file. If the path is not a file, or if
 
 It returns the block size in bytes of specified file. If the path is not a file, or if it does not exist it returns -1.
 
+#### 4.2.7.1 File system configuration for EL functions
+
+There are use cases where you need to access special file systems to check data availability, which requires a
+file system principal and key file to be set. These can be configured globally in oozie-site.xml or per workflow
+run through the job.properties file.
+
+Example of a global configuration that applies to all abfss:// access via EL functions:
+```
+    <property>
+        <name>FsELFunctions.conf.fs.abfss</name>
+        <value>
+            hadoop.security.credstore.java-keystore-provider.password-file=password_file_name,
+            hadoop.security.credential.provider.path=jceks:///path/to/file
+        </value>
+    </property>
+```
+
+Example of a per-workflow configuration via job.properties:
+```
+FsELFunctions.conf.fs.abfss.hadoop.security.credstore.java-keystore-provider.password-file=password_file_name
+FsELFunctions.conf.fs.abfss.hadoop.security.credential.provider.path=jceks:///path/to/file
+```
+
 #### 4.2.8 HCatalog EL Functions
 
 For all the functions in this section the URI must be a hcatalog URI identifying a table or set of partitions in a table.
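To make the documented use case concrete, this is how such a configured file system would typically be probed from a workflow; the decision node, transitions, and abfss URI below are placeholders, not part of the commit:

    <decision name="check-input">
        <switch>
            <case to="process-data">
                ${fs:exists('abfss://container@account.dfs.core.windows.net/path/to/input')}
            </case>
            <default to="end"/>
        </switch>
    </decision>

With FsELFunctions.conf.fs.abfss populated as shown earlier, fs:exists() picks up the credential provider settings when creating the abfss file system.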
diff --git a/release-log.txt b/release-log.txt
index 8c658ef7b..372837af3 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos)
 OOZIE-3677 Oozie should accept a keyStoreType and trustStoreType property in oozie-site.xml (jmakai via dionusos)
 OOZIE-3678 Reduce the number of NameNode access when starting the Yarn job (jmakai via dionusos)
 OOZIE-3670 Actions can stuck while running in a Fork-Join workflow (jmakai via dionusos)