You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by tt...@apache.org on 2017/01/12 13:40:00 UTC

ambari git commit: AMBARI-19241 - Ambari python scripts should support hdfs download

Repository: ambari
Updated Branches:
  refs/heads/trunk 497586479 -> 5228e1847


AMBARI-19241 - Ambari python scripts should support hdfs download


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5228e184
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5228e184
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5228e184

Branch: refs/heads/trunk
Commit: 5228e1847bd7b92d7a3379d26002879dbfeafffe
Parents: 4975864
Author: Tim Thorpe <tt...@apache.org>
Authored: Thu Jan 12 05:30:19 2017 -0800
Committer: Tim Thorpe <tt...@apache.org>
Committed: Thu Jan 12 05:30:19 2017 -0800

----------------------------------------------------------------------
 .../libraries/functions/download_from_hdfs.py   |  76 ++++++++
 .../libraries/providers/hdfs_resource.py        | 177 +++++++++++++++----
 .../libraries/resources/hdfs_resource.py        |  11 +-
 .../ambari/fast_hdfs_resource/Resource.java     |  63 ++++---
 .../ambari/fast_hdfs_resource/Runner.java       |  20 ++-
 5 files changed, 272 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5228e184/ambari-common/src/main/python/resource_management/libraries/functions/download_from_hdfs.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/download_from_hdfs.py b/ambari-common/src/main/python/resource_management/libraries/functions/download_from_hdfs.py
new file mode 100644
index 0000000..5826fc1
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/download_from_hdfs.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+__all__ = ["download_from_hdfs", ]
+
+import os
+import uuid
+import tempfile
+import re
+
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions.default import default
+from resource_management.core import shell
+from resource_management.core.logger import Logger
+
+
+def download_from_hdfs(source_file, dest_path, user_group, owner, download_type="file", file_mode=0444, force_execute=False,
+                       replace_existing_files=False):
+  """
+  :param source_file: the source file path
+  :param dest_path: the destination path
+  :param user_group: Group to own the directory.
+  :param owner: File owner
+  :param download_type: file or directory
+  :param file_mode: File permission
+  :param force_execute: If true, will execute the HDFS commands immediately, otherwise, will defer to the calling function.
+  :param replace_existing_files: If true, will replace existing files even if they are the same size
+  :return: Will return True if successful, otherwise, False.
+  """
+  import params
+
+  Logger.info("Called download_from_hdfs source in HDFS: {0} , local destination path: {1}".format(source_file, dest_path))
+
+  # The destination directory must already exist
+  if not os.path.exists(dest_path):
+    Logger.error("Cannot copy {0} because destination directory {1} does not exist.".format(source_file, dest_path))
+    return False
+
+  filename = os.path.basename(source_file)
+  dest_file = os.path.join(dest_path, filename)
+
+  params.HdfsResource(dest_file,
+                      type=download_type,
+                      action="download_on_execute",
+                      source=source_file,
+                      group=user_group,
+                      owner=owner,
+                      mode=file_mode,
+                      replace_existing_files=replace_existing_files,
+  )
+
+  Logger.info("Will attempt to copy from DFS at {0} to local file system {1}.".format(source_file, dest_file))
+
+  # For improved performance, force_execute should be False so that it is delayed and combined with other calls.
+  if force_execute:
+    params.HdfsResource(None, action="execute")
+
+  return True

http://git-wip-us.apache.org/repos/asf/ambari/blob/5228e184/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
index f1aa3e1..69cc7cd 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
@@ -21,6 +21,8 @@ Ambari Agent
 """
 import re
 import os
+import grp
+import pwd
 import time
 from resource_management.core.environment import Environment
 from resource_management.core.base import Fail
@@ -63,10 +65,10 @@ class HdfsResourceJar:
   """
   This is slower than HdfsResourceWebHDFS implementation of HdfsResouce, but it works in any cases on any DFS types.
   
-  The idea is to put all the files/directories/copyFromLocals we have to create/delete into a json file.
-  And then create in it with ONLY ONE expensive hadoop call to our custom jar fast-hdfs-resource.jar which grabs this json.
+  The idea is to put all the files/directories/copyFromLocals/copyToLocals we have to create/delete into a json file.
+  And then perform them with ONLY ONE expensive hadoop call to our custom jar fast-hdfs-resource.jar which grabs this json.
   
-  'create_and_execute' and 'delete_on_execute' does nothing but add files/directories to this json,
+  'create_on_execute', 'delete_on_execute' and 'download_on_execute' do nothing except add actions to this json,
   while execute does all the expensive creating/deleting work executing the jar with the json as parameter.
   """
   def action_delayed(self, action_name, main_resource):
@@ -96,7 +98,7 @@ class HdfsResourceJar:
     main_resource.assert_parameter_is_set('user')
 
     if not 'hdfs_files' in env.config or not env.config['hdfs_files']:
-      Logger.info("No resources to create. 'create_on_execute' or 'delete_on_execute' wasn't triggered before this 'execute' action.")
+      Logger.info("No resources to create. 'create_on_execute' or 'delete_on_execute' or 'download_on_execute' wasn't triggered before this 'execute' action.")
       return
     
     hadoop_bin_dir = main_resource.resource.hadoop_bin_dir
@@ -163,13 +165,18 @@ class WebHDFSUtil:
     for k,v in kwargs.iteritems():
       url = format("{url}&{k}={v}")
     
-    if file_to_put and not os.path.exists(file_to_put):
-      raise Fail(format("File {file_to_put} is not found."))
-    
     cmd = ["curl", "-sS","-L", "-w", "%{http_code}", "-X", method]
-    
-    if file_to_put:
-      cmd += ["--data-binary", "@"+file_to_put, "-H", "Content-Type: application/octet-stream"]
+
+    # When operation is "OPEN", target is the DFS file to download and file_to_put is the local path curl writes it to (see _download_file).
+    if operation == "OPEN":
+      cmd += ["-o", file_to_put]
+    else:
+      if file_to_put and not os.path.exists(file_to_put):
+        raise Fail(format("File {file_to_put} is not found."))
+
+      if file_to_put:
+        cmd += ["--data-binary", "@"+file_to_put, "-H", "Content-Type: application/octet-stream"]
+
     if self.security_enabled:
       cmd += ["--negotiate", "-u", ":"]
     if self.is_https_enabled:
@@ -232,6 +239,28 @@ class HdfsResourceWebHDFS:
     
     if self.target_status and self.target_status['type'].lower() != type:
       raise Fail(format("Trying to create file/directory but directory/file exists in the DFS on {target}"))
+
+  def _assert_download_valid(self):
+    source = self.main_resource.resource.source
+    type = self.main_resource.resource.type
+    target = self.main_resource.resource.target
+
+    if source:
+      self.source_status = self._get_file_status(source)
+      if self.source_status == None:
+        raise Fail(format("Source {source} doesn't exist"))
+      if type == "directory" and self.source_status['type'] == "FILE":
+        raise Fail(format("Source {source} is file but type is {type}"))
+      elif type == "file" and self.source_status['type'] == "DIRECTORY":
+        raise Fail(format("Source {source} is directory but type is {type}"))
+    else:
+      raise Fail(format("No source provided"))
+
+    if os.path.exists(target):
+      if type == "directory" and os.path.isfile(target):
+        raise Fail(format("Trying to download directory but file exists locally {target}"))
+      elif type == "file" and os.path.isdir(target):
+        raise Fail(format("Trying to download file but directory exists locally {target}"))
     
   def action_delayed(self, action_name, main_resource):
     main_resource.assert_parameter_is_set('user')
@@ -244,7 +273,10 @@ class HdfsResourceWebHDFS:
     self.mode = oct(main_resource.resource.mode)[1:] if main_resource.resource.mode else main_resource.resource.mode
     self.mode_set = False
     self.main_resource = main_resource
-    self._assert_valid()
+    if action_name == "download":
+      self._assert_download_valid()
+    else:
+      self._assert_valid()
     
     if self.main_resource.manage_if_exists == False and self.target_status:
       Logger.info("Skipping the operation for not managed DFS directory " + str(self.main_resource.resource.target) +
@@ -255,6 +287,8 @@ class HdfsResourceWebHDFS:
       self._create_resource()
       self._set_mode(self.target_status)
       self._set_owner(self.target_status)
+    elif action_name == "download":
+      self._download_resource()
     else:
       self._delete_resource()
     
@@ -282,16 +316,78 @@ class HdfsResourceWebHDFS:
       else:
         self._create_file(new_target, new_source)
   
+  def _download_resource(self):
+    if self.main_resource.resource.source == None:
+      return
+
+    if self.main_resource.resource.type == "file":
+      self._download_file(self.main_resource.resource.target, self.main_resource.resource.source, self.source_status)
+    elif self.main_resource.resource.type == "directory":
+      self._download_directory(self.main_resource.resource.target, self.main_resource.resource.source)
+
+  def _download_directory(self, target, source):
+    self._create_local_directory(target)
+
+    for file_status in self._list_directory(source):
+      if not file_status == None:
+        next_path_part = file_status['pathSuffix']
+        new_source = format("{source}/{next_path_part}")
+        new_target = os.path.join(target, next_path_part)
+        if file_status['type'] == "DIRECTORY":
+          self._download_directory(new_target, new_source)
+        else:
+          self._download_file(new_target, new_source, file_status)
+
+  def _create_local_directory(self, target):
+    if not os.path.exists(target):
+      Logger.info(format("Creating local directory {target}"))
+      sudo.makedir(target, "")
+
+      owner_name = "" if not self.main_resource.resource.owner else self.main_resource.resource.owner
+      group_name = "" if not self.main_resource.resource.group else self.main_resource.resource.group
+      owner = pwd.getpwnam(owner_name)
+      group = grp.getgrnam(group_name)
+      sudo.chown(target, owner, group)
+
+  def _download_file(self, target, source, file_status):
+    """
+    PUT file command is slow, however _get_file_status is pretty fast,
+    so we should check if the file really should be put before doing it.
+    """
+
+    if file_status and os.path.exists(target):
+      length = file_status['length']
+      local_file_size = os.stat(target).st_size # TODO: os -> sudo
+
+      # TODO: re-implement this using checksums
+      if local_file_size == length:
+        Logger.info(format("DFS file {source} is identical to {target}, skipping the download"))
+        return
+      elif not self.main_resource.resource.replace_existing_files:
+        Logger.info(format("Not replacing existing local file {target} which is different from DFS file {source}, due to replace_existing_files=False"))
+        return
+
+    kwargs = {}
+    self.util.run_command(source, 'OPEN', method='GET', overwrite=True, assertable_result=False, file_to_put=target, **kwargs)
+
+
   def _create_directory(self, target):
     if target == self.main_resource.resource.target and self.target_status:
       return
-    
+
     self.util.run_command(target, 'MKDIRS', method='PUT')
-    
+
   def _get_file_status(self, target):
     list_status = self.util.run_command(target, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
     return list_status['FileStatus'] if 'FileStatus' in list_status else None
-    
+
+  def _list_directory(self, target):
+    results = self.util.run_command(target, 'LISTSTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
+    entry = results['FileStatuses'] if 'FileStatuses' in results else None
+    if entry == None:
+      return []
+    return entry['FileStatus'] if 'FileStatus' in entry else []
+
   def _create_file(self, target, source=None, mode=""):
     """
     PUT file command in slow, however _get_file_status is pretty fast,
@@ -299,12 +395,12 @@ class HdfsResourceWebHDFS:
     """
     file_status = self._get_file_status(target) if target!=self.main_resource.resource.target else self.target_status
     mode = "" if not mode else mode
-    
+
     if file_status:
       if source:
         length = file_status['length']
         local_file_size = os.stat(source).st_size # TODO: os -> sudo
-        
+
         # TODO: re-implement this using checksums
         if local_file_size == length:
           Logger.info(format("DFS file {target} is identical to {source}, skipping the copying"))
@@ -315,16 +411,16 @@ class HdfsResourceWebHDFS:
       else:
         Logger.info(format("File {target} already exists in DFS, skipping the creation"))
         return
-    
+
     Logger.info(format("Creating new file {target} in DFS"))
     kwargs = {'permission': mode} if mode else {}
-      
+
     self.util.run_command(target, 'CREATE', method='PUT', overwrite=True, assertable_result=False, file_to_put=source, **kwargs)
-    
+
     if mode and file_status:
       file_status['permission'] = mode
-    
-     
+
+
   def _delete_resource(self):
     if not self.target_status:
           return
@@ -336,14 +432,14 @@ class HdfsResourceWebHDFS:
 
     if not self.main_resource.resource.recursive_chown and (not owner or file_status and file_status['owner'] == owner) and (not group or file_status and file_status['group'] == group):
       return
-    
+
     self.util.run_command(self.main_resource.resource.target, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False)
-    
+
     results = []
-    
+
     if self.main_resource.resource.recursive_chown:
       content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False)
-      
+
       if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
         self._fill_directories_list(self.main_resource.resource.target, results)
       else: # avoid chmowning a lot of files and listing a lot dirs via webhdfs which can take a lot of time.
@@ -351,34 +447,33 @@ class HdfsResourceWebHDFS:
 
     if self.main_resource.resource.change_permissions_for_parents:
       self._fill_in_parent_directories(self.main_resource.resource.target, results)
-      
+
     for path in results:
       self.util.run_command(path, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False)
-  
+
   def _set_mode(self, file_status=None):
     if not self.mode or file_status and file_status['permission'] == self.mode:
       return
-    
+
     if not self.mode_set:
       self.util.run_command(self.main_resource.resource.target, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False)
-    
+
     results = []
-    
+
     if self.main_resource.resource.recursive_chmod:
       content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False)
-      
+
       if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
         self._fill_directories_list(self.main_resource.resource.target, results)
       else: # avoid chmoding a lot of files and listing a lot dirs via webhdfs which can take a lot of time.
         shell.checked_call(["hadoop", "fs", "-chmod", "-R", self.mode, self.main_resource.resource.target], user=self.main_resource.resource.user)
-      
+
     if self.main_resource.resource.change_permissions_for_parents:
       self._fill_in_parent_directories(self.main_resource.resource.target, results)
-      
+
     for path in results:
       self.util.run_command(path, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False)
-    
-    
+
   def _fill_in_parent_directories(self, target, results):
     path_parts = HdfsResourceProvider.parse_path(target).split("/")[1:]# [1:] remove '' from parts
     path = "/"
@@ -386,18 +481,19 @@ class HdfsResourceWebHDFS:
     for path_part in path_parts:
       path += path_part + "/"
       results.append(path)
-      
+
+
   def _fill_directories_list(self, target, results):
     list_status = self.util.run_command(target, 'LISTSTATUS', method='GET', assertable_result=False)['FileStatuses']['FileStatus']
-    
+
     for file in list_status:
       if file['pathSuffix']:
         new_path = target + "/" + file['pathSuffix']
         results.append(new_path)
-        
+
         if file['type'] == 'DIRECTORY':
           self._fill_directories_list(new_path, results)  
-    
+
 class HdfsResourceProvider(Provider):
   def __init__(self, resource):
     super(HdfsResourceProvider,self).__init__(resource)
@@ -461,6 +557,9 @@ class HdfsResourceProvider(Provider):
   def action_delete_on_execute(self):
     self.action_delayed("delete")
 
+  def action_download_on_execute(self):
+    self.action_delayed("download")  # routed through action_delayed like create/delete, under the action name "download"
+
   def action_execute(self):
     self.get_hdfs_resource_executor().action_execute(self)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5228e184/ambari-common/src/main/python/resource_management/libraries/resources/hdfs_resource.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/resources/hdfs_resource.py b/ambari-common/src/main/python/resource_management/libraries/resources/hdfs_resource.py
index 5761fd6..0a22147 100644
--- a/ambari-common/src/main/python/resource_management/libraries/resources/hdfs_resource.py
+++ b/ambari-common/src/main/python/resource_management/libraries/resources/hdfs_resource.py
@@ -30,7 +30,7 @@ The cause is that for every call new connection initialized, with datanodes, nam
 While this resource can gather the directories/files to create/delete/copyFromLocal.
 And after just with one call create all that.
 
-action = create_on_execute / delete_on_execute. Are for gathering information  about what you want
+action = create_on_execute / delete_on_execute / download_on_execute. Are for gathering information  about what you want
 to create.
 
 After everything is gathered you should execute action = execute. To perform delayed actions
@@ -43,6 +43,7 @@ The resource is a replacement for the following operations:
   5) hadoop fs -touchz
   6) hadoop fs -chmod
   7) hadoop fs -chown
+  8) hadoop fs -copyToLocal
 """
 
 
@@ -52,7 +53,7 @@ class HdfsResource(Resource):
   target = ResourceArgument(default=lambda obj: obj.name)
   # "directory" or "file"
   type = ResourceArgument()
-  # "create_on_execute" or "delete_on_execute" or "execute"
+  # "create_on_execute" or "delete_on_execute" or "download_on_execute" or "execute"
   action = ForcedListArgument()
   # if present - copies file/directory from local path {source} to hadoop path - {target}
   source = ResourceArgument()
@@ -102,6 +103,6 @@ class HdfsResource(Resource):
   # To support HCFS
   dfs_type = ResourceArgument(default="")
 
-  #action 'execute' immediately creates all pending files/directories in efficient manner
-  #action 'create_on_execute/delete_on_execute' adds file/directory to list of pending directories
-  actions = Resource.actions + ["create_on_execute", "delete_on_execute", "execute"]
+  #action 'execute' immediately performs all pending actions in an efficient manner
+  #action 'create_on_execute/delete_on_execute/download_on_execute' adds to the list of pending actions
+  actions = Resource.actions + ["create_on_execute", "delete_on_execute", "download_on_execute", "execute"]

http://git-wip-us.apache.org/repos/asf/ambari/blob/5228e184/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java
----------------------------------------------------------------------
diff --git a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java
index da5a706..9cbfab2 100644
--- a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java
+++ b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Resource.java
@@ -30,7 +30,10 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.FileSystem;
 
 /**
- * Used to: 1) copy files/directories from localFS to hadoopFs 2) create empty
+ * Used to:
+ *   1) copy files/directories from localFS to hadoopFs
+ *   2) download files/directories from hadoopFs to localFS
+ *   3) create empty
  * files/directories in hadoopFs
  */
 public class Resource {
@@ -153,47 +156,55 @@ public class Resource {
     ArrayList<String> actionsAvailable = new ArrayList<String>();
     actionsAvailable.add("create");
     actionsAvailable.add("delete");
+    actionsAvailable.add("download");
     ArrayList<String> typesAvailable = new ArrayList<String>();
     typesAvailable.add("file");
     typesAvailable.add("directory");
 
-    if (resource.getTarget() == null)
-      throw new IllegalArgumentException(
-          "Path to resource in HadoopFs must be filled.");
-
-    if (resource.getAction() == null
-        || !actionsAvailable.contains(resource.getAction()))
+    if (resource.getAction() == null || !actionsAvailable.contains(resource.getAction())) {
       throw new IllegalArgumentException("Action is not supported.");
+    }
+
+    String dfsPath = resource.getTarget();
+    String localPath = resource.getSource();
+    if (resource.getAction().equals("download")) {
+      dfsPath = resource.getSource();
+      localPath = resource.getTarget();
+    }
+
+    if (dfsPath == null) {
+      throw new IllegalArgumentException("Path to resource in HadoopFs must be filled.");
+    }
 
-    if (resource.getType() == null
-        || !typesAvailable.contains(resource.getType()))
+    if (resource.getType() == null || !typesAvailable.contains(resource.getType())) {
       throw new IllegalArgumentException("Type is not supported.");
+    }
 
     // Check consistency for ("type":"file" == file in hadoop)
-    if (dfs.isFile(new Path(resource.getTarget()))
-        && !"file".equals(resource.getType()))
+    if (dfs.isFile(new Path(dfsPath)) && !"file".equals(resource.getType())) {
       throw new IllegalArgumentException(
-          "Cannot create a directory " + resource.getTarget() +
+          "Cannot create a directory " + dfsPath +
               " because file is present on the given path.");
+    }
     // Check consistency for ("type":"directory" == directory in hadoop)
-    else if (dfs.isDirectory(new Path(resource.getTarget()))
-        && !"directory".equals(resource.getType()))
+    else if (dfs.isDirectory(new Path(dfsPath)) && !"directory".equals(resource.getType())) {
       throw new IllegalArgumentException(
-          "Cannot create a file " + resource.getTarget() +
+          "Cannot create a file " + dfsPath +
               " because directory is present on the given path.");
-    
-    if(resource.getSource() != null) {
-      File source = new File(resource.getSource());
-      if(source.isFile()
-          && !"file".equals(resource.getType()))
+    }
+
+    if(localPath != null) {
+      File local = new File(localPath);
+      if(local.isFile() && !"file".equals(resource.getType())) {
         throw new IllegalArgumentException(
-            "Cannot create a directory " + resource.getTarget() +
-                " because source " + resource.getSource() + "is a file");
-      else if(source.isDirectory()
-          && !"directory".equals(resource.getType()))
+            "Cannot create a directory " + dfsPath +
+                " because source " + localPath + "is a file");
+      }
+      else if(local.isDirectory() && !"directory".equals(resource.getType())) {
         throw new IllegalArgumentException(
-            "Cannot create a file " + resource.getTarget() +
-                " because source " + resource.getSource() + "is a directory");      
+            "Cannot create a file " + dfsPath +
+                " because source " + localPath + "is a directory");
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5228e184/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java
----------------------------------------------------------------------
diff --git a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java
index e4656c7..e210876 100644
--- a/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java
+++ b/contrib/fast-hdfs-resource/src/main/java/org/apache/ambari/fast_hdfs_resource/Runner.java
@@ -71,11 +71,18 @@ public class Runner {
 
         Resource.checkResourceParameters(resource, dfs);
 
-        Path pathHadoop = new Path(resource.getTarget());
-        if (!resource.isManageIfExists() && dfs.exists(pathHadoop)) {
-          System.out.println("Skipping the operation for not managed DFS directory " + resource.getTarget() +
-                             " since immutable_paths contains it.");
-          continue;
+        Path pathHadoop = null;
+        if (resource.getAction().equals("download")) {
+          pathHadoop = new Path(resource.getSource());
+        }
+        else {
+          // Resolve the DFS path before the existence check; dfs.exists() was previously handed the still-null pathHadoop.
+          pathHadoop = new Path(resource.getTarget());
+          if (!resource.isManageIfExists() && dfs.exists(pathHadoop)) {
+            System.out.println("Skipping the operation for not managed DFS directory " + resource.getTarget() +
+                               " since immutable_paths contains it.");
+            continue;
+          }
         }
 
         if (resource.getAction().equals("create")) {
@@ -86,6 +93,9 @@ public class Runner {
         } else if (resource.getAction().equals("delete")) {
           // 6 - Delete
           dfs.delete(pathHadoop, true);
+        } else if (resource.getAction().equals("download")) {
+          // 7 - Download
+          dfs.copyToLocalFile(pathHadoop, new Path(resource.getTarget()));
         }
       }
     }