Posted to commits@ambari.apache.org by tt...@apache.org on 2017/01/12 13:40:00 UTC
ambari git commit: AMBARI-19241 - Ambari python scripts should support hdfs download
Repository: ambari
Updated Branches:
refs/heads/trunk 497586479 -> 5228e1847
AMBARI-19241 - Ambari python scripts should support hdfs download
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5228e184
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5228e184
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5228e184
Branch: refs/heads/trunk
Commit: 5228e1847bd7b92d7a3379d26002879dbfeafffe
Parents: 4975864
Author: Tim Thorpe <tt...@apache.org>
Authored: Thu Jan 12 05:30:19 2017 -0800
Committer: Tim Thorpe <tt...@apache.org>
Committed: Thu Jan 12 05:30:19 2017 -0800
----------------------------------------------------------------------
.../libraries/functions/download_from_hdfs.py | 76 ++++++++
.../libraries/providers/hdfs_resource.py | 177 +++++++++++++++----
.../libraries/resources/hdfs_resource.py | 11 +-
.../ambari/fast_hdfs_resource/Resource.java | 63 ++++---
.../ambari/fast_hdfs_resource/Runner.java | 20 ++-
5 files changed, 272 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/5228e184/ambari-common/src/main/python/resource_management/libraries/functions/download_from_hdfs.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/download_from_hdfs.py b/ambari-common/src/main/python/resource_management/libraries/functions/download_from_hdfs.py
new file mode 100644
index 0000000..5826fc1
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/download_from_hdfs.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+__all__ = ["download_from_hdfs", ]
+
+import os
+import uuid
+import tempfile
+import re
+
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions.default import default
+from resource_management.core import shell
+from resource_management.core.logger import Logger
+
+
+def download_from_hdfs(source_file, dest_path, user_group, owner, download_type="file", file_mode=0444, force_execute=False,
+ replace_existing_files=False):
+ """
+ :param source_file: the source file path
+ :param dest_path: the destination path
+ :param user_group: Group to own the directory.
+ :param owner: File owner
+ :param download_type: file or directory
+ :param file_mode: File permission
+ :param force_execute: If true, will execute the HDFS commands immediately, otherwise, will defer to the calling function.
+ :param replace_existing_files: If true, will replace existing files even if they are the same size
+ :return: Will return True if successful, otherwise, False.
+ """
+ import params
+
+ Logger.info("Called download_from_hdfs source in HDFS: {0} , local destination path: {1}".format(source_file, dest_path))
+
+ # The destination directory must already exist
+ if not os.path.exists(dest_path):
+ Logger.error("Cannot copy {0} because destination directory {1} does not exist.".format(source_file, dest_path))
+ return False
+
+ filename = os.path.basename(source_file)
+ dest_file = os.path.join(dest_path, filename)
+
+ params.HdfsResource(dest_file,
+ type=download_type,
+ action="download_on_execute",
+ source=source_file,
+ group=user_group,
+ owner=owner,
+ mode=file_mode,
+ replace_existing_files=replace_existing_files,
+ )
+
+ Logger.info("Will attempt to copy from DFS at {0} to local file system {1}.".format(source_file, dest_file))
+
+ # For improved performance, force_execute should be False so that it is delayed and combined with other calls.
+ if force_execute:
+ params.HdfsResource(None, action="execute")
+
+ return True
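For context, a minimal sketch of how a service script might call the new helper is below. It assumes the usual Ambari script environment where a 'params' module exposing a pre-configured HdfsResource is importable; the paths, owner and group are illustrative only, not values from this commit.

    # Hypothetical caller; source/destination paths, owner and group are examples only.
    from resource_management.libraries.functions.download_from_hdfs import download_from_hdfs

    # Queue the download of one file from DFS into a local directory that must already exist,
    # and force the pending HDFS actions to run immediately.
    downloaded = download_from_hdfs("/apps/myservice/conf/app.properties",
                                    "/etc/myservice/conf",
                                    user_group="hadoop",
                                    owner="myservice",
                                    download_type="file",
                                    force_execute=True)
    if not downloaded:
        raise Exception("destination directory is missing, nothing was queued")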
http://git-wip-us.apache.org/repos/asf/ambari/blob/5228e184/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
index f1aa3e1..69cc7cd 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
@@ -21,6 +21,8 @@ Ambari Agent
"""
import re
import os
+import grp
+import pwd
import time
from resource_management.core.environment import Environment
from resource_management.core.base import Fail
@@ -63,10 +65,10 @@ class HdfsResourceJar:
"""
This is slower than the HdfsResourceWebHDFS implementation of HdfsResource, but it works in all cases on any DFS type.
- The idea is to put all the files/directories/copyFromLocals we have to create/delete into a json file.
- And then create in it with ONLY ONE expensive hadoop call to our custom jar fast-hdfs-resource.jar which grabs this json.
+ The idea is to put all the files/directories/copyFromLocals/copyToLocals we have to create/delete into a json file.
+ And then perform them with ONLY ONE expensive hadoop call to our custom jar fast-hdfs-resource.jar which grabs this json.
- 'create_and_execute' and 'delete_on_execute' does nothing but add files/directories to this json,
+ 'create_on_execute', 'delete_on_execute' and 'download_on_execute' do nothing except add actions to this json,
while execute does all the expensive creating/deleting work executing the jar with the json as parameter.
"""
def action_delayed(self, action_name, main_resource):
@@ -96,7 +98,7 @@ class HdfsResourceJar:
main_resource.assert_parameter_is_set('user')
if not 'hdfs_files' in env.config or not env.config['hdfs_files']:
- Logger.info("No resources to create. 'create_on_execute' or 'delete_on_execute' wasn't triggered before this 'execute' action.")
+ Logger.info("No resources to create. 'create_on_execute' or 'delete_on_execute' or 'download_on_execute' wasn't triggered before this 'execute' action.")
return
hadoop_bin_dir = main_resource.resource.hadoop_bin_dir
@@ -163,13 +165,18 @@ class WebHDFSUtil:
for k,v in kwargs.iteritems():
url = format("{url}&{k}={v}")
- if file_to_put and not os.path.exists(file_to_put):
- raise Fail(format("File {file_to_put} is not found."))
-
cmd = ["curl", "-sS","-L", "-w", "%{http_code}", "-X", method]
-
- if file_to_put:
- cmd += ["--data-binary", "@"+file_to_put, "-H", "Content-Type: application/octet-stream"]
+
+ # When the operation is "OPEN", the target is actually the DFS file to download and file_to_put is the local destination; see _download_file.
+ if operation == "OPEN":
+ cmd += ["-o", file_to_put]
+ else:
+ if file_to_put and not os.path.exists(file_to_put):
+ raise Fail(format("File {file_to_put} is not found."))
+
+ if file_to_put:
+ cmd += ["--data-binary", "@"+file_to_put, "-H", "Content-Type: application/octet-stream"]
+
if self.security_enabled:
cmd += ["--negotiate", "-u", ":"]
if self.is_https_enabled:
@@ -232,6 +239,28 @@ class HdfsResourceWebHDFS:
if self.target_status and self.target_status['type'].lower() != type:
raise Fail(format("Trying to create file/directory but directory/file exists in the DFS on {target}"))
+
+ def _assert_download_valid(self):
+ source = self.main_resource.resource.source
+ type = self.main_resource.resource.type
+ target = self.main_resource.resource.target
+
+ if source:
+ self.source_status = self._get_file_status(source)
+ if self.source_status == None:
+ raise Fail(format("Source {source} doesn't exist"))
+ if type == "directory" and self.source_status['type'] == "FILE":
+ raise Fail(format("Source {source} is file but type is {type}"))
+ elif type == "file" and self.source_status['type'] == "DIRECTORY":
+ raise Fail(format("Source {source} is directory but type is {type}"))
+ else:
+ raise Fail(format("No source provided"))
+
+ if os.path.exists(target):
+ if type == "directory" and os.path.isfile(target):
+ raise Fail(format("Trying to download directory but file exists locally {target}"))
+ elif type == "file" and os.path.isdir(target):
+ raise Fail(format("Trying to download file but directory exists locally {target}"))
def action_delayed(self, action_name, main_resource):
main_resource.assert_parameter_is_set('user')
@@ -244,7 +273,10 @@ class HdfsResourceWebHDFS:
self.mode = oct(main_resource.resource.mode)[1:] if main_resource.resource.mode else main_resource.resource.mode
self.mode_set = False
self.main_resource = main_resource
- self._assert_valid()
+ if action_name == "download":
+ self._assert_download_valid()
+ else:
+ self._assert_valid()
if self.main_resource.manage_if_exists == False and self.target_status:
Logger.info("Skipping the operation for not managed DFS directory " + str(self.main_resource.resource.target) +
@@ -255,6 +287,8 @@ class HdfsResourceWebHDFS:
self._create_resource()
self._set_mode(self.target_status)
self._set_owner(self.target_status)
+ elif action_name == "download":
+ self._download_resource()
else:
self._delete_resource()
@@ -282,16 +316,78 @@ class HdfsResourceWebHDFS:
else:
self._create_file(new_target, new_source)
+ def _download_resource(self):
+ if self.main_resource.resource.source == None:
+ return
+
+ if self.main_resource.resource.type == "file":
+ self._download_file(self.main_resource.resource.target, self.main_resource.resource.source, self.source_status)
+ elif self.main_resource.resource.type == "directory":
+ self._download_directory(self.main_resource.resource.target, self.main_resource.resource.source)
+
+ def _download_directory(self, target, source):
+ self._create_local_directory(target)
+
+ for file_status in self._list_directory(source):
+ if not file_status == None:
+ next_path_part = file_status['pathSuffix']
+ new_source = format("{source}/{next_path_part}")
+ new_target = os.path.join(target, next_path_part)
+ if file_status['type'] == "DIRECTORY":
+ self._download_directory(new_target, new_source)
+ else:
+ self._download_file(new_target, new_source, file_status)
+
+ def _create_local_directory(self, target):
+ if not os.path.exists(target):
+ Logger.info(format("Creating local directory {target}"))
+ sudo.makedir(target, "")
+
+ owner_name = "" if not self.main_resource.resource.owner else self.main_resource.resource.owner
+ group_name = "" if not self.main_resource.resource.group else self.main_resource.resource.group
+ owner = pwd.getpwnam(owner_name)
+ group = grp.getgrnam(group_name)
+ sudo.chown(target, owner, group)
+
+ def _download_file(self, target, source, file_status):
+ """
+ Downloading a file can be slow, however _get_file_status is pretty fast,
+ so we should check whether the file really needs to be downloaded before doing it.
+ """
+
+ if file_status and os.path.exists(target):
+ length = file_status['length']
+ local_file_size = os.stat(target).st_size # TODO: os -> sudo
+
+ # TODO: re-implement this using checksums
+ if local_file_size == length:
+ Logger.info(format("DFS file {source} is identical to {target}, skipping the download"))
+ return
+ elif not self.main_resource.resource.replace_existing_files:
+ Logger.info(format("Not replacing existing local file {target} which is different from DFS file {source}, due to replace_existing_files=False"))
+ return
+
+ kwargs = {}
+ self.util.run_command(source, 'OPEN', method='GET', overwrite=True, assertable_result=False, file_to_put=target, **kwargs)
+
+
def _create_directory(self, target):
if target == self.main_resource.resource.target and self.target_status:
return
-
+
self.util.run_command(target, 'MKDIRS', method='PUT')
-
+
def _get_file_status(self, target):
list_status = self.util.run_command(target, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
return list_status['FileStatus'] if 'FileStatus' in list_status else None
-
+
+ def _list_directory(self, target):
+ results = self.util.run_command(target, 'LISTSTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
+ entry = results['FileStatuses'] if 'FileStatuses' in results else None
+ if entry == None:
+ return []
+ return entry['FileStatus'] if 'FileStatus' in entry else []
+
def _create_file(self, target, source=None, mode=""):
"""
PUT file command is slow, however _get_file_status is pretty fast,
@@ -299,12 +395,12 @@ class HdfsResourceWebHDFS:
"""
file_status = self._get_file_status(target) if target!=self.main_resource.resource.target else self.target_status
mode = "" if not mode else mode
-
+
if file_status:
if source:
length = file_status['length']
local_file_size = os.stat(source).st_size # TODO: os -> sudo
-
+
# TODO: re-implement this using checksums
if local_file_size == length:
Logger.info(format("DFS file {target} is identical to {source}, skipping the copying"))
@@ -315,16 +411,16 @@ class HdfsResourceWebHDFS:
else:
Logger.info(format("File {target} already exists in DFS, skipping the creation"))
return
-
+
Logger.info(format("Creating new file {target} in DFS"))
kwargs = {'permission': mode} if mode else {}
-
+
self.util.run_command(target, 'CREATE', method='PUT', overwrite=True, assertable_result=False, file_to_put=source, **kwargs)
-
+
if mode and file_status:
file_status['permission'] = mode
-
-
+
+
def _delete_resource(self):
if not self.target_status:
return
@@ -336,14 +432,14 @@ class HdfsResourceWebHDFS:
if not self.main_resource.resource.recursive_chown and (not owner or file_status and file_status['owner'] == owner) and (not group or file_status and file_status['group'] == group):
return
-
+
self.util.run_command(self.main_resource.resource.target, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False)
-
+
results = []
-
+
if self.main_resource.resource.recursive_chown:
content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False)
-
+
if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
self._fill_directories_list(self.main_resource.resource.target, results)
else: # avoid chmowning a lot of files and listing a lot dirs via webhdfs which can take a lot of time.
@@ -351,34 +447,33 @@ class HdfsResourceWebHDFS:
if self.main_resource.resource.change_permissions_for_parents:
self._fill_in_parent_directories(self.main_resource.resource.target, results)
-
+
for path in results:
self.util.run_command(path, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False)
-
+
def _set_mode(self, file_status=None):
if not self.mode or file_status and file_status['permission'] == self.mode:
return
-
+
if not self.mode_set:
self.util.run_command(self.main_resource.resource.target, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False)
-
+
results = []
-
+
if self.main_resource.resource.recursive_chmod:
content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False)
-
+
if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
self._fill_directories_list(self.main_resource.resource.target, results)
else: # avoid chmoding a lot of files and listing a lot dirs via webhdfs which can take a lot of time.
shell.checked_call(["hadoop", "fs", "-chmod", "-R", self.mode, self.main_resource.resource.target], user=self.main_resource.resource.user)
-
+
if self.main_resource.resource.change_permissions_for_parents:
self._fill_in_parent_directories(self.main_resource.resource.target, results)
-
+
for path in results:
self.util.run_command(path, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False)
-
-
+
def _fill_in_parent_directories(self, target, results):
path_parts = HdfsResourceProvider.parse_path(target).split("/")[1:]# [1:] remove '' from parts
path = "/"
@@ -386,18 +481,19 @@ class HdfsResourceWebHDFS:
for path_part in path_parts:
path += path_part + "/"
results.append(path)
-
+
+
def _fill_directories_list(self, target, results):
list_status = self.util.run_command(target, 'LISTSTATUS', method='GET', assertable_result=False)['FileStatuses']['FileStatus']
-
+
for file in list_status:
if file['pathSuffix']:
new_path = target + "/" + file['pathSuffix']
results.append(new_path)
-
+
if file['type'] == 'DIRECTORY':
self._fill_directories_list(new_path, results)
-
+
class HdfsResourceProvider(Provider):
def __init__(self, resource):
super(HdfsResourceProvider,self).__init__(resource)
@@ -461,6 +557,9 @@ class HdfsResourceProvider(Provider):
def action_delete_on_execute(self):
self.action_delayed("delete")
+ def action_download_on_execute(self):
+ self.action_delayed("download")
+
def action_execute(self):
self.get_hdfs_resource_executor().action_execute(self)
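As a rough illustration of what the new download path does on the wire: for the "download" action the provider issues a WebHDFS OPEN request and lets curl write the response body straight to the local target (the -o flag added above). A standalone approximation follows, with the namenode address, user name and paths as assumptions rather than values from this commit.

    import subprocess

    source = "/apps/myservice/conf/app.properties"      # DFS path to read
    target = "/etc/myservice/conf/app.properties"       # local file curl writes to (-o)
    url = ("http://namenode.example.com:50070/webhdfs/v1" + source
           + "?op=OPEN&user.name=hdfs")

    # Mirrors the command WebHDFSUtil.run_command assembles when operation == "OPEN":
    # follow the redirect to a datanode (-L), print the HTTP code (-w), save the body (-o).
    cmd = ["curl", "-sS", "-L", "-w", "%{http_code}", "-X", "GET", "-o", target, url]
    subprocess.call(cmd)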
http://git-wip-us.apache.org/repos/asf/ambari/blob/5228e184/ambari-common/src/main/python/resource_management/libraries/resources/hdfs_resource.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/resources/hdfs_resource.py b/ambari-common/src/main/python/resource_management/libraries/resources/hdfs_resource.py
index 5761fd6..0a22147 100644
--- a/ambari-common/src/main/python/resource_management/libraries/resources/hdfs_resource.py
+++ b/ambari-common/src/main/python/resource_management/libraries/resources/hdfs_resource.py
@@ -30,7 +30,7 @@ The cause is that for every call new connection initialized, with datanodes, nam
While this resource can gather the directories/files to create/delete/copyFromLocal.
And after just with one call create all that.
-action = create_on_execute / delete_on_execute. Are for gathering information about what you want
+action = create_on_execute / delete_on_execute / download_on_execute. These gather information about what you want
to create.
After everything is gathered you should execute action = execute. To perform delayed actions
@@ -43,6 +43,7 @@ The resource is a replacement for the following operations:
5) hadoop fs -touchz
6) hadoop fs -chmod
7) hadoop fs -chown
+ 8) hadoop fs -copyToLocal
"""
@@ -52,7 +53,7 @@ class HdfsResource(Resource):
target = ResourceArgument(default=lambda obj: obj.name)
# "directory" or "file"
type = ResourceArgument()
- # "create_on_execute" or "delete_on_execute" or "execute"
+ # "create_on_execute" or "delete_on_execute" or "download_on_execute" or "execute"
action = ForcedListArgument()
# if present - copies file/directory from local path {source} to hadoop path - {target}
source = ResourceArgument()
@@ -102,6 +103,6 @@ class HdfsResource(Resource):
# To support HCFS
dfs_type = ResourceArgument(default="")
- #action 'execute' immediately creates all pending files/directories in efficient manner
- #action 'create_on_execute/delete_on_execute' adds file/directory to list of pending directories
- actions = Resource.actions + ["create_on_execute", "delete_on_execute", "execute"]
+ #action 'execute' immediately performs all pending actions in an efficient manner
+ #action 'create_on_execute/delete_on_execute/download_on_execute' adds to the list of pending actions
+ actions = Resource.actions + ["create_on_execute", "delete_on_execute", "download_on_execute", "execute"]
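Put together, the gather-then-execute pattern with the new action looks roughly like this inside a service script. params.HdfsResource is assumed to be the usual pre-configured resource and the paths are illustrative.

    import params

    # Gather: nothing is downloaded yet, the request is only recorded.
    params.HdfsResource("/etc/myservice/conf/app.properties",
                        type="file",
                        action="download_on_execute",
                        source="/apps/myservice/conf/app.properties",
                        owner="myservice",
                        group="hadoop",
                        mode=0444)

    # Execute: perform all pending create/delete/download actions in one pass.
    params.HdfsResource(None, action="execute")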
http://git-wip-us.apache.org/repos/asf/ambari/blob/5228e184/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java
----------------------------------------------------------------------
diff --git a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java
index da5a706..9cbfab2 100644
--- a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java
+++ b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java
@@ -30,7 +30,10 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FileSystem;
/**
- * Used to: 1) copy files/directories from localFS to hadoopFs 2) create empty
+ * Used to:
+ * 1) copy files/directories from localFS to hadoopFs
+ * 2) create empty
+ * 3) download files/directories from hadoopFs to localFS
* files/directories in hadoopFs
*/
public class Resource {
@@ -153,47 +156,55 @@ public class Resource {
ArrayList<String> actionsAvailable = new ArrayList<String>();
actionsAvailable.add("create");
actionsAvailable.add("delete");
+ actionsAvailable.add("download");
ArrayList<String> typesAvailable = new ArrayList<String>();
typesAvailable.add("file");
typesAvailable.add("directory");
- if (resource.getTarget() == null)
- throw new IllegalArgumentException(
- "Path to resource in HadoopFs must be filled.");
-
- if (resource.getAction() == null
- || !actionsAvailable.contains(resource.getAction()))
+ if (resource.getAction() == null || !actionsAvailable.contains(resource.getAction())) {
throw new IllegalArgumentException("Action is not supported.");
+ }
+
+ String dfsPath = resource.getTarget();
+ String localPath = resource.getSource();
+ if (resource.getAction().equals("download")) {
+ dfsPath = resource.getSource();
+ localPath = resource.getTarget();
+ }
+
+ if (dfsPath == null) {
+ throw new IllegalArgumentException("Path to resource in HadoopFs must be filled.");
+ }
- if (resource.getType() == null
- || !typesAvailable.contains(resource.getType()))
+ if (resource.getType() == null || !typesAvailable.contains(resource.getType())) {
throw new IllegalArgumentException("Type is not supported.");
+ }
// Check consistency for ("type":"file" == file in hadoop)
- if (dfs.isFile(new Path(resource.getTarget()))
- && !"file".equals(resource.getType()))
+ if (dfs.isFile(new Path(dfsPath)) && !"file".equals(resource.getType())) {
throw new IllegalArgumentException(
- "Cannot create a directory " + resource.getTarget() +
+ "Cannot create a directory " + dfsPath +
" because file is present on the given path.");
+ }
// Check consistency for ("type":"directory" == directory in hadoop)
- else if (dfs.isDirectory(new Path(resource.getTarget()))
- && !"directory".equals(resource.getType()))
+ else if (dfs.isDirectory(new Path(dfsPath)) && !"directory".equals(resource.getType())) {
throw new IllegalArgumentException(
- "Cannot create a file " + resource.getTarget() +
+ "Cannot create a file " + dfsPath +
" because directory is present on the given path.");
-
- if(resource.getSource() != null) {
- File source = new File(resource.getSource());
- if(source.isFile()
- && !"file".equals(resource.getType()))
+ }
+
+ if(localPath != null) {
+ File local = new File(localPath);
+ if(local.isFile() && !"file".equals(resource.getType())) {
throw new IllegalArgumentException(
- "Cannot create a directory " + resource.getTarget() +
- " because source " + resource.getSource() + "is a file");
- else if(source.isDirectory()
- && !"directory".equals(resource.getType()))
+ "Cannot create a directory " + dfsPath +
+ " because source " + localPath + "is a file");
+ }
+ else if(local.isDirectory() && !"directory".equals(resource.getType())) {
throw new IllegalArgumentException(
- "Cannot create a file " + resource.getTarget() +
- " because source " + resource.getSource() + "is a directory");
+ "Cannot create a file " + dfsPath +
+ " because source " + localPath + "is a directory");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5228e184/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java
----------------------------------------------------------------------
diff --git a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java
index e4656c7..e210876 100644
--- a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java
+++ b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java
@@ -71,11 +71,18 @@ public class Runner {
Resource.checkResourceParameters(resource, dfs);
- Path pathHadoop = new Path(resource.getTarget());
- if (!resource.isManageIfExists() && dfs.exists(pathHadoop)) {
- System.out.println("Skipping the operation for not managed DFS directory " + resource.getTarget() +
- " since immutable_paths contains it.");
- continue;
+ Path pathHadoop = null;
+
+ if (resource.getAction().equals("download")) {
+ pathHadoop = new Path(resource.getSource());
+ }
+ else {
+ pathHadoop = new Path(resource.getTarget());
+ if (!resource.isManageIfExists() && dfs.exists(pathHadoop)) {
+ System.out.println("Skipping the operation for not managed DFS directory " + resource.getTarget() +
+ " since immutable_paths contains it.");
+ continue;
+ }
}
if (resource.getAction().equals("create")) {
@@ -86,6 +93,9 @@ public class Runner {
} else if (resource.getAction().equals("delete")) {
// 6 - Delete
dfs.delete(pathHadoop, true);
+ } else if (resource.getAction().equals("download")) {
+ // 7 - Download
+ dfs.copyToLocalFile(pathHadoop, new Path(resource.getTarget()));
}
}
}
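For the slower HdfsResourceJar path, the delayed actions are serialized to a JSON file that Runner deserializes into Resource objects before walking them. A sketch of what a single "download" entry could look like is below; the key names are assumptions inferred from Resource.java's fields and the temp file path is hypothetical, so treat this as an outline rather than the authoritative schema.

    import json

    pending_actions = [{
        "action": "download",                              # must be in actionsAvailable
        "type": "file",                                     # "file" or "directory"
        "source": "/apps/myservice/conf/app.properties",    # DFS path (dfsPath for download)
        "target": "/etc/myservice/conf/app.properties",     # local path (localPath for download)
        "manageIfExists": True,
    }]

    # Hypothetical location; the provider writes a similar file and passes it to
    # fast-hdfs-resource.jar in a single hadoop jar invocation.
    with open("/var/lib/ambari-agent/tmp/hdfs_resources.json", "w") as f:
        json.dump(pending_actions, f)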