You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by zh...@apache.org on 2019/11/12 09:11:09 UTC

[submarine] branch master updated: SUBMARINE-67. Add tests to Localizer class

This is an automated email from the ASF dual-hosted git repository.

zhouquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new d2045ce  SUBMARINE-67. Add tests to Localizer class
d2045ce is described below

commit d2045ce9e484c51da270f57b1bf7fc98c187c522
Author: Adam Antal <ad...@cloudera.com>
AuthorDate: Fri Oct 25 15:12:32 2019 +0200

    SUBMARINE-67. Add tests to Localizer class
    
    ### What is this PR for?
    The goal is to add tests for the Localizer class. Also the class itself has been refactored a bit (methods from `handleLocalizations` have been decoupled).
    
    ### What type of PR is it?
    Refactoring | Test
    
    ### Todos
    No TODOs.
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-67
    
    ### How should this be tested?
    * No integration tests is needed.
    * The new unit tests should pass.
    
    ### Screenshots (if appropriate)
    Not needed.
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Adam Antal <ad...@cloudera.com>
    
    Closes #64 from adamantal/SUBMARINE-67 and squashes the following commits:
    
    f07b599 [Adam Antal] SUBMARINE-67. Add tests to Localizer class
---
 .../submitter/yarnservice/utils/Localizer.java     | 222 +++++++++-----
 .../submitter/yarnservice/utils/LocalizerTest.java | 340 +++++++++++++++++++++
 2 files changed, 484 insertions(+), 78 deletions(-)

diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/server/submitter/yarnservice/utils/Localizer.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/server/submitter/yarnservice/utils/Localizer.java
index 34cc0c7..1fcdc2a 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/server/submitter/yarnservice/utils/Localizer.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/server/submitter/yarnservice/utils/Localizer.java
@@ -21,6 +21,7 @@ package org.apache.submarine.server.submitter.yarnservice.utils;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.service.api.records.ConfigFile;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile.TypeEnum;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.submarine.client.cli.param.Localization;
 import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
@@ -63,7 +64,7 @@ public class Localizer {
    * If remoteUri is directory, we'll download it, zip it and upload
    * to HDFS.
    * If localFilePath is ".", we'll use remoteUri's file/dir name
-   * */
+   */
   public void handleLocalizations(Service service)
       throws IOException {
     // Handle localizations
@@ -71,22 +72,113 @@ public class Localizer {
         remoteDirectoryManager.getJobStagingArea(
             parameters.getName(), true);
     List<Localization> localizations = parameters.getLocalizations();
-    String remoteUri;
-    String containerLocalPath;
 
     // Check to fail fast
-    for (Localization loc : localizations) {
-      remoteUri = loc.getRemoteUri();
+    checkFilesExist(localizations);
+
+    // Start download remote if needed and upload to HDFS
+    for (Localization localization : localizations) {
+      LocalizationState localizationState = new LocalizationState(localization,
+          remoteDirectoryManager);
+      Path resourceToLocalize = new Path(localizationState.remoteUri);
+      String sourceFile = determineSourceFile(localizationState);
+
+      if (localizationState.needUploadToHDFS) {
+        resourceToLocalize =
+            fsOperations.uploadToRemoteFile(stagingDir, sourceFile);
+      }
+      if (localizationState.needToDeleteTempFile) {
+        fsOperations.deleteFiles(sourceFile);
+      }
+      // Remove .zip from zipped dir name
+      if (isZippedArchive(sourceFile, localizationState.destFileType)) {
+        // Delete local zip file
+        fsOperations.deleteFiles(sourceFile);
+        sourceFile = getNameUntilUnderscore(sourceFile);
+      }
+
+      String containerLocalPath = localizationState.containerLocalPath;
+      // If provided, use the name of local uri
+      if (!containerLocalPath.equals(".")
+          && !containerLocalPath.equals("./")) {
+        // Change the YARN localized file name to what will be used in container
+        sourceFile = getLastNameFromPath(containerLocalPath);
+      }
+      String localizedName = getLastNameFromPath(sourceFile);
+      LOG.info("The file or directory to be localized is {}. " +
+          "Its localized filename will be {}",
+          resourceToLocalize.toString(), localizedName);
+      ConfigFile configFile = new ConfigFile()
+          .srcFile(resourceToLocalize.toUri().toString())
+          .destFile(localizedName)
+          .type(localizationState.destFileType);
+      service.getConfiguration().getFiles().add(configFile);
+
+      if (containerLocalPath.startsWith("/")) {
+        addToMounts(service, localization, containerLocalPath, sourceFile);
+      }
+    }
+  }
+
+  private String determineSourceFile(LocalizationState localizationState) throws IOException {
+    if (localizationState.directory) {
+      // Special handling of remoteUri directory.
+      return fsOperations.downloadAndZip(localizationState.remoteUri,
+          getLastNameFromPath(localizationState.remoteUri), true);
+    } else if (localizationState.remote &&
+        !needHdfs(localizationState.remoteUri)) {
+      // Non HDFS remote URI.
+      // Non directory, we don't need to zip
+      return fsOperations.downloadAndZip(localizationState.remoteUri,
+          getLastNameFromPath(localizationState.remoteUri), false);
+    }
+    return localizationState.remoteUri;
+  }
+
+  private static String getNameUntilUnderscore(String sourceFile) {
+    int suffixIndex = sourceFile.lastIndexOf('_');
+    if (suffixIndex == -1) {
+      throw new IllegalStateException(String.format(
+          "Vale of archive filename"
+              + " supposed to contain an underscore. Filename was: '%s'",
+          sourceFile));
+    }
+    sourceFile = sourceFile.substring(0, suffixIndex);
+    return sourceFile;
+  }
+
+  private static boolean isZippedArchive(String sourceFile,
+      TypeEnum destFileType) {
+    return destFileType == TypeEnum.ARCHIVE
+        && sourceFile.endsWith(".zip");
+  }
+
+  // set mounts
+  // if mount path is absolute, just use it.
+  // if relative, no need to mount explicitly
+  private static void addToMounts(Service service, Localization loc,
+      String containerLocalPath, String sourceFile) {
+    String mountStr = getLastNameFromPath(sourceFile) + ":"
+        + containerLocalPath + ":" + loc.getMountPermission();
+    LOG.info("Add bind-mount string {}", mountStr);
+    appendToEnv(service,
+        EnvironmentUtilities.ENV_DOCKER_MOUNTS_FOR_CONTAINER_RUNTIME,
+        mountStr, ",");
+  }
+
+  private void checkFilesExist(List<Localization> localizations)
+      throws IOException {
+    String remoteUri;
+    for (Localization localization : localizations) {
+      remoteUri = localization.getRemoteUri();
       Path resourceToLocalize = new Path(remoteUri);
-      // Check if remoteUri exists
+
       if (remoteDirectoryManager.isRemote(remoteUri)) {
-        // check if exists
         if (!remoteDirectoryManager.existsRemoteFile(resourceToLocalize)) {
           throw new FileNotFoundException(
               "File " + remoteUri + " doesn't exists.");
         }
       } else {
-        // Check if exists
         File localFile = new File(remoteUri);
         if (!localFile.exists()) {
           throw new FileNotFoundException(
@@ -96,78 +188,52 @@ public class Localizer {
       // check remote file size
       fsOperations.validFileSize(remoteUri);
     }
-    // Start download remote if needed and upload to HDFS
-    for (Localization loc : localizations) {
-      remoteUri = loc.getRemoteUri();
-      containerLocalPath = loc.getLocalPath();
-      String srcFileStr = remoteUri;
-      ConfigFile.TypeEnum destFileType = ConfigFile.TypeEnum.STATIC;
-      Path resourceToLocalize = new Path(remoteUri);
-      boolean needUploadToHDFS = true;
-
-
-      // Special handling of remoteUri directory
-      boolean needDeleteTempFile = false;
-      if (remoteDirectoryManager.isDir(remoteUri)) {
-        destFileType = ConfigFile.TypeEnum.ARCHIVE;
-        srcFileStr = fsOperations.downloadAndZip(
-            remoteUri, getLastNameFromPath(srcFileStr), true);
-      } else if (remoteDirectoryManager.isRemote(remoteUri)) {
-        if (!needHdfs(remoteUri)) {
-          // Non HDFS remote uri. Non directory, no need to zip
-          srcFileStr = fsOperations.downloadAndZip(
-              remoteUri, getLastNameFromPath(srcFileStr), false);
-          needDeleteTempFile = true;
-        } else {
-          // HDFS file, no need to upload
-          needUploadToHDFS = false;
-        }
-      }
+  }
 
-      // Upload file to HDFS
-      if (needUploadToHDFS) {
-        resourceToLocalize =
-            fsOperations.uploadToRemoteFile(stagingDir, srcFileStr);
-      }
-      if (needDeleteTempFile) {
-        fsOperations.deleteFiles(srcFileStr);
-      }
-      // Remove .zip from zipped dir name
-      if (destFileType == ConfigFile.TypeEnum.ARCHIVE
-          && srcFileStr.endsWith(".zip")) {
-        // Delete local zip file
-        fsOperations.deleteFiles(srcFileStr);
-        int suffixIndex = srcFileStr.lastIndexOf('_');
-        srcFileStr = srcFileStr.substring(0, suffixIndex);
-      }
-      // If provided, use the name of local uri
-      if (!containerLocalPath.equals(".")
-          && !containerLocalPath.equals("./")) {
-        // Change the YARN localized file name to what'll used in container
-        srcFileStr = getLastNameFromPath(containerLocalPath);
-      }
-      String localizedName = getLastNameFromPath(srcFileStr);
-      LOG.info("The file/dir to be localized is {}",
-          resourceToLocalize.toString());
-      LOG.info("Its localized file name will be {}", localizedName);
-      service.getConfiguration().getFiles().add(new ConfigFile().srcFile(
-          resourceToLocalize.toUri().toString()).destFile(localizedName)
-          .type(destFileType));
-      // set mounts
-      // if mount path is absolute, just use it.
-      // if relative, no need to mount explicitly
-      if (containerLocalPath.startsWith("/")) {
-        String mountStr = getLastNameFromPath(srcFileStr) + ":"
-            + containerLocalPath + ":" + loc.getMountPermission();
-        LOG.info("Add bind-mount string {}", mountStr);
-        appendToEnv(service,
-            EnvironmentUtilities.ENV_DOCKER_MOUNTS_FOR_CONTAINER_RUNTIME,
-            mountStr, ",");
-      }
-    }
+  private enum LocalizationType {
+    REMOTE_FILE, REMOTE_DIRECTORY, LOCAL_FILE, LOCAL_DIRECTORY
+  }
+
+  private static String getLastNameFromPath(String sourceFile) {
+    return new Path(sourceFile).getName();
   }
 
-  private String getLastNameFromPath(String srcFileStr) {
-    return new Path(srcFileStr).getName();
+  private static class LocalizationState {
+    private final String remoteUri;
+    private final LocalizationType localizationType;
+    private final boolean needHdfs;
+    private final boolean needUploadToHDFS;
+    private final boolean needToDeleteTempFile;
+    private final String containerLocalPath;
+    private final TypeEnum destFileType;
+    private final boolean directory;
+    private final boolean remote;
+
+    LocalizationState(Localization localization,
+        RemoteDirectoryManager remoteDirectoryManager) throws IOException {
+      this.remoteUri = localization.getRemoteUri();
+      this.directory = remoteDirectoryManager.isDir(remoteUri);
+      this.remote = remoteDirectoryManager.isRemote(remoteUri);
+      this.localizationType = determineLocalizationType(directory, remote);
+      this.needHdfs = determineNeedHdfs(remote);
+      //HDFS file don't need to be uploaded
+      this.needUploadToHDFS =
+          directory || (remote && !this.needHdfs) || !remote;
+      this.needToDeleteTempFile = remote && !this.needHdfs;
+      this.containerLocalPath = localization.getLocalPath();
+      this.destFileType = directory ? TypeEnum.ARCHIVE : TypeEnum.STATIC;
+    }
+
+    private boolean determineNeedHdfs(boolean remote) {
+      return remote && needHdfs(remoteUri);
+    }
+
+    private LocalizationType determineLocalizationType(boolean directory, boolean remote) {
+      if (directory) {
+        return remote ? LocalizationType.REMOTE_DIRECTORY : LocalizationType.LOCAL_DIRECTORY;
+      } else {
+        return remote ? LocalizationType.REMOTE_FILE : LocalizationType.LOCAL_FILE;
+      }
+    }
   }
 }
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/server/submitter/yarnservice/utils/LocalizerTest.java b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/server/submitter/yarnservice/utils/LocalizerTest.java
new file mode 100644
index 0000000..db2f54e
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/server/submitter/yarnservice/utils/LocalizerTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.yarnservice.utils;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile.TypeEnum;
+import org.apache.hadoop.yarn.service.api.records.Configuration;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.submarine.FileUtilitiesForTests;
+import org.apache.submarine.client.cli.param.Localization;
+import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
+import org.apache.submarine.commons.runtime.fs.RemoteDirectoryManager;
+import org.apache.submarine.server.submitter.yarnservice.FileSystemOperations;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LocalizerTest {
+
+  public static final String DEFAULT_TEMPFILE = "testFile";
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Mock
+  private RemoteDirectoryManager remoteDirectoryManager;
+
+  @Mock
+  private FileSystemOperations fsOperations;
+
+  @Mock
+  private Service service;
+
+  private FileUtilitiesForTests fileUtils = new FileUtilitiesForTests();
+
+  private void setupService() {
+    Configuration conf = new Configuration();
+    conf.setFiles(Lists.newArrayList());
+    when(service.getConfiguration()).thenReturn(conf);
+  }
+
+  private Localizer createLocalizerWithLocalizations(
+      Localization... localizations) {
+    RunJobParameters parameters = mock(RunJobParameters.class);
+    when(parameters.getLocalizations())
+        .thenReturn(Lists.newArrayList(localizations));
+    return new Localizer(fsOperations, remoteDirectoryManager, parameters);
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    setupService();
+    fileUtils.setup();
+
+    when(remoteDirectoryManager.getJobStagingArea(any(), anyBoolean()))
+        .thenReturn(new Path("stagingarea"));
+  }
+
+  private void testLocalizeExistingRemoteFileInternal() throws IOException {
+    when(remoteDirectoryManager.isRemote(anyString())).thenReturn(true);
+    when(remoteDirectoryManager.existsRemoteFile(any(Path.class)))
+        .thenReturn(true);
+
+    Localization localization = new Localization();
+    localization.setLocalPath(".");
+    localization.setRemoteUri("hdfs://dummy");
+    Localizer localizer = createLocalizerWithLocalizations(localization);
+
+    localizer.handleLocalizations(service);
+    verify(fsOperations).validFileSize(anyString());
+  }
+
+  private String testLocalizeExistingLocalFileInternal(String localPath)
+      throws IOException {
+    File testFile = fileUtils.createFileInTempDir(DEFAULT_TEMPFILE);
+
+    when(remoteDirectoryManager.isRemote(anyString())).thenReturn(false);
+    String remoteUri = testFile.getAbsolutePath();
+    when(fsOperations.uploadToRemoteFile(any(Path.class), anyString()))
+        .thenReturn(new Path(remoteUri));
+
+    Localization localization = new Localization();
+    localization.setLocalPath(localPath);
+    localization.setRemoteUri(remoteUri);
+    Localizer localizer = createLocalizerWithLocalizations(localization);
+
+    localizer.handleLocalizations(service);
+    verify(fsOperations).validFileSize(anyString());
+
+    return remoteUri;
+  }
+
+  @After
+  public void teardown() throws IOException {
+    fileUtils.teardown();
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testLocalizeNotExistingRemoteFile() throws IOException {
+    when(remoteDirectoryManager.isRemote(anyString())).thenReturn(true);
+    when(remoteDirectoryManager.existsRemoteFile(any(Path.class)))
+        .thenReturn(false);
+
+    Localization localization = new Localization();
+    localization.setRemoteUri("hdfs://dummy");
+    Localizer localizer = createLocalizerWithLocalizations(localization);
+
+    localizer.handleLocalizations(service);
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testLocalizeNotExistingLocalFile() throws IOException {
+    when(remoteDirectoryManager.isRemote(anyString())).thenReturn(false);
+
+    Localization localization = new Localization();
+    localization.setRemoteUri("file://dummy");
+    Localizer localizer = createLocalizerWithLocalizations(localization);
+
+    localizer.handleLocalizations(service);
+  }
+
+  @Test(expected = IOException.class)
+  public void testLocalizeExistingRemoteFileInvalidFileSize()
+      throws IOException {
+    doThrow(IOException.class).when(fsOperations).validFileSize(anyString());
+    testLocalizeExistingRemoteFileInternal();
+  }
+
+  @Test(expected = IOException.class)
+  public void testLocalizeExistingLocalFileInvalidFileSize()
+      throws IOException {
+    doThrow(IOException.class).when(fsOperations).validFileSize(anyString());
+    testLocalizeExistingLocalFileInternal(".");
+  }
+
+  @Test
+  public void testLocalizeExistingRemoteFile() throws IOException {
+    testLocalizeExistingRemoteFileInternal();
+    verify(fsOperations, never()).uploadToRemoteFile(any(Path.class),
+        anyString());
+    verify(fsOperations, never()).deleteFiles(anyString());
+
+    List<ConfigFile> files = service.getConfiguration().getFiles();
+    assertEquals(1, files.size());
+
+    ConfigFile configFile = files.get(0);
+    assertEquals(TypeEnum.STATIC, configFile.getType());
+    assertEquals("hdfs://dummy", configFile.getSrcFile());
+  }
+
+  @Test
+  public void testLocalizeExistingLocalFile() throws IOException {
+    String remoteUri = testLocalizeExistingLocalFileInternal(".");
+    verify(fsOperations, never()).deleteFiles(anyString());
+    verify(fsOperations).uploadToRemoteFile(any(Path.class), eq(remoteUri));
+
+    List<ConfigFile> files = service.getConfiguration().getFiles();
+    assertEquals(1, files.size());
+
+    ConfigFile configFile = files.get(0);
+    assertEquals(TypeEnum.STATIC, configFile.getType());
+    assertEquals(remoteUri, configFile.getSrcFile());
+    assertEquals("testFile", configFile.getDestFile());
+  }
+
+  @Test
+  public void testLocalizeExistingLocalFile2() throws IOException {
+    String remoteUri = testLocalizeExistingLocalFileInternal("./");
+    verify(fsOperations, never()).deleteFiles(anyString());
+    verify(fsOperations).uploadToRemoteFile(any(Path.class), eq(remoteUri));
+
+    List<ConfigFile> files = service.getConfiguration().getFiles();
+    assertEquals(1, files.size());
+
+    ConfigFile configFile = files.get(0);
+    assertEquals(TypeEnum.STATIC, configFile.getType());
+    assertEquals(remoteUri, configFile.getSrcFile());
+    assertEquals("testFile", configFile.getDestFile());
+  }
+
+  @Test
+  public void testLocalizeExistingLocalFileAbsolute() throws IOException {
+    String remoteUri =
+        testLocalizeExistingLocalFileInternal("/dummydir/dummyfile");
+    verify(fsOperations, never()).deleteFiles(anyString());
+    verify(fsOperations).uploadToRemoteFile(any(Path.class), eq(remoteUri));
+
+    List<ConfigFile> files = service.getConfiguration().getFiles();
+    assertEquals(1, files.size());
+
+    ConfigFile configFile = files.get(0);
+    assertEquals(TypeEnum.STATIC, configFile.getType());
+    assertEquals(remoteUri, configFile.getSrcFile());
+    assertEquals("dummyfile", configFile.getDestFile());
+
+    assertEquals(1, service.getConfiguration().getEnv().size());
+    String dockerMounts = service.getConfiguration().getEnv()
+        .get(EnvironmentUtilities.ENV_DOCKER_MOUNTS_FOR_CONTAINER_RUNTIME);
+    assertEquals("dummyfile:/dummydir/dummyfile:rw", dockerMounts);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testLocalizeExistingRemoteDirectoryNoUnderscoreInName()
+      throws IOException {
+    when(remoteDirectoryManager.isDir(anyString())).thenReturn(true);
+    when(remoteDirectoryManager.isRemote(anyString())).thenReturn(true);
+    when(remoteDirectoryManager.existsRemoteFile(any(Path.class)))
+        .thenReturn(true);
+    String remoteUri = "hdfs://remotedir1/remotedir2";
+    when(fsOperations.uploadToRemoteFile(any(Path.class), anyString()))
+        .thenReturn(new Path(remoteUri));
+    when(fsOperations.downloadAndZip(anyString(), anyString(), eq(true)))
+        .thenReturn("remotedir2.zip");
+
+    Localization localization = new Localization();
+    localization.setLocalPath("remotedir2");
+    localization.setRemoteUri(remoteUri);
+    Localizer localizer = createLocalizerWithLocalizations(localization);
+
+    localizer.handleLocalizations(service);
+    verify(fsOperations).validFileSize(anyString());
+  }
+
+  @Test
+  public void testLocalizeExistingRemoteDirectory() throws IOException {
+    when(remoteDirectoryManager.isDir(anyString())).thenReturn(true);
+    when(remoteDirectoryManager.isRemote(anyString())).thenReturn(true);
+    when(remoteDirectoryManager.existsRemoteFile(any(Path.class)))
+        .thenReturn(true);
+    String remoteUri = "hdfs://remotedir1/remotedir2";
+    when(fsOperations.uploadToRemoteFile(any(Path.class), anyString()))
+        .thenReturn(new Path(remoteUri));
+    String zipFileName = "remotedir2_221424.zip";
+    when(fsOperations.downloadAndZip(anyString(), anyString(), eq(true)))
+        .thenReturn(zipFileName);
+
+    Localization localization = new Localization();
+    localization.setLocalPath("/remotedir2");
+    localization.setRemoteUri(remoteUri);
+    Localizer localizer = createLocalizerWithLocalizations(localization);
+
+    localizer.handleLocalizations(service);
+
+    verify(fsOperations).validFileSize(anyString());
+    verify(fsOperations).deleteFiles(eq(zipFileName));
+    verify(fsOperations, never()).uploadToRemoteFile(any(Path.class),
+        eq(remoteUri));
+
+    List<ConfigFile> files = service.getConfiguration().getFiles();
+    assertEquals(1, files.size());
+
+    ConfigFile configFile = files.get(0);
+    assertEquals(TypeEnum.ARCHIVE, configFile.getType());
+    assertEquals(remoteUri, configFile.getSrcFile());
+    assertEquals("remotedir2", configFile.getDestFile());
+
+    assertEquals(1, service.getConfiguration().getEnv().size());
+    String dockerMounts = service.getConfiguration().getEnv()
+        .get(EnvironmentUtilities.ENV_DOCKER_MOUNTS_FOR_CONTAINER_RUNTIME);
+    assertEquals("remotedir2:/remotedir2:rw", dockerMounts);
+  }
+
+  @Test
+  public void testLocalizeNonHdfsRemoteUri() throws IOException {
+    when(remoteDirectoryManager.isRemote(anyString())).thenReturn(true);
+    when(remoteDirectoryManager.existsRemoteFile(any(Path.class)))
+        .thenReturn(true);
+    String remoteUri = "remote://remotedir1/remotedir2";
+    when(fsOperations.uploadToRemoteFile(any(Path.class), anyString()))
+        .thenReturn(new Path(remoteUri));
+    String downloadedFileName = "remotedir2_221424";
+    when(fsOperations.downloadAndZip(anyString(), anyString(), eq(false)))
+        .thenReturn(downloadedFileName);
+
+    Localization localization = new Localization();
+    localization.setLocalPath("/remotedir2");
+    localization.setRemoteUri(remoteUri);
+    Localizer localizer = createLocalizerWithLocalizations(localization);
+
+    localizer.handleLocalizations(service);
+
+    verify(fsOperations).validFileSize(anyString());
+    verify(fsOperations).deleteFiles(eq(downloadedFileName));
+    verify(fsOperations).uploadToRemoteFile(any(Path.class),
+        eq(downloadedFileName));
+
+    List<ConfigFile> files = service.getConfiguration().getFiles();
+    assertEquals(1, files.size());
+
+    ConfigFile configFile = files.get(0);
+    assertEquals(TypeEnum.STATIC, configFile.getType());
+    assertEquals(remoteUri, configFile.getSrcFile());
+    assertEquals("remotedir2", configFile.getDestFile());
+
+    assertEquals(1, service.getConfiguration().getEnv().size());
+    String dockerMounts = service.getConfiguration().getEnv()
+        .get(EnvironmentUtilities.ENV_DOCKER_MOUNTS_FOR_CONTAINER_RUNTIME);
+    assertEquals("remotedir2:/remotedir2:rw", dockerMounts);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org