You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by as...@apache.org on 2019/10/07 09:42:00 UTC

[oozie] branch master updated: OOZIE-3529 Oozie not supported for s3 as filesystem (dionusos via asalamon74)

This is an automated email from the ASF dual-hosted git repository.

asalamon74 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 21794c3  OOZIE-3529 Oozie not supported for s3 as filesystem (dionusos via asalamon74)
21794c3 is described below

commit 21794c3fbd0db3d7c1c8836ace758176629e73eb
Author: Andras Salamon <as...@apache.org>
AuthorDate: Mon Oct 7 11:40:11 2019 +0200

    OOZIE-3529 Oozie not supported for s3 as filesystem (dionusos via asalamon74)
---
 core/pom.xml                                       |  7 ++
 .../oozie/service/HadoopAccessorService.java       | 26 ++++++-
 core/src/main/resources/oozie-default.xml          | 12 ++++
 .../oozie/service/TestHadoopAccessorService.java   | 81 ++++++++++++++++++++++
 .../java/org/apache/oozie/test/XFsTestCase.java    |  4 ++
 docs/src/site/markdown/AG_HadoopConfiguration.md   | 22 ++++++
 release-log.txt                                    |  1 +
 7 files changed, 151 insertions(+), 2 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index d2a211a..cd52c45 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -504,6 +504,13 @@
             <scope>compile</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aws</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index 0b53a36..396ad29 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -100,6 +100,7 @@ public class HadoopAccessorService implements Service {
     public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
     public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
     public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
+    protected static final String FS_PROP_PATTERN = CONF_PREFIX + "fs.%s";
 
     private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
     private static final String DEFAULT_ACTIONNAME = "default";
@@ -619,12 +620,12 @@ public class HadoopAccessorService implements Service {
             }
         }
         validateNameNode(nameNode);
-
+        final Configuration fileSystemConf = extendWithFileSystemSpecificPropertiesIfAny(uri, conf);
         try {
             UserGroupInformation ugi = getUGI(user);
             return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                 public FileSystem run() throws Exception {
-                    return FileSystem.get(uri, conf);
+                    return FileSystem.get(uri, fileSystemConf);
                 }
             });
         }
@@ -633,6 +634,27 @@ public class HadoopAccessorService implements Service {
         }
     }
 
+    Configuration extendWithFileSystemSpecificPropertiesIfAny(final URI uri, final Configuration conf) {
+        final String fsProps = String.format(FS_PROP_PATTERN, uri.getScheme());
+        final String fsCustomProps = ConfigurationService.get(fsProps);
+        if (fsCustomProps == null || fsCustomProps.length() == 0) {
+            return conf;
+        }
+
+        final Configuration result = new Configuration();
+        XConfiguration.copy(conf, result);
+        for (final String entry : fsCustomProps.split(",")) {
+            final String[] nameAndValue = entry.trim().split("=", 2);
+            if (nameAndValue.length < 2) {
+                LOG.warn(String.format("Configuration for %s cannot be read: %s. Skipping...",
+                        fsProps, Arrays.toString(nameAndValue)));
+                continue;
+            }
+            result.set(nameAndValue[0], nameAndValue[1]);
+        }
+        return result;
+    }
+
     /**
      * Validate Job tracker
      * @param jobTrackerUri job tracker uri
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index f33c7b9..56c5b59 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2219,6 +2219,18 @@ will be the requeue interval for the actions which are waiting for a long time w
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.HadoopAccessorService.fs.s3a</name>
+        <value> </value>
+        <description>
+            You can configure custom s3a file system properties globally.
+            Value shall be a comma separated list of key=value pairs. For example:
+            fs.s3a.fast.upload.buffer=bytebuffer,fs.s3a.impl.disable.cache=true
+            Limitation: the custom file system properties cannot contain a comma
+            in either the key or the value.
+        </description>
+    </property>
+
     <!-- Credentials -->
     <property>
         <name>oozie.credentials.credentialclasses</name>
diff --git a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
index 89ce185..d3cb97a 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
@@ -38,9 +38,11 @@ import java.io.FileOutputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.util.XConfiguration;
+import org.junit.Assert;
 
 public class TestHadoopAccessorService extends XFsTestCase {
 
@@ -326,4 +328,83 @@ public class TestHadoopAccessorService extends XFsTestCase {
         conf2.addResource(getFileSystem().open(resPath));
         assertEquals("bar", conf2.get("foo"));
     }
+
+    public void testCreateS3WithByteBufferProperties() throws Exception {
+        FileSystem fs = createFileSystemWithCustomProperties(
+                "s3a://somebucket/somedirectory/somefile.txt",
+                String.format(HadoopAccessorService.FS_PROP_PATTERN, "s3a"),
+                "fs.s3a.fast.upload.buffer=bytebuffer,fs.s3a.impl.disable.cache=true");
+        assertEquals("Expected peroperty [bytebuffer] is not set.", "bytebuffer", fs.getConf().get("fs.s3a.fast.upload.buffer"));
+    }
+
+    public void testCreateS3WithDefaultProperties() throws Exception {
+        FileSystem fs = createFileSystemWithCustomProperties("s3a://somebucket/somedirectory/somefile.txt", null, null);
+        assertNotSame("Unxpected peroperty [bytebuffer] is set.", "bytebuffer", fs.getConf().get("fs.s3a.fast.upload.buffer"));
+    }
+
+    public void testCreateHdfsWithoutProperties() throws Exception {
+        createFileSystemWithCustomProperties("hdfs://localhost:1234/somedirectory/somefile.txt", null, null);
+    }
+
+    public void testCreateFileWithoutProperties() throws Exception {
+        createFileSystemWithCustomProperties("file://somebucket/somedirectory/somefile.txt", null, null);
+    }
+
+    public void testCreateWithoutSchemeWithoutProperties() throws Exception {
+        createFileSystemWithCustomProperties("/somebucket/somedirectory/somefile.txt", null, null);
+    }
+
+    public void testCreateHdfsWithInvalidProperties() throws Exception {
+        FileSystem fs = createFileSystemWithCustomProperties("hdfs://localhost:1234/somedirectory/somefile.txt",
+                String.format(HadoopAccessorService.FS_PROP_PATTERN, "hdfs"),
+                "fs.hdfs.custom.property1=value1,fs.hdfs.custom.property2");
+        Assert.assertEquals("value1", fs.getConf().get("fs.hdfs.custom.property1"));
+        Assert.assertEquals(null, fs.getConf().get("fs.hdfs.custom.property2"));
+    }
+
+    public void testCreateHdfsWithEqualSignInValuePropertiy() throws Exception {
+        FileSystem fs = createFileSystemWithCustomProperties("hdfs://localhost:1234/somedirectory/somefile.txt",
+                String.format(HadoopAccessorService.FS_PROP_PATTERN, "hdfs"),
+                "fs.hdfs.custom.property1=value1=value2");
+        Assert.assertEquals("value1=value2", fs.getConf().get("fs.hdfs.custom.property1"));
+        Assert.assertEquals(null, fs.getConf().get("value1"));
+        Assert.assertEquals(null, fs.getConf().get("value2"));
+    }
+
+    public void testCreateHdfsWithCommaSeparatedValues() throws Exception {
+        FileSystem fs = createFileSystemWithCustomProperties("hdfs://localhost:1234/somedirectory/somefile.txt",
+                String.format(HadoopAccessorService.FS_PROP_PATTERN, "hdfs"),
+                "fs.hdfs.custom.property1=value1,value2");
+        Assert.assertEquals("value1", fs.getConf().get("fs.hdfs.custom.property1"));
+        Assert.assertEquals(null, fs.getConf().get("value2"));
+    }
+
+    public void testIfNoCustomFsConfigProvidedBaseConfigRemainsTheSame() throws Exception {
+        HadoopAccessorService has = new HadoopAccessorService();
+        Configuration base = new XConfiguration();
+        base.set("foo.bar", "baz");
+        Configuration result = has.extendWithFileSystemSpecificPropertiesIfAny(new URI("hdfs://localhost:1234/"), base);
+        assertEquals("The two configuration object shall be the same", base, result);
+        assertEquals("Key foo.bar shall be present in result configuration", "baz", result.get("foo.bar"));
+        assertSame("The two configuration object shall be the same", base, result);
+    }
+
+    public FileSystem createFileSystemWithCustomProperties(
+            final String uri, final String fsKey, final String commaSeparatedKeyValues) throws Exception {
+        final HadoopAccessorService has = new HadoopAccessorService();
+        if (fsKey != null && commaSeparatedKeyValues != null) {
+            ConfigurationService.set(fsKey, commaSeparatedKeyValues);
+        }
+        has.init(new Configuration(false));
+        final Configuration conf = has.createConfiguration(null);
+        setS3CredentialProperties(conf);
+        final FileSystem fs = has.createFileSystem("user", new URI(uri), conf);
+        has.destroy();
+        return fs;
+    }
+
+    private void setS3CredentialProperties(final Configuration conf) {
+        conf.set("fs.s3a.access.key", "someAccessKey");
+        conf.set("fs.s3a.secret.key", "someSecretKey");
+    }
 }
diff --git a/core/src/test/java/org/apache/oozie/test/XFsTestCase.java b/core/src/test/java/org/apache/oozie/test/XFsTestCase.java
index c0f3c69..436000c 100644
--- a/core/src/test/java/org/apache/oozie/test/XFsTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XFsTestCase.java
@@ -79,6 +79,7 @@ public abstract class XFsTestCase extends XTestCase {
         conf.set("oozie.service.HadoopAccessorService.hadoop.configurations", "*=hadoop-conf");
         conf.set("oozie.service.HadoopAccessorService.action.configurations", "*=action-conf");
 
+        new Services().init();
         has = new HadoopAccessorService();
         has.init(conf);
         Configuration jobConf = has.createConfiguration(getNameNodeUri());
@@ -128,6 +129,9 @@ public abstract class XFsTestCase extends XTestCase {
      * Tear down the testcase.
      */
     protected void tearDown() throws Exception {
+        if (Services.get() != null) {
+            Services.get().destroy();
+        }
         fileSystem = null;
         fsTestDir = null;
         super.tearDown();
diff --git a/docs/src/site/markdown/AG_HadoopConfiguration.md b/docs/src/site/markdown/AG_HadoopConfiguration.md
index ab71d7c..349cac4 100644
--- a/docs/src/site/markdown/AG_HadoopConfiguration.md
+++ b/docs/src/site/markdown/AG_HadoopConfiguration.md
@@ -84,6 +84,28 @@ By default Oozie defines `*=hadoop-conf` and the default values of the `hadoop-s
     </property>
 </configuration>
 ```
+## File system custom properties
+Some users notified us about issues when they started using the Amazon S3A file system -
+see [OOZIE-3529](https://issues.apache.org/jira/browse/OOZIE-3529).
+Oozie from version 5.2.0 supports custom file system properties which can be defined in the following way.
+The example shows how to resolve issues mentioned in OOZIE-3529 by setting the following in oozie-site.xml:
+```
+    <property>
+        <name>oozie.service.HadoopAccessorService.fs.s3a</name>
+        <value>fs.s3a.fast.upload.buffer=bytebuffer,fs.s3a.impl.disable.cache=true</value>
+    </property>
+```
+Use `oozie.service.HadoopAccessorService.fs.%s` where `%s` is the schema of the file system.
+The value shall be a list of key=value pairs separated using a comma (,). You can use properties as described below:
+ * `property_name=property_value_1=property_value2` will be read as:
+     * name: property_name
+     * value: property_value_1=property_value2
+ * `property1_name=value1,property2_name`
+     * name: property1_name
+     * value: value1
+     * property2_name will be ignored
+ * Limitation: the custom file system properties cannot contain a comma in either the key or the value.
+ See [OOZIE-3547](https://issues.apache.org/jira/browse/OOZIE-3547).
 
 ## Limitations
 
diff --git a/release-log.txt b/release-log.txt
index 4355b04..96f91d9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.2.0 release (trunk - unreleased)
 
+OOZIE-3529 Oozie not supported for s3 as filesystem (dionusos via asalamon74)
 OOZIE-3465 Migrate from commons-codec (matijhs via asalamon74)
 OOZIE-3533 Flaky test TestXLogService.testLog4jReload (asalamon74 via kmarton)
 OOZIE-3544 Upgrade commons-beanutils to 1.9.4 (matijhs via asalamon74)