You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by zh...@apache.org on 2019/11/12 09:11:09 UTC
[submarine] branch master updated: SUBMARINE-67. Add tests to
Localizer class
This is an automated email from the ASF dual-hosted git repository.
zhouquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new d2045ce SUBMARINE-67. Add tests to Localizer class
d2045ce is described below
commit d2045ce9e484c51da270f57b1bf7fc98c187c522
Author: Adam Antal <ad...@cloudera.com>
AuthorDate: Fri Oct 25 15:12:32 2019 +0200
SUBMARINE-67. Add tests to Localizer class
### What is this PR for?
The goal is to add tests for the Localizer class. Also the class itself has been refactored a bit (methods from `handleLocalizations` have been decoupled).
### What type of PR is it?
Refactoring | Test
### Todos
No TODOs.
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-67
### How should this be tested?
* No integration tests is needed.
* The new unit tests should pass.
### Screenshots (if appropriate)
Not needed.
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Adam Antal <ad...@cloudera.com>
Closes #64 from adamantal/SUBMARINE-67 and squashes the following commits:
f07b599 [Adam Antal] SUBMARINE-67. Add tests to Localizer class
---
.../submitter/yarnservice/utils/Localizer.java | 222 +++++++++-----
.../submitter/yarnservice/utils/LocalizerTest.java | 340 +++++++++++++++++++++
2 files changed, 484 insertions(+), 78 deletions(-)
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/server/submitter/yarnservice/utils/Localizer.java b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/server/submitter/yarnservice/utils/Localizer.java
index 34cc0c7..1fcdc2a 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/server/submitter/yarnservice/utils/Localizer.java
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/main/java/org/apache/submarine/server/submitter/yarnservice/utils/Localizer.java
@@ -21,6 +21,7 @@ package org.apache.submarine.server.submitter.yarnservice.utils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile.TypeEnum;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.submarine.client.cli.param.Localization;
import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
@@ -63,7 +64,7 @@ public class Localizer {
* If remoteUri is directory, we'll download it, zip it and upload
* to HDFS.
* If localFilePath is ".", we'll use remoteUri's file/dir name
- * */
+ */
public void handleLocalizations(Service service)
throws IOException {
// Handle localizations
@@ -71,22 +72,113 @@ public class Localizer {
remoteDirectoryManager.getJobStagingArea(
parameters.getName(), true);
List<Localization> localizations = parameters.getLocalizations();
- String remoteUri;
- String containerLocalPath;
// Check to fail fast
- for (Localization loc : localizations) {
- remoteUri = loc.getRemoteUri();
+ checkFilesExist(localizations);
+
+ // Start download remote if needed and upload to HDFS
+ for (Localization localization : localizations) {
+ LocalizationState localizationState = new LocalizationState(localization,
+ remoteDirectoryManager);
+ Path resourceToLocalize = new Path(localizationState.remoteUri);
+ String sourceFile = determineSourceFile(localizationState);
+
+ if (localizationState.needUploadToHDFS) {
+ resourceToLocalize =
+ fsOperations.uploadToRemoteFile(stagingDir, sourceFile);
+ }
+ if (localizationState.needToDeleteTempFile) {
+ fsOperations.deleteFiles(sourceFile);
+ }
+ // Remove .zip from zipped dir name
+ if (isZippedArchive(sourceFile, localizationState.destFileType)) {
+ // Delete local zip file
+ fsOperations.deleteFiles(sourceFile);
+ sourceFile = getNameUntilUnderscore(sourceFile);
+ }
+
+ String containerLocalPath = localizationState.containerLocalPath;
+ // If provided, use the name of local uri
+ if (!containerLocalPath.equals(".")
+ && !containerLocalPath.equals("./")) {
+ // Change the YARN localized file name to what will be used in container
+ sourceFile = getLastNameFromPath(containerLocalPath);
+ }
+ String localizedName = getLastNameFromPath(sourceFile);
+ LOG.info("The file or directory to be localized is {}. " +
+ "Its localized filename will be {}",
+ resourceToLocalize.toString(), localizedName);
+ ConfigFile configFile = new ConfigFile()
+ .srcFile(resourceToLocalize.toUri().toString())
+ .destFile(localizedName)
+ .type(localizationState.destFileType);
+ service.getConfiguration().getFiles().add(configFile);
+
+ if (containerLocalPath.startsWith("/")) {
+ addToMounts(service, localization, containerLocalPath, sourceFile);
+ }
+ }
+ }
+
+ private String determineSourceFile(LocalizationState localizationState) throws IOException {
+ if (localizationState.directory) {
+ // Special handling of remoteUri directory.
+ return fsOperations.downloadAndZip(localizationState.remoteUri,
+ getLastNameFromPath(localizationState.remoteUri), true);
+ } else if (localizationState.remote &&
+ !needHdfs(localizationState.remoteUri)) {
+ // Non HDFS remote URI.
+ // Non directory, we don't need to zip
+ return fsOperations.downloadAndZip(localizationState.remoteUri,
+ getLastNameFromPath(localizationState.remoteUri), false);
+ }
+ return localizationState.remoteUri;
+ }
+
+ private static String getNameUntilUnderscore(String sourceFile) {
+ int suffixIndex = sourceFile.lastIndexOf('_');
+ if (suffixIndex == -1) {
+ throw new IllegalStateException(String.format(
+ "Vale of archive filename"
+ + " supposed to contain an underscore. Filename was: '%s'",
+ sourceFile));
+ }
+ sourceFile = sourceFile.substring(0, suffixIndex);
+ return sourceFile;
+ }
+
+ private static boolean isZippedArchive(String sourceFile,
+ TypeEnum destFileType) {
+ return destFileType == TypeEnum.ARCHIVE
+ && sourceFile.endsWith(".zip");
+ }
+
+ // set mounts
+ // if mount path is absolute, just use it.
+ // if relative, no need to mount explicitly
+ private static void addToMounts(Service service, Localization loc,
+ String containerLocalPath, String sourceFile) {
+ String mountStr = getLastNameFromPath(sourceFile) + ":"
+ + containerLocalPath + ":" + loc.getMountPermission();
+ LOG.info("Add bind-mount string {}", mountStr);
+ appendToEnv(service,
+ EnvironmentUtilities.ENV_DOCKER_MOUNTS_FOR_CONTAINER_RUNTIME,
+ mountStr, ",");
+ }
+
+ private void checkFilesExist(List<Localization> localizations)
+ throws IOException {
+ String remoteUri;
+ for (Localization localization : localizations) {
+ remoteUri = localization.getRemoteUri();
Path resourceToLocalize = new Path(remoteUri);
- // Check if remoteUri exists
+
if (remoteDirectoryManager.isRemote(remoteUri)) {
- // check if exists
if (!remoteDirectoryManager.existsRemoteFile(resourceToLocalize)) {
throw new FileNotFoundException(
"File " + remoteUri + " doesn't exists.");
}
} else {
- // Check if exists
File localFile = new File(remoteUri);
if (!localFile.exists()) {
throw new FileNotFoundException(
@@ -96,78 +188,52 @@ public class Localizer {
// check remote file size
fsOperations.validFileSize(remoteUri);
}
- // Start download remote if needed and upload to HDFS
- for (Localization loc : localizations) {
- remoteUri = loc.getRemoteUri();
- containerLocalPath = loc.getLocalPath();
- String srcFileStr = remoteUri;
- ConfigFile.TypeEnum destFileType = ConfigFile.TypeEnum.STATIC;
- Path resourceToLocalize = new Path(remoteUri);
- boolean needUploadToHDFS = true;
-
-
- // Special handling of remoteUri directory
- boolean needDeleteTempFile = false;
- if (remoteDirectoryManager.isDir(remoteUri)) {
- destFileType = ConfigFile.TypeEnum.ARCHIVE;
- srcFileStr = fsOperations.downloadAndZip(
- remoteUri, getLastNameFromPath(srcFileStr), true);
- } else if (remoteDirectoryManager.isRemote(remoteUri)) {
- if (!needHdfs(remoteUri)) {
- // Non HDFS remote uri. Non directory, no need to zip
- srcFileStr = fsOperations.downloadAndZip(
- remoteUri, getLastNameFromPath(srcFileStr), false);
- needDeleteTempFile = true;
- } else {
- // HDFS file, no need to upload
- needUploadToHDFS = false;
- }
- }
+ }
- // Upload file to HDFS
- if (needUploadToHDFS) {
- resourceToLocalize =
- fsOperations.uploadToRemoteFile(stagingDir, srcFileStr);
- }
- if (needDeleteTempFile) {
- fsOperations.deleteFiles(srcFileStr);
- }
- // Remove .zip from zipped dir name
- if (destFileType == ConfigFile.TypeEnum.ARCHIVE
- && srcFileStr.endsWith(".zip")) {
- // Delete local zip file
- fsOperations.deleteFiles(srcFileStr);
- int suffixIndex = srcFileStr.lastIndexOf('_');
- srcFileStr = srcFileStr.substring(0, suffixIndex);
- }
- // If provided, use the name of local uri
- if (!containerLocalPath.equals(".")
- && !containerLocalPath.equals("./")) {
- // Change the YARN localized file name to what'll used in container
- srcFileStr = getLastNameFromPath(containerLocalPath);
- }
- String localizedName = getLastNameFromPath(srcFileStr);
- LOG.info("The file/dir to be localized is {}",
- resourceToLocalize.toString());
- LOG.info("Its localized file name will be {}", localizedName);
- service.getConfiguration().getFiles().add(new ConfigFile().srcFile(
- resourceToLocalize.toUri().toString()).destFile(localizedName)
- .type(destFileType));
- // set mounts
- // if mount path is absolute, just use it.
- // if relative, no need to mount explicitly
- if (containerLocalPath.startsWith("/")) {
- String mountStr = getLastNameFromPath(srcFileStr) + ":"
- + containerLocalPath + ":" + loc.getMountPermission();
- LOG.info("Add bind-mount string {}", mountStr);
- appendToEnv(service,
- EnvironmentUtilities.ENV_DOCKER_MOUNTS_FOR_CONTAINER_RUNTIME,
- mountStr, ",");
- }
- }
+ private enum LocalizationType {
+ REMOTE_FILE, REMOTE_DIRECTORY, LOCAL_FILE, LOCAL_DIRECTORY
+ }
+
+ private static String getLastNameFromPath(String sourceFile) {
+ return new Path(sourceFile).getName();
}
- private String getLastNameFromPath(String srcFileStr) {
- return new Path(srcFileStr).getName();
+ private static class LocalizationState {
+ private final String remoteUri;
+ private final LocalizationType localizationType;
+ private final boolean needHdfs;
+ private final boolean needUploadToHDFS;
+ private final boolean needToDeleteTempFile;
+ private final String containerLocalPath;
+ private final TypeEnum destFileType;
+ private final boolean directory;
+ private final boolean remote;
+
+ LocalizationState(Localization localization,
+ RemoteDirectoryManager remoteDirectoryManager) throws IOException {
+ this.remoteUri = localization.getRemoteUri();
+ this.directory = remoteDirectoryManager.isDir(remoteUri);
+ this.remote = remoteDirectoryManager.isRemote(remoteUri);
+ this.localizationType = determineLocalizationType(directory, remote);
+ this.needHdfs = determineNeedHdfs(remote);
+ //HDFS file don't need to be uploaded
+ this.needUploadToHDFS =
+ directory || (remote && !this.needHdfs) || !remote;
+ this.needToDeleteTempFile = remote && !this.needHdfs;
+ this.containerLocalPath = localization.getLocalPath();
+ this.destFileType = directory ? TypeEnum.ARCHIVE : TypeEnum.STATIC;
+ }
+
+ private boolean determineNeedHdfs(boolean remote) {
+ return remote && needHdfs(remoteUri);
+ }
+
+ private LocalizationType determineLocalizationType(boolean directory, boolean remote) {
+ if (directory) {
+ return remote ? LocalizationType.REMOTE_DIRECTORY : LocalizationType.LOCAL_DIRECTORY;
+ } else {
+ return remote ? LocalizationType.REMOTE_FILE : LocalizationType.LOCAL_FILE;
+ }
+ }
}
}
diff --git a/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/server/submitter/yarnservice/utils/LocalizerTest.java b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/server/submitter/yarnservice/utils/LocalizerTest.java
new file mode 100644
index 0000000..db2f54e
--- /dev/null
+++ b/submarine-server/server-submitter/submitter-yarnservice/src/test/java/org/apache/submarine/server/submitter/yarnservice/utils/LocalizerTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.yarnservice.utils;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile.TypeEnum;
+import org.apache.hadoop.yarn.service.api.records.Configuration;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.submarine.FileUtilitiesForTests;
+import org.apache.submarine.client.cli.param.Localization;
+import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
+import org.apache.submarine.commons.runtime.fs.RemoteDirectoryManager;
+import org.apache.submarine.server.submitter.yarnservice.FileSystemOperations;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LocalizerTest {
+
+ public static final String DEFAULT_TEMPFILE = "testFile";
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Mock
+ private RemoteDirectoryManager remoteDirectoryManager;
+
+ @Mock
+ private FileSystemOperations fsOperations;
+
+ @Mock
+ private Service service;
+
+ private FileUtilitiesForTests fileUtils = new FileUtilitiesForTests();
+
+ private void setupService() {
+ Configuration conf = new Configuration();
+ conf.setFiles(Lists.newArrayList());
+ when(service.getConfiguration()).thenReturn(conf);
+ }
+
+ private Localizer createLocalizerWithLocalizations(
+ Localization... localizations) {
+ RunJobParameters parameters = mock(RunJobParameters.class);
+ when(parameters.getLocalizations())
+ .thenReturn(Lists.newArrayList(localizations));
+ return new Localizer(fsOperations, remoteDirectoryManager, parameters);
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ setupService();
+ fileUtils.setup();
+
+ when(remoteDirectoryManager.getJobStagingArea(any(), anyBoolean()))
+ .thenReturn(new Path("stagingarea"));
+ }
+
+ private void testLocalizeExistingRemoteFileInternal() throws IOException {
+ when(remoteDirectoryManager.isRemote(anyString())).thenReturn(true);
+ when(remoteDirectoryManager.existsRemoteFile(any(Path.class)))
+ .thenReturn(true);
+
+ Localization localization = new Localization();
+ localization.setLocalPath(".");
+ localization.setRemoteUri("hdfs://dummy");
+ Localizer localizer = createLocalizerWithLocalizations(localization);
+
+ localizer.handleLocalizations(service);
+ verify(fsOperations).validFileSize(anyString());
+ }
+
+ private String testLocalizeExistingLocalFileInternal(String localPath)
+ throws IOException {
+ File testFile = fileUtils.createFileInTempDir(DEFAULT_TEMPFILE);
+
+ when(remoteDirectoryManager.isRemote(anyString())).thenReturn(false);
+ String remoteUri = testFile.getAbsolutePath();
+ when(fsOperations.uploadToRemoteFile(any(Path.class), anyString()))
+ .thenReturn(new Path(remoteUri));
+
+ Localization localization = new Localization();
+ localization.setLocalPath(localPath);
+ localization.setRemoteUri(remoteUri);
+ Localizer localizer = createLocalizerWithLocalizations(localization);
+
+ localizer.handleLocalizations(service);
+ verify(fsOperations).validFileSize(anyString());
+
+ return remoteUri;
+ }
+
+ @After
+ public void teardown() throws IOException {
+ fileUtils.teardown();
+ }
+
+ @Test(expected = FileNotFoundException.class)
+ public void testLocalizeNotExistingRemoteFile() throws IOException {
+ when(remoteDirectoryManager.isRemote(anyString())).thenReturn(true);
+ when(remoteDirectoryManager.existsRemoteFile(any(Path.class)))
+ .thenReturn(false);
+
+ Localization localization = new Localization();
+ localization.setRemoteUri("hdfs://dummy");
+ Localizer localizer = createLocalizerWithLocalizations(localization);
+
+ localizer.handleLocalizations(service);
+ }
+
+ @Test(expected = FileNotFoundException.class)
+ public void testLocalizeNotExistingLocalFile() throws IOException {
+ when(remoteDirectoryManager.isRemote(anyString())).thenReturn(false);
+
+ Localization localization = new Localization();
+ localization.setRemoteUri("file://dummy");
+ Localizer localizer = createLocalizerWithLocalizations(localization);
+
+ localizer.handleLocalizations(service);
+ }
+
+ @Test(expected = IOException.class)
+ public void testLocalizeExistingRemoteFileInvalidFileSize()
+ throws IOException {
+ doThrow(IOException.class).when(fsOperations).validFileSize(anyString());
+ testLocalizeExistingRemoteFileInternal();
+ }
+
+ @Test(expected = IOException.class)
+ public void testLocalizeExistingLocalFileInvalidFileSize()
+ throws IOException {
+ doThrow(IOException.class).when(fsOperations).validFileSize(anyString());
+ testLocalizeExistingLocalFileInternal(".");
+ }
+
+ @Test
+ public void testLocalizeExistingRemoteFile() throws IOException {
+ testLocalizeExistingRemoteFileInternal();
+ verify(fsOperations, never()).uploadToRemoteFile(any(Path.class),
+ anyString());
+ verify(fsOperations, never()).deleteFiles(anyString());
+
+ List<ConfigFile> files = service.getConfiguration().getFiles();
+ assertEquals(1, files.size());
+
+ ConfigFile configFile = files.get(0);
+ assertEquals(TypeEnum.STATIC, configFile.getType());
+ assertEquals("hdfs://dummy", configFile.getSrcFile());
+ }
+
+ @Test
+ public void testLocalizeExistingLocalFile() throws IOException {
+ String remoteUri = testLocalizeExistingLocalFileInternal(".");
+ verify(fsOperations, never()).deleteFiles(anyString());
+ verify(fsOperations).uploadToRemoteFile(any(Path.class), eq(remoteUri));
+
+ List<ConfigFile> files = service.getConfiguration().getFiles();
+ assertEquals(1, files.size());
+
+ ConfigFile configFile = files.get(0);
+ assertEquals(TypeEnum.STATIC, configFile.getType());
+ assertEquals(remoteUri, configFile.getSrcFile());
+ assertEquals("testFile", configFile.getDestFile());
+ }
+
+ @Test
+ public void testLocalizeExistingLocalFile2() throws IOException {
+ String remoteUri = testLocalizeExistingLocalFileInternal("./");
+ verify(fsOperations, never()).deleteFiles(anyString());
+ verify(fsOperations).uploadToRemoteFile(any(Path.class), eq(remoteUri));
+
+ List<ConfigFile> files = service.getConfiguration().getFiles();
+ assertEquals(1, files.size());
+
+ ConfigFile configFile = files.get(0);
+ assertEquals(TypeEnum.STATIC, configFile.getType());
+ assertEquals(remoteUri, configFile.getSrcFile());
+ assertEquals("testFile", configFile.getDestFile());
+ }
+
+ @Test
+ public void testLocalizeExistingLocalFileAbsolute() throws IOException {
+ String remoteUri =
+ testLocalizeExistingLocalFileInternal("/dummydir/dummyfile");
+ verify(fsOperations, never()).deleteFiles(anyString());
+ verify(fsOperations).uploadToRemoteFile(any(Path.class), eq(remoteUri));
+
+ List<ConfigFile> files = service.getConfiguration().getFiles();
+ assertEquals(1, files.size());
+
+ ConfigFile configFile = files.get(0);
+ assertEquals(TypeEnum.STATIC, configFile.getType());
+ assertEquals(remoteUri, configFile.getSrcFile());
+ assertEquals("dummyfile", configFile.getDestFile());
+
+ assertEquals(1, service.getConfiguration().getEnv().size());
+ String dockerMounts = service.getConfiguration().getEnv()
+ .get(EnvironmentUtilities.ENV_DOCKER_MOUNTS_FOR_CONTAINER_RUNTIME);
+ assertEquals("dummyfile:/dummydir/dummyfile:rw", dockerMounts);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testLocalizeExistingRemoteDirectoryNoUnderscoreInName()
+ throws IOException {
+ when(remoteDirectoryManager.isDir(anyString())).thenReturn(true);
+ when(remoteDirectoryManager.isRemote(anyString())).thenReturn(true);
+ when(remoteDirectoryManager.existsRemoteFile(any(Path.class)))
+ .thenReturn(true);
+ String remoteUri = "hdfs://remotedir1/remotedir2";
+ when(fsOperations.uploadToRemoteFile(any(Path.class), anyString()))
+ .thenReturn(new Path(remoteUri));
+ when(fsOperations.downloadAndZip(anyString(), anyString(), eq(true)))
+ .thenReturn("remotedir2.zip");
+
+ Localization localization = new Localization();
+ localization.setLocalPath("remotedir2");
+ localization.setRemoteUri(remoteUri);
+ Localizer localizer = createLocalizerWithLocalizations(localization);
+
+ localizer.handleLocalizations(service);
+ verify(fsOperations).validFileSize(anyString());
+ }
+
+ @Test
+ public void testLocalizeExistingRemoteDirectory() throws IOException {
+ when(remoteDirectoryManager.isDir(anyString())).thenReturn(true);
+ when(remoteDirectoryManager.isRemote(anyString())).thenReturn(true);
+ when(remoteDirectoryManager.existsRemoteFile(any(Path.class)))
+ .thenReturn(true);
+ String remoteUri = "hdfs://remotedir1/remotedir2";
+ when(fsOperations.uploadToRemoteFile(any(Path.class), anyString()))
+ .thenReturn(new Path(remoteUri));
+ String zipFileName = "remotedir2_221424.zip";
+ when(fsOperations.downloadAndZip(anyString(), anyString(), eq(true)))
+ .thenReturn(zipFileName);
+
+ Localization localization = new Localization();
+ localization.setLocalPath("/remotedir2");
+ localization.setRemoteUri(remoteUri);
+ Localizer localizer = createLocalizerWithLocalizations(localization);
+
+ localizer.handleLocalizations(service);
+
+ verify(fsOperations).validFileSize(anyString());
+ verify(fsOperations).deleteFiles(eq(zipFileName));
+ verify(fsOperations, never()).uploadToRemoteFile(any(Path.class),
+ eq(remoteUri));
+
+ List<ConfigFile> files = service.getConfiguration().getFiles();
+ assertEquals(1, files.size());
+
+ ConfigFile configFile = files.get(0);
+ assertEquals(TypeEnum.ARCHIVE, configFile.getType());
+ assertEquals(remoteUri, configFile.getSrcFile());
+ assertEquals("remotedir2", configFile.getDestFile());
+
+ assertEquals(1, service.getConfiguration().getEnv().size());
+ String dockerMounts = service.getConfiguration().getEnv()
+ .get(EnvironmentUtilities.ENV_DOCKER_MOUNTS_FOR_CONTAINER_RUNTIME);
+ assertEquals("remotedir2:/remotedir2:rw", dockerMounts);
+ }
+
+ @Test
+ public void testLocalizeNonHdfsRemoteUri() throws IOException {
+ when(remoteDirectoryManager.isRemote(anyString())).thenReturn(true);
+ when(remoteDirectoryManager.existsRemoteFile(any(Path.class)))
+ .thenReturn(true);
+ String remoteUri = "remote://remotedir1/remotedir2";
+ when(fsOperations.uploadToRemoteFile(any(Path.class), anyString()))
+ .thenReturn(new Path(remoteUri));
+ String downloadedFileName = "remotedir2_221424";
+ when(fsOperations.downloadAndZip(anyString(), anyString(), eq(false)))
+ .thenReturn(downloadedFileName);
+
+ Localization localization = new Localization();
+ localization.setLocalPath("/remotedir2");
+ localization.setRemoteUri(remoteUri);
+ Localizer localizer = createLocalizerWithLocalizations(localization);
+
+ localizer.handleLocalizations(service);
+
+ verify(fsOperations).validFileSize(anyString());
+ verify(fsOperations).deleteFiles(eq(downloadedFileName));
+ verify(fsOperations).uploadToRemoteFile(any(Path.class),
+ eq(downloadedFileName));
+
+ List<ConfigFile> files = service.getConfiguration().getFiles();
+ assertEquals(1, files.size());
+
+ ConfigFile configFile = files.get(0);
+ assertEquals(TypeEnum.STATIC, configFile.getType());
+ assertEquals(remoteUri, configFile.getSrcFile());
+ assertEquals("remotedir2", configFile.getDestFile());
+
+ assertEquals(1, service.getConfiguration().getEnv().size());
+ String dockerMounts = service.getConfiguration().getEnv()
+ .get(EnvironmentUtilities.ENV_DOCKER_MOUNTS_FOR_CONTAINER_RUNTIME);
+ assertEquals("remotedir2:/remotedir2:rw", dockerMounts);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org