You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by as...@apache.org on 2020/03/11 11:11:13 UTC

[oozie] branch master updated: OOZIE-3586 Oozie spark actions using --keytab fail due to duplicate dist. cache (jmakai via asalamon74)

This is an automated email from the ASF dual-hosted git repository.

asalamon74 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 56870ca  OOZIE-3586 Oozie spark actions using --keytab fail due to duplicate dist. cache (jmakai via asalamon74)
56870ca is described below

commit 56870ca33a764730a523bf461822209d633c8c73
Author: Andras Salamon <as...@apache.org>
AuthorDate: Wed Mar 11 12:10:47 2020 +0100

    OOZIE-3586 Oozie spark actions using --keytab fail due to duplicate dist. cache (jmakai via asalamon74)
---
 release-log.txt                                    |  1 +
 .../oozie/action/hadoop/SparkArgsExtractor.java    | 40 +++++++++++++-
 .../org/apache/oozie/action/hadoop/SparkMain.java  | 48 +++++++++++++----
 .../action/hadoop/TestSparkArgsExtractor.java      | 46 ++++++++++++++++
 .../apache/oozie/action/hadoop/TestSparkMain.java  | 62 +++++++++++++++++++++-
 5 files changed, 184 insertions(+), 13 deletions(-)

diff --git a/release-log.txt b/release-log.txt
index a78cbbe..806a0f4 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3586 Oozie spark actions using --keytab fail due to duplicate dist. cache (jmakai via asalamon74)
 OOZIE-3592 Do not print misleading SecurityException for successful jobs (matijhs via asalamon74)
 OOZIE-3584 Fork-join action issue when action param cannot be resolved (jmakai via asalamon74)
 OOZIE-3589 Avoid calling copyActionData method multiple times in ReRunXCommand (zuston via asalamon74)
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
index 034a173..9a322da 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
@@ -69,6 +69,7 @@ class SparkArgsExtractor {
     private static final String JOB_NAME_OPTION = "--name";
     private static final String CLASS_NAME_OPTION = "--class";
     private static final String VERBOSE_OPTION = "--verbose";
+    private static final String KEYTAB_OPTION = "--keytab";
     private static final String DRIVER_CLASSPATH_OPTION = "--driver-class-path";
     private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
     private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
@@ -92,6 +93,10 @@ class SparkArgsExtractor {
     static final String SPARK_DEFAULTS_GENERATED_PROPERTIES = "spark-defaults-oozie-generated.properties";
 
     private boolean pySpark = false;
+    boolean isKeytabPresentInSparkArgs = false;
+    boolean isKeytabsFullPathPresentInSparkArgs = false;
+    String keytabSymlinkNameInSparkArgs;
+    String keytabFileNameInSparkArgs;
     private final Configuration actionConf;
 
     SparkArgsExtractor(final Configuration actionConf) {
@@ -253,6 +258,16 @@ class SparkArgsExtractor {
                     userArchives.append(userArchive);
                     addToSparkArgs = false;
                 }
+                if (opt.startsWith(KEYTAB_OPTION)) {
+                    isKeytabPresentInSparkArgs = true;
+                    Path keytabValueInSparkArgs = new Path(sparkOptions.get(i + 1));
+                    if (keytabValueInSparkArgs.isAbsolute()) {
+                        isKeytabsFullPathPresentInSparkArgs = true;
+                        keytabFileNameInSparkArgs = keytabValueInSparkArgs.getName();
+                    } else {
+                        keytabSymlinkNameInSparkArgs = keytabValueInSparkArgs.toString();
+                    }
+                }
                 if (addToSparkArgs) {
                     sparkArgs.add(opt);
                 }
@@ -326,8 +341,16 @@ class SparkArgsExtractor {
         mergeAndAddPropertiesFile(sparkArgs, propertiesFile);
 
         if ((yarnClusterMode || yarnClientMode)) {
-            final Map<String, URI> fixedFileUrisMap =
-                    SparkMain.fixFsDefaultUrisAndFilterDuplicates(DistributedCache.getCacheFiles(actionConf));
+            final Map<String, URI> fixedFileUrisMap;
+            if (isKeytabPresentInSparkArgs) {
+                fixedFileUrisMap =
+                        SparkMain.fixFsDefaultUrisAndFilterDuplicates
+                                (DistributedCache.getCacheFiles(actionConf), geKeytabNotToAdd());
+            } else {
+                fixedFileUrisMap =
+                        SparkMain.fixFsDefaultUrisAndFilterDuplicates
+                                (DistributedCache.getCacheFiles(actionConf));
+            }
             fixedFileUrisMap.put(SparkMain.SPARK_LOG4J_PROPS, new Path(SparkMain.SPARK_LOG4J_PROPS).toUri());
             fixedFileUrisMap.put(SparkMain.HIVE_SITE_CONF, new Path(SparkMain.HIVE_SITE_CONF).toUri());
             addUserDefined(userFiles.toString(), fixedFileUrisMap);
@@ -554,4 +577,17 @@ class SparkArgsExtractor {
             sparkArgs.add(SPARK_YARN_JARS + OPT_SEPARATOR + sparkYarnJar);
         }
     }
+
+    /**
+     * Gets the keytab to skip in the cache files: the file name if --keytab is an absolute path, otherwise the symlink name.
+     */
+    private String geKeytabNotToAdd(){
+        String keytabNotToAdd;
+        if (isKeytabsFullPathPresentInSparkArgs) {
+            keytabNotToAdd = keytabFileNameInSparkArgs;
+        } else {
+            keytabNotToAdd = keytabSymlinkNameInSparkArgs;
+        }
+        return keytabNotToAdd;
+    }
 }
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index bef08e8..e76a584 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -26,6 +26,7 @@ import java.net.URL;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
@@ -219,23 +220,50 @@ public class SparkMain extends LauncherMain {
     }
 
     /**
-     * Convert URIs into the default format which Spark expects
-     * Also filters out duplicate entries
-     * @param files
+     * Convert URIs into the default format which Spark expects.
+     * Filters out duplicate entries.
+     * @param uris
      * @return
      * @throws IOException
      * @throws URISyntaxException
      */
-    static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(final URI[] files) throws IOException, URISyntaxException {
-        final Map<String, URI> map= new LinkedHashMap<>();
-        if (files == null) {
+    static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(final URI[] uris)
+            throws IOException, URISyntaxException {
+        final Map<String, URI> map = new LinkedHashMap<>();
+        if (uris == null) {
             return map;
         }
         final FileSystem fs = FileSystem.get(new Configuration(true));
-        for (int i = 0; i < files.length; i++) {
-            final URI fileUri = files[i];
-            final Path p = new Path(fileUri);
-            map.put(p.getName(), HadoopUriFinder.getFixedUri(fs, fileUri));
+        for (URI uri : uris) {
+            final Path p = new Path(uri);
+            map.put(p.getName(), HadoopUriFinder.getFixedUri(fs, uri));
+        }
+        return map;
+    }
+
+    /**
+     * Convert URIs into the default format which Spark expects.
+     * Filters out duplicates and skips the file already added via --keytab (name or URI fragment equals elementNotToAdd).
+     * @param uris
+     * @param elementNotToAdd
+     * @return
+     * @throws IOException
+     * @throws URISyntaxException
+     */
+    static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(final URI[] uris, String elementNotToAdd)
+            throws IOException, URISyntaxException {
+        final Map<String, URI> map = new LinkedHashMap<>();
+        if (uris == null) {
+            return map;
+        }
+        final FileSystem fs = FileSystem.get(new Configuration(true));
+        for (URI uri : uris) {
+            final Path p = new Path(uri);
+            URI fullFileUri = HadoopUriFinder.getFixedUri(fs, uri);
+            String symlinkInFile = fullFileUri.getFragment();
+            if (!Objects.equals(symlinkInFile, elementNotToAdd) && !p.getName().equals(elementNotToAdd)) {
+                map.put(p.getName(), fullFileUri);
+            }
         }
         return map;
     }
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
index 0daae59..f79c1a7 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
@@ -492,6 +492,52 @@ public class TestSparkArgsExtractor {
         return actionConf;
     }
 
+    @Test
+    public void testKeytabDuplicateWithFileName()
+            throws OozieActionConfiguratorException, IOException, URISyntaxException {
+        final Configuration actionConf = new Configuration();
+
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+        actionConf.set(SparkActionExecutor.SPARK_DEFAULT_OPTS, "defaultProperty=1\ndefaultProperty2=2\ndefaultProperty3=3");
+        actionConf.set(SparkActionExecutor.SPARK_OPTS,
+                "--principal foobar --keytab /foo/bar.keytab");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+        final String[] mainArgs = {"arg0", "arg1"};
+        SparkArgsExtractor sparkArgsExtractor = new SparkArgsExtractor(actionConf);
+        sparkArgsExtractor.extract(mainArgs);
+        String expectedFileName = "bar.keytab";
+        assertEquals("Error happened while setting keytab presence.", true, sparkArgsExtractor.isKeytabPresentInSparkArgs);
+        assertEquals("Error happened while deciding if keytab full path given or not.",
+                true, sparkArgsExtractor.isKeytabsFullPathPresentInSparkArgs);
+        assertEquals("File name wrongly set.", expectedFileName, sparkArgsExtractor.keytabFileNameInSparkArgs);
+    }
+
+    @Test
+    public void testKeytabDuplicateWithSymlink() throws OozieActionConfiguratorException, IOException, URISyntaxException {
+        final Configuration actionConf = new Configuration();
+
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+        actionConf.set(SparkActionExecutor.SPARK_DEFAULT_OPTS, "defaultProperty=1\ndefaultProperty2=2\ndefaultProperty3=3");
+        actionConf.set(SparkActionExecutor.SPARK_OPTS,
+                "--principal foobar --keytab foo");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+        final String[] mainArgs = {"arg0", "arg1"};
+        SparkArgsExtractor sparkArgsExtractor = new SparkArgsExtractor(actionConf);
+        sparkArgsExtractor.extract(mainArgs);
+        String expectedSymlink = "foo";
+        assertEquals("Error happened while setting keytab presence.", true, sparkArgsExtractor.isKeytabPresentInSparkArgs);
+        assertEquals("Error happened while deciding if keytab full path given or not.",
+                false, sparkArgsExtractor.isKeytabsFullPathPresentInSparkArgs);
+        assertEquals("Symlink wrongly set.", expectedSymlink, sparkArgsExtractor.keytabSymlinkNameInSparkArgs);
+    }
+
     private void assertContainsSublist(final List<String> expected, final List<String> actual) {
         final int sublistSize = expected.size();
         assertTrue("actual size is below expected size", actual.size() >= sublistSize);
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
index f6cde18..bf16843 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
@@ -25,13 +25,16 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.util.IOUtils;
@@ -148,4 +151,61 @@ public class TestSparkMain extends MainTestCase {
         filesToDelete.add(new File(SparkMain.HIVE_SITE_CONF));
         return filesToDelete;
     }
+
+    public void testFixFsDefaultUrisAndFilterDuplicates() throws URISyntaxException, IOException {
+        URI[] uris = new URI[2];
+        URI uri1 = new URI("/foo/bar.keytab#foo.bar");
+        URI uri2 = new URI("/foo/bar.keytab#bar.foo");
+
+        uris[0] = uri1;
+        uris[1] = uri2;
+
+        Map<String, URI> result1 = SparkMain.fixFsDefaultUrisAndFilterDuplicates(uris);
+        assertEquals("Duplication elimination was not successful. " +
+                "Reason: Keytab added twice, but the result map contained more or less.",
+                1, result1.size());
+    }
+
+    public void testFixFsDefaultUrisAndFilterDuplicatesNoDuplication() throws URISyntaxException, IOException {
+        URI[] uris = new URI[2];
+        URI uri1 = new URI("/bar/foo.keytab#foo.bar");
+        URI uri2 = new URI("/foo/bar.keytab#bar.foo");
+
+        uris[0] = uri1;
+        uris[1] = uri2;
+
+        Map<String, URI> result = SparkMain.fixFsDefaultUrisAndFilterDuplicates(uris);
+        assertEquals("Duplication elimination was not successful. " +
+                "Reason: Two different keytabs were added, but the result map contained more or less.",
+                2,  result.size());
+    }
+
+    public void testFixFsDefaultUrisAndFilterDuplicatesWithKeytabSymNotToAdd() throws URISyntaxException, IOException {
+        URI[] uris = new URI[2];
+        URI uri1 = new URI("/bar/foo.keytab#foo.bar");
+        URI uri2 = new URI("/foo/bar.keytab#bar.foo");
+
+        uris[0] = uri1;
+        uris[1] = uri2;
+
+        Map<String, URI> result = SparkMain.fixFsDefaultUrisAndFilterDuplicates(uris, "foo.bar");
+        assertEquals("Duplication elimination was not successful. " +
+                "Reason: foo.bar exists in the URI array, but the deletion did not happen.",
+                1, result.size());
+    }
+
+    public void testFixFsDefaultUrisAndFilterDuplicatesWithKeytabNameNotToAdd() throws URISyntaxException, IOException {
+        URI[] uris = new URI[2];
+
+        URI uri1 = new URI("/bar/foo.keytab#foo.bar");
+        URI uri2 = new URI("/foo/bar.keytab#bar.foo");
+
+        uris[0] = uri1;
+        uris[1] = uri2;
+
+        Map<String, URI> result = SparkMain.fixFsDefaultUrisAndFilterDuplicates(uris, "foo.keytab");
+        assertEquals("Duplication elimination was not successful. " +
+                "Reason: foo.keytab exists in the URI array, but the deletion did not happen.",
+                1, result.size());
+    }
 }
\ No newline at end of file