You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by gatorsmile <gi...@git.apache.org> on 2018/09/10 17:45:59 UTC
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/20611#discussion_r216413405
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -303,94 +303,44 @@ case class LoadDataCommand(
s"partitioned, but a partition spec was provided.")
}
}
-
- val loadPath =
+ val loadPath = {
if (isLocal) {
- val uri = Utils.resolveURI(path)
- val file = new File(uri.getPath)
- val exists = if (file.getAbsolutePath.contains("*")) {
- val fileSystem = FileSystems.getDefault
- val dir = file.getParentFile.getAbsolutePath
- if (dir.contains("*")) {
- throw new AnalysisException(
- s"LOAD DATA input path allows only filename wildcard: $path")
- }
-
- // Note that special characters such as "*" on Windows are not allowed as a path.
- // Calling `WindowsFileSystem.getPath` throws an exception if they are in the path.
- val dirPath = fileSystem.getPath(dir)
- val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath
- val safePathPattern = if (Utils.isWindows) {
- // On Windows, the pattern should not start with slashes for absolute file paths.
- pathPattern.stripPrefix("/")
- } else {
- pathPattern
- }
- val files = new File(dir).listFiles()
- if (files == null) {
- false
- } else {
- val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern)
- files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
- }
- } else {
- new File(file.getAbsolutePath).exists()
- }
- if (!exists) {
- throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
- }
- uri
+ val localFS = FileContext.getLocalFSFileContext()
+ makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path))
} else {
- val uri = new URI(path)
- val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) {
- uri
- } else {
- // Follow Hive's behavior:
- // If no schema or authority is provided with non-local inpath,
- // we will use hadoop configuration "fs.defaultFS".
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
- val defaultFS = if (defaultFSConf == null) {
- new URI("")
- } else {
- new URI(defaultFSConf)
- }
-
- val scheme = if (uri.getScheme() != null) {
- uri.getScheme()
- } else {
- defaultFS.getScheme()
- }
- val authority = if (uri.getAuthority() != null) {
- uri.getAuthority()
- } else {
- defaultFS.getAuthority()
- }
-
- if (scheme == null) {
- throw new AnalysisException(
- s"LOAD DATA: URI scheme is required for non-local input paths: '$path'")
- }
-
- // Follow Hive's behavior:
- // If LOCAL is not specified, and the path is relative,
- // then the path is interpreted relative to "/user/<username>"
- val uriPath = uri.getPath()
- val absolutePath = if (uriPath != null && uriPath.startsWith("/")) {
- uriPath
- } else {
- s"/user/${System.getProperty("user.name")}/$uriPath"
- }
- new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment())
- }
- val hadoopConf = sparkSession.sessionState.newHadoopConf()
- val srcPath = new Path(hdfsUri)
- val fs = srcPath.getFileSystem(hadoopConf)
- if (!fs.exists(srcPath)) {
- throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
- }
- hdfsUri
+ val loadPath = new Path(path)
--- End diff --
The change of this line is a behavior change. @sujith71955
We need to add this to the migration guide.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org