Posted to commits@ambari.apache.org by ol...@apache.org on 2018/06/08 18:39:30 UTC

[ambari] branch trunk updated: AMBARI-23945. Infra Solr migration - restored data can be deleted on Solr startup (if data on hdfs)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5529b29  AMBARI-23945. Infra Solr migration - restored data can be deleted on Solr startup (if data on hdfs)
5529b29 is described below

commit 5529b29b0ba8e20bac7a69ef541539cc9085e709
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Fri Jun 8 20:37:30 2018 +0200

    AMBARI-23945. Infra Solr migration - restored data can be deleted on Solr startup (if data on hdfs)
---
 .../libraries/functions/solr_cloud_util.py         |  15 +++
 ambari-infra/ambari-infra-solr-client/build.xml    |   5 +
 .../src/main/python/migrationHelper.py             | 113 ++++++++++++++++++++-
 .../src/main/resources/data/.keep                  |  15 +++
 .../0.1.0/package/scripts/collection.py            |  59 +++++++----
 .../0.1.0/package/scripts/command_commons.py       |  23 +++--
 6 files changed, 203 insertions(+), 27 deletions(-)

diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/solr_cloud_util.py b/ambari-common/src/main/python/resource_management/libraries/functions/solr_cloud_util.py
index bffbebd..a6ed9b3 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/solr_cloud_util.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/solr_cloud_util.py
@@ -199,6 +199,21 @@ def remove_admin_handlers(zookeeper_quorum, solr_znode, java64_home, collection,
   remove_admin_handlers_cmd = format('{solr_cli_prefix} --remove-admin-handlers --collection {collection} --retry {retry} --interval {interval}')
   Execute(remove_admin_handlers_cmd)
 
+def copy_solr_znode(zookeeper_quorum, solr_znode, java64_home, jaas_file, src_znode, target_znode, retry = 5, interval = 10, java_opts=None):
+  solr_cli_prefix = __create_solr_cloud_cli_prefix(zookeeper_quorum, solr_znode, java64_home, java_opts, jaas_file)
+  copy_znode_cmd = format("{solr_cli_prefix} --transfer-znode --copy-src {src_znode} --copy-dest {target_znode} --retry {retry} --interval {interval}")
+  Execute(copy_znode_cmd)
+
+def copy_solr_znode_to_locat(zookeeper_quorum, solr_znode, java64_home, jaas_file, src_znode, target, retry = 5, interval = 10, java_opts=None):
+  solr_cli_prefix = __create_solr_cloud_cli_prefix(zookeeper_quorum, solr_znode, java64_home, java_opts, jaas_file)
+  copy_znode_cmd = format("{solr_cli_prefix} --transfer-znode --transfer-mode copyToLocal --copy-src {src_znode} --copy-dest {target} --retry {retry} --interval {interval}")
+  Execute(copy_znode_cmd)
+
+def copy_solr_znode_from_local(zookeeper_quorum, solr_znode, java64_home, jaas_file, src, target_znode, retry = 5, interval = 10, java_opts=None):
+  solr_cli_prefix = __create_solr_cloud_cli_prefix(zookeeper_quorum, solr_znode, java64_home, java_opts, jaas_file)
+  copy_znode_cmd = format("{solr_cli_prefix} --transfer-znode --transfer-mode copyFromLocal --copy-src {src} --copy-dest {target_znode} --retry {retry} --interval {interval}")
+  Execute(copy_znode_cmd)
+
 def default_config(config, name, default_value):
   subdicts = filter(None, name.split('/'))
   if not config:
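
For reference, the three new helpers wrap the same solrCloudCli prefix and differ only in the --transfer-mode flag (absent for the plain znode-to-znode copy). A minimal sketch of the znode-to-znode and local-to-znode variants, with made-up quorum, JDK and path values; the calls assume the usual Ambari agent command context in which Execute can run:

    from resource_management.libraries.functions import solr_cloud_util

    zk_quorum  = "zk1.example.com:2181,zk2.example.com:2181"  # hypothetical quorum
    solr_znode = "/infra-solr"
    java_home  = "/usr/jdk64/jdk1.8.0_112"                    # hypothetical JDK path
    jaas_file  = None  # path to the Infra Solr JAAS file when Kerberos is enabled

    # copy one znode subtree to another inside ZooKeeper
    solr_cloud_util.copy_solr_znode(zk_quorum, solr_znode, java_home, jaas_file,
                                    "/infra-solr/configs/ranger_audits",
                                    "/infra-solr/configs/old_ranger_audits")

    # upload a local file into ZooKeeper (--transfer-mode copyFromLocal), as
    # restore_collection does below for the per-core restore metadata
    solr_cloud_util.copy_solr_znode_from_local(zk_quorum, solr_znode, java_home, jaas_file,
                                               "/tmp/core_node1.json",
                                               "/restore_metadata/vertex_index/core_node1.json")
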
diff --git a/ambari-infra/ambari-infra-solr-client/build.xml b/ambari-infra/ambari-infra-solr-client/build.xml
index 866e323..71c59e7 100644
--- a/ambari-infra/ambari-infra-solr-client/build.xml
+++ b/ambari-infra/ambari-infra-solr-client/build.xml
@@ -35,6 +35,7 @@
       <fileset file="target/*.jar"/>
     </copy>
     <mkdir dir="target/migrate"/>
+    <mkdir dir="target/migrate/data"/>
     <get src="${lucene6-core.url}" dest="target/migrate/${lucene6-core-jar.name}" usetimestamp="true"/>
     <get src="${lucene6-backward-codecs.url}" dest="target/migrate/${lucene6-backward-codecs-jar.name}" usetimestamp="true"/>
     <copy todir="target/package/migrate" includeEmptyDirs="no">
@@ -42,6 +43,10 @@
       <fileset file="target/package/libs/lucene-core-${solr.version}.jar"/>
       <fileset file="target/package/libs/lucene-backward-codecs-${solr.version}.jar"/>
       <fileset file="src/main/resources/managed-schema"/>
+      <fileset file="src/main/resources/managed-schema"/>
+    </copy>
+    <copy todir="target/package/migrate/data" includeEmptyDirs="no">
+      <fileset file="src/main/resources/data/.keep"/>
     </copy>
     <copy todir="target/package" includeEmptyDirs="no">
       <fileset file="src/main/resources/solrCloudCli.sh"/>
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index 4a971b2..29ae933 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -18,6 +18,8 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
+import copy
+import glob
 import logging
 import os
 import sys
@@ -800,8 +802,84 @@ def upgrade_ranger_solrconfig_xml(options, config, service_filter):
       solr_znode=config.get('infra_solr', 'znode')
     ranger_config_set_name = config.get('ranger_collection', 'ranger_config_set_name')
     backup_ranger_config_set_name = config.get('ranger_collection', 'backup_ranger_config_set_name')
-    copy_znode(options, config, "{0}/configs/solrconfig.xml".format(solr_znode, ranger_config_set_name),
-               "{0}/configs/solrconfig.xml".format(solr_znode, backup_ranger_config_set_name))
+    copy_znode(options, config, "{0}/configs/{1}/solrconfig.xml".format(solr_znode, ranger_config_set_name),
+               "{0}/configs/{1}/solrconfig.xml".format(solr_znode, backup_ranger_config_set_name))
+
+def update_state_json(collection, config, options):
+  solr_znode='/infra-solr'
+  if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
+    solr_znode=config.get('infra_solr', 'znode')
+  coll_data_dir = "{0}migrate/data/{1}".format(INFRA_SOLR_CLIENT_BASE_PATH, collection)
+  if not os.path.exists(coll_data_dir):
+    os.makedirs(coll_data_dir)
+
+  copy_znode(options, config, "{0}/collections/{1}/state.json".format(solr_znode, collection), "{0}/state.json".format(coll_data_dir), copy_to_local=True)
+  copy_znode(options, config, "{0}/restore_metadata/{1}".format(solr_znode, collection), "{0}".format(coll_data_dir), copy_to_local=True)
+
+  json_file_list=glob.glob("{0}/*.json".format(coll_data_dir))
+  logger.debug("Downloaded json files list: {0}".format(str(json_file_list)))
+
+  cores_data_json_list = [k for k in json_file_list if 'state.json' not in k]
+  state_json_list = [k for k in json_file_list if 'state.json' in k]
+
+  if not cores_data_json_list:
+    raise Exception('Cannot find any downloaded restore core metadata for {0}'.format(collection))
+  if not state_json_list:
+    raise Exception('Cannot find any downloaded restore collection state metadata for {0}'.format(collection))
+
+  state_json_file=state_json_list[0]
+  state_data = read_json(state_json_file)
+  core_json_data=[]
+  for core_data_json_file in cores_data_json_list:
+    core_json_data.append(read_json(core_data_json_file))
+
+  logger.debug("collection data content: {0}".format(str(state_data)))
+  core_details={}
+  for core in core_json_data:
+    core_details[core['core_node']]=core
+  logger.debug("core data contents: {0}".format(str(core_details)))
+
+  collection_data = state_data[collection]
+  shards = collection_data['shards']
+  new_state_json_data=copy.deepcopy(state_data)
+
+  for shard in shards:
+    replicas = shards[shard]['replicas']
+    for replica in replicas:
+      core_data = replicas[replica]
+      core = core_data['core']
+      base_url = core_data['base_url']
+      node_name = core_data['node_name']
+      data_dir = core_data['dataDir'] if 'dataDir' in core_data else None
+      ulog_dir = core_data['ulogDir'] if 'ulogDir' in core_data else None
+
+      if replica in core_details:
+        old_core_node=core_details[replica]['core_node']
+        new_core_node=core_details[replica]['new_core_node']
+
+        new_state_core = copy.deepcopy(state_data[collection]['shards'][shard]['replicas'][replica])
+        new_state_json_data[collection]['shards'][shard]['replicas'][new_core_node]=new_state_core
+        if old_core_node != new_core_node:
+          if old_core_node in new_state_json_data[collection]['shards'][shard]['replicas']:
+            del new_state_json_data[collection]['shards'][shard]['replicas'][old_core_node]
+          if data_dir:
+            new_state_json_data[collection]['shards'][shard]['replicas'][new_core_node]['dataDir']=data_dir.replace(old_core_node, new_core_node)
+          if ulog_dir:
+            new_state_json_data[collection]['shards'][shard]['replicas'][new_core_node]['ulogDir']=ulog_dir.replace(old_core_node, new_core_node)
+        old_host=core_details[replica]['old_host']
+        new_host=core_details[replica]['new_host']
+        if old_host != new_host and old_core_node != new_core_node:
+          new_state_json_data[collection]['shards'][shard]['replicas'][new_core_node]['base_url']=base_url.replace(old_host, new_host)
+          new_state_json_data[collection]['shards'][shard]['replicas'][new_core_node]['node_name']=node_name.replace(old_host, new_host)
+        elif old_host != new_host:
+          new_state_json_data[collection]['shards'][shard]['replicas'][replica]['base_url']=base_url.replace(old_host, new_host)
+          new_state_json_data[collection]['shards'][shard]['replicas'][replica]['node_name']=node_name.replace(old_host, new_host)
+
+  with open("{0}/new_state.json".format(coll_data_dir), 'w') as outfile:
+    json.dump(new_state_json_data, outfile)
+
+  copy_znode(options, config, "{0}/new_state.json".format(coll_data_dir), "{0}/collections/{1}/state.json".format(solr_znode, collection), copy_from_local=True)
+
 
 def delete_znodes(options, config, service_filter):
   solr_znode='/infra-solr'
@@ -1113,6 +1191,33 @@ def rolling_restart_solr(options, accessor, parser, config):
   post_json(accessor, BATCH_REQUEST_API_URL.format(cluster), request_body)
   print "Rolling Restart Infra Solr Instances request sent. (check Ambari UI about the requests)"
 
+def update_state_jsons(options, accessor, parser, config, service_filter):
+  solr_urls = get_solr_urls(config)
+  collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
+  collections=filter_collections(options, collections)
+  if is_ranger_available(config, service_filter):
+    backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
+    if backup_ranger_collection in collections:
+      update_state_json(backup_ranger_collection, config, options)
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_ranger_collection)
+  if is_atlas_available(config, service_filter):
+    backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
+    if backup_fulltext_index_name in collections:
+      update_state_json(backup_fulltext_index_name, config, options)
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name)
+    backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
+    if backup_edge_index_name in collections:
+      update_state_json(backup_edge_index_name, config, options)
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_edge_index_name)
+    backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
+    if backup_vertex_index_name in collections:
+      update_state_json(backup_vertex_index_name, config, options)
+    else:
+      print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name)
+
 if __name__=="__main__":
   parser = optparse.OptionParser("usage: %prog [options]")
 
@@ -1187,6 +1292,10 @@ if __name__=="__main__":
         upgrade_ranger_solrconfig_xml(options, config, service_filter)
         create_backup_collections(options, accessor, parser, config, service_filter)
         restore_collections(options, accessor, parser, config, service_filter)
+        update_state_jsons(options, accessor, parser, config, service_filter)
+      elif options.action.lower() == 'update-collection-state':
+        update_state_jsons(options, accessor, parser, config, service_filter)
+      elif options.action.lower() == 'reload':
         reload_collections(options, accessor, parser, config, service_filter)
       elif options.action.lower() == 'migrate':
         migrate_snapshots(options, accessor, parser, config, service_filter)
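
To make the state.json rewrite above concrete: update_state_json downloads the collection's state.json plus the per-core restore metadata from /restore_metadata/<collection>, re-keys each restored replica under its new core_node name, patches dataDir/ulogDir (old core node -> new core node) and base_url/node_name (old host -> new host), and uploads the result back as state.json. A toy before/after for a single replica, with invented collection, host and path values:

    # replica entry as downloaded from {solr_znode}/collections/<collection>/state.json
    before = {"vertex_index": {"shards": {"shard1": {"replicas": {
        "core_node1": {
            "core": "vertex_index_shard1_replica1",
            "base_url": "http://old-host.example.com:8886/solr",
            "node_name": "old-host.example.com:8886_solr",
            "dataDir": "hdfs://mycluster/user/infra-solr/vertex_index/core_node1/data/",
            "ulogDir": "hdfs://mycluster/user/infra-solr/vertex_index/core_node1/data/tlog"}}}}}}

    # given restore metadata mapping core_node1 -> backup_core_node1 and
    # old-host.example.com -> new-host.example.com, new_state.json becomes:
    after = {"vertex_index": {"shards": {"shard1": {"replicas": {
        "backup_core_node1": {
            "core": "vertex_index_shard1_replica1",
            "base_url": "http://new-host.example.com:8886/solr",
            "node_name": "new-host.example.com:8886_solr",
            "dataDir": "hdfs://mycluster/user/infra-solr/vertex_index/backup_core_node1/data/",
            "ulogDir": "hdfs://mycluster/user/infra-solr/vertex_index/backup_core_node1/data/tlog"}}}}}}
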
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/resources/data/.keep b/ambari-infra/ambari-infra-solr-client/src/main/resources/data/.keep
new file mode 100644
index 0000000..5c8bc35
--- /dev/null
+++ b/ambari-infra/ambari-infra-solr-client/src/main/resources/data/.keep
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+echo "Keep this empty file"
\ No newline at end of file
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
index 46c4e0e..3d83e14 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
@@ -16,13 +16,14 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+import os
 import time
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Directory, Execute, File
 from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions import solr_cloud_util
 from resource_management.libraries.resources.properties_file import PropertiesFile
 
-
 def backup_collection(env):
   """
   Backup collections using replication API (as Solr Cloud Backup API is not available in Solr 5)
@@ -158,7 +159,10 @@ def restore_collection(env):
     core_properties['name'] = target_core
     core_properties['replicaType'] = core_details['type']
     core_properties['collection'] = command_commons.collection
-    core_properties['coreNodeName'] = core_details['node']
+    if command_commons.solr_hdfs_path:
+      core_properties['coreNodeName'] = 'backup_' + core_details['node']
+    else:
+      core_properties['coreNodeName'] = core_details['node']
     core_properties['shard'] = core_details['shard']
     if command_commons.solr_hdfs_path:
       hdfs_solr_node_folder=command_commons.solr_hdfs_path + format("/backup_{collection}/") + core_details['node']
@@ -174,30 +178,21 @@ def restore_collection(env):
                                    recursive_chown=True,
                                    recursive_chmod=True
                                    )
-        command_commons.HdfsResource(None, action="execute")
         command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/tlog"),
                                    type="directory",
                                    action="create_on_execute",
                                    owner=params.infra_solr_user,
                                    mode=0755
                                    )
-        command_commons.HdfsResource(None, action="execute")
         command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/snapshot_metadata"),
                                    type="directory",
                                    action="create_on_execute",
                                    owner=params.infra_solr_user,
                                    mode=0755
                                    )
-        command_commons.HdfsResource(None, action="execute")
-        if command_commons.solr_keep_backup:
-          Directory(format("{index_location}/snapshot.{src_core}"),
-                  action="delete",
-                  only_if=only_if_cmd,
-                  owner=params.infra_solr_user)
     else:
-      copy_cmd = format(
-        "mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
-        else format("cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
+      copy_cmd = format("cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
+        else format("mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
       Execute(
         copy_cmd, only_if=only_if_cmd,
         user=params.infra_solr_user,
@@ -218,6 +213,7 @@ def restore_collection(env):
           logoutput=True)
   for core_data in core_pairs:
     src_core = core_data['src_core']
+    src_host = core_data['src_host']
     target_core = core_data['target_core']
 
     if src_core in command_commons.skip_cores:
@@ -227,14 +223,36 @@ def restore_collection(env):
       Logger.info(format("Core '{target_core}' (target) is filtered out."))
       continue
 
+    if os.path.exists(format("{index_location}/snapshot.{src_core}")):
+      data_to_save = {}
+      host_core_data=host_cores_map[command_commons.CORE_DATA]
+      core_details=host_core_data[target_core]
+      core_node=core_details['node']
+      data_to_save['core']=target_core
+      data_to_save['core_node']=core_node
+      data_to_save['old_host']=core_data['target_host']
+      data_to_save['new_host']=src_host
+      if command_commons.solr_hdfs_path:
+        data_to_save['new_core_node']="backup_" + core_node
+      else:
+        data_to_save['new_core_node']=core_node
+
+      command_commons.write_core_file(target_core, data_to_save)
+      jaas_file = params.infra_solr_jaas_file if params.security_enabled else None
+      core_json_location = format("{index_location}/{target_core}.json")
+      znode_json_location = format("/restore_metadata/{collection}/{target_core}.json")
+      solr_cloud_util.copy_solr_znode_from_local(params.zookeeper_quorum, params.infra_solr_znode, params.java64_home, jaas_file, core_json_location, znode_json_location)
+
     core_root_dir = format("{solr_datadir}/backup_{target_core}")
     core_root_without_backup_dir = format("{solr_datadir}/{target_core}")
 
     if command_commons.solr_hdfs_path:
       if target_core in hdfs_cores_on_host:
+
         Logger.info(format("Core data '{target_core}' is located on this host, processing..."))
-        core_data=host_cores_map[command_commons.CORE_DATA]
-        core_details=core_data[target_core]
+        host_core_data=host_cores_map[command_commons.CORE_DATA]
+        core_details=host_core_data[target_core]
+
         core_node=core_details['node']
         collection_core_dir=command_commons.solr_hdfs_path + format("/{collection}/{core_node}")
         backup_collection_core_dir=command_commons.solr_hdfs_path + format("/backup_{collection}/{core_node}")
@@ -243,9 +261,9 @@ def restore_collection(env):
                                action="delete_on_execute",
                                owner=params.infra_solr_user
                                )
-        command_commons.HdfsResource(None, action="execute")
         if command_commons.check_hdfs_folder_exists(backup_collection_core_dir):
-          command_commons.move_hdfs_folder(backup_collection_core_dir, collection_core_dir)
+          collection_backup_core_dir=command_commons.solr_hdfs_path + format("/{collection}/backup_{core_node}")
+          command_commons.move_hdfs_folder(backup_collection_core_dir, collection_backup_core_dir)
       else:
         Logger.info(format("Core data '{target_core}' is not located on this host, skipping..."))
 
@@ -266,3 +284,10 @@ def restore_collection(env):
       recursive_ownership=True,
       only_if=format("test -d {core_root_without_backup_dir}")
     )
+
+    if command_commons.solr_hdfs_path and not command_commons.solr_keep_backup:
+      only_if_cmd = format("test -d {index_location}/snapshot.{src_core}")
+      Directory(format("{index_location}/snapshot.{src_core}"),
+            action="delete",
+            only_if=only_if_cmd,
+            owner=params.infra_solr_user)
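
The restore metadata written here (write_core_file plus copy_solr_znode_from_local) is what update_state_json in migrationHelper.py reads back. Per core it is a small JSON document along these lines; hostnames and core names below are invented, and the 'backup_' prefix on new_core_node only applies when the index lives on HDFS:

    # contents of {index_location}/{target_core}.json, later uploaded to
    # /restore_metadata/{collection}/{target_core}.json (values invented)
    data_to_save = {
        "core": "vertex_index_shard1_replica1",
        "core_node": "core_node1",
        "new_core_node": "backup_core_node1",  # plain core_node1 when not on HDFS
        "old_host": "old-host.example.com",    # core_data['target_host'] in the code above
        "new_host": "new-host.example.com",    # src_host in the code above
    }
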
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
index 5d3e897..8051d6c 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
@@ -87,6 +87,12 @@ solr_num_shards = int(default("/commandParams/solr_shards", "0"))
 
 solr_hdfs_path=default("/commandParams/solr_hdfs_path", None)
 
+keytab = None
+principal = None
+if params.security_enabled:
+  keytab = params.infra_solr_kerberos_keytab
+  principal = params.infra_solr_kerberos_principal
+
 if solr_hdfs_path:
 
   import functools
@@ -120,21 +126,17 @@ if solr_hdfs_path:
     user=params.infra_solr_user,
     hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
     security_enabled = params.security_enabled,
-    keytab = hdfs_user_keytab,
+    keytab = keytab,
     kinit_path_local = kinit_path_local,
     hadoop_bin_dir = hadoop_bin_dir,
     hadoop_conf_dir = hadoop_conf_dir,
-    principal_name = hdfs_principal_name,
+    principal_name = principal,
     hdfs_site = hdfs_site,
     default_fs = default_fs,
     immutable_paths = get_not_managed_resources(),
     dfs_type = dfs_type
   )
 
-if params.security_enabled:
-  keytab = params.infra_solr_kerberos_keytab
-  principal = params.infra_solr_kerberos_principal
-
 hostname_suffix = params.hostname.replace(".", "_")
 
 HOST_CORES='host-cores'
@@ -318,6 +320,11 @@ def __read_host_cores_from_clusterstate_json(json_zk_state_path, json_host_cores
     json.dump(json_content, outfile)
   return json_content
 
+def write_core_file(core, core_data):
+  core_json_location = format("{index_location}/{core}.json")
+  with open(core_json_location, 'w') as outfile:
+    json.dump(core_data, outfile)
+
 def __read_host_cores_from_file(json_host_cores_path):
   """
   Read host cores from file, can be useful if you do not want to regenerate host core data (with that you can generate your own host core pairs for restore)
@@ -410,10 +417,10 @@ def execute_commad(command):
   return call(command, user=params.infra_solr_user, timeout=300)
 
 def move_hdfs_folder(source_dir, target_dir):
-  cmd=create_command(format("hdfs dfs -mv {source_dir} {target_dir}"))
+  cmd=create_command(format('hdfs dfs -mv {source_dir} {target_dir}'))
   returncode, stdout = execute_commad(cmd)
   if returncode:
-    raise Fail("Unable to move HDFS dir '{0}' to '{1}' (return code: {2})".format(source_dir, target_dir, str(returncode)))
+    raise Exception("Unable to move HDFS dir '{0}' to '{1}' (return code: {2})".format(source_dir, target_dir, str(returncode)))
   return stdout.strip()
 
 def check_hdfs_folder_exists(hdfs_dir):
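
The keytab/principal hunks are an ordering fix: the Infra Solr credentials are now resolved before the HdfsResource functools.partial is built, so HDFS calls made during restore authenticate as the Solr service principal rather than via hdfs_user_keytab/hdfs_principal_name. A small stand-alone toy (stand-in function, not the Ambari API) showing why the order matters, since functools.partial captures keyword values at construction time and a later assignment never reaches the partial:

    import functools

    def hdfs_resource(path, keytab=None, principal_name=None):
        # stand-in for resource_management's HdfsResource
        return (path, keytab, principal_name)

    keytab = None
    bound = functools.partial(hdfs_resource, keytab=keytab)  # binds the current value: None
    keytab = "/etc/security/keytabs/ambari-infra-solr.service.keytab"

    print(bound("/user/infra-solr"))
    # ('/user/infra-solr', None, None) -- the later assignment has no effect on the partial
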

-- 
To stop receiving notification emails like this one, please contact
oleewere@apache.org.