Posted to commits@spark.apache.org by da...@apache.org on 2015/10/17 23:56:29 UTC

spark git commit: [SPARK-10185] [SQL] Feat sql comma separated paths

Repository: spark
Updated Branches:
  refs/heads/master 254937420 -> 57f83e36d


[SPARK-10185] [SQL] Feat sql comma separated paths

Make sure comma-separated paths get processed correctly in ResolvedDataSource for a HadoopFsRelationProvider

Author: Koert Kuipers <ko...@tresata.com>

Closes #8416 from koertkuipers/feat-sql-comma-separated-paths.
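
For context, a minimal usage sketch of the feature in Scala (the JSON paths below are hypothetical; any HadoopFsRelationProvider-based source such as json or parquet should behave the same way):

    // Load several inputs at once via the new multi-path overload.
    val df = sqlContext.read
      .format("json")
      .load(Array("/data/people.json", "/data/people1.json"))

    // Paths containing a literal comma are also supported, because load()
    // escapes each comma before joining the paths into a single option.
    val df2 = sqlContext.read.format("json").load(Array("/data/dir,1", "/data/dir2"))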


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57f83e36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57f83e36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57f83e36

Branch: refs/heads/master
Commit: 57f83e36d63bbd79663c49a6c1e8f6c3c8fe4789
Parents: 2549374
Author: Koert Kuipers <ko...@tresata.com>
Authored: Sat Oct 17 14:56:24 2015 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Sat Oct 17 14:56:24 2015 -0700

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                | 14 +++++-
 python/test_support/sql/people1.json            |  2 +
 .../org/apache/spark/sql/DataFrameReader.scala  | 11 +++++
 .../datasources/ResolvedDataSource.scala        | 47 +++++++++++++++-----
 .../org/apache/spark/sql/DataFrameSuite.scala   | 18 ++++++++
 5 files changed, 81 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/57f83e36/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index f43d8bf..93832d4 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -116,6 +116,10 @@ class DataFrameReader(object):
         ...     opt2=1, opt3='str')
         >>> df.dtypes
         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
+        >>> df = sqlContext.read.format('json').load(['python/test_support/sql/people.json',
+        ...     'python/test_support/sql/people1.json'])
+        >>> df.dtypes
+        [('age', 'bigint'), ('aka', 'string'), ('name', 'string')]
         """
         if format is not None:
             self.format(format)
@@ -123,7 +127,15 @@ class DataFrameReader(object):
             self.schema(schema)
         self.options(**options)
         if path is not None:
-            return self._df(self._jreader.load(path))
+            if type(path) == list:
+                paths = path
+                gateway = self._sqlContext._sc._gateway
+                jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths))
+                for i in range(0, len(paths)):
+                    jpaths[i] = paths[i]
+                return self._df(self._jreader.load(jpaths))
+            else:
+                return self._df(self._jreader.load(path))
         else:
             return self._df(self._jreader.load())
 

http://git-wip-us.apache.org/repos/asf/spark/blob/57f83e36/python/test_support/sql/people1.json
----------------------------------------------------------------------
diff --git a/python/test_support/sql/people1.json b/python/test_support/sql/people1.json
new file mode 100644
index 0000000..6d217da
--- /dev/null
+++ b/python/test_support/sql/people1.json
@@ -0,0 +1,2 @@
+{"name":"Jonathan", "aka": "John"}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/57f83e36/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index eacdea2..e8651a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.StringUtils
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
@@ -124,6 +125,16 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   }
 
   /**
+   * Loads input in as a [[DataFrame]], for data sources that support multiple paths.
+   * Only works if the source is a HadoopFsRelationProvider.
+   *
+   * @since 1.6.0
+   */
+  def load(paths: Array[String]): DataFrame = {
+    option("paths", paths.map(StringUtils.escapeString(_, '\\', ',')).mkString(",")).load()
+  }
+
+  /**
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
    * url named table and connection properties.
    *
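
The new load(paths: Array[String]) overload above packs all paths into a single comma-separated "paths" option, escaping any literal comma so it can be recovered later. A small sketch of that encoding step (the directory names are hypothetical):

    import org.apache.hadoop.util.StringUtils

    val paths = Array("/data/dir,1", "/data/dir2")

    // Escape each comma with a backslash, then join with commas,
    // mirroring the option("paths", ...) call above.
    val joined = paths.map(StringUtils.escapeString(_, '\\', ',')).mkString(",")
    // joined == "/data/dir\,1,/data/dir2"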

http://git-wip-us.apache.org/repos/asf/spark/blob/57f83e36/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 0117244..54beabb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -24,6 +24,7 @@ import scala.language.{existentials, implicitConversions}
 import scala.util.{Success, Failure, Try}
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.StringUtils
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -89,7 +90,11 @@ object ResolvedDataSource extends Logging {
     val relation = userSpecifiedSchema match {
       case Some(schema: StructType) => clazz.newInstance() match {
         case dataSource: SchemaRelationProvider =>
-          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
+          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+          if (caseInsensitiveOptions.contains("paths")) {
+            throw new AnalysisException(s"$className does not support paths option.")
+          }
+          dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema)
         case dataSource: HadoopFsRelationProvider =>
           val maybePartitionsSchema = if (partitionColumns.isEmpty) {
             None
@@ -99,10 +104,19 @@ object ResolvedDataSource extends Logging {
 
           val caseInsensitiveOptions = new CaseInsensitiveMap(options)
           val paths = {
-            val patternPath = new Path(caseInsensitiveOptions("path"))
-            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+            if (caseInsensitiveOptions.contains("paths") &&
+              caseInsensitiveOptions.contains("path")) {
+              throw new AnalysisException(s"Both path and paths options are present.")
+            }
+            caseInsensitiveOptions.get("paths")
+              .map(_.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ',')))
+              .getOrElse(Array(caseInsensitiveOptions("path")))
+              .flatMap{ pathString =>
+                val hdfsPath = new Path(pathString)
+                val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+                val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+                SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString)
+              }
           }
 
           val dataSchema =
@@ -122,14 +136,27 @@ object ResolvedDataSource extends Logging {
 
       case None => clazz.newInstance() match {
         case dataSource: RelationProvider =>
-          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
+          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+          if (caseInsensitiveOptions.contains("paths")) {
+            throw new AnalysisException(s"$className does not support paths option.")
+          }
+          dataSource.createRelation(sqlContext, caseInsensitiveOptions)
         case dataSource: HadoopFsRelationProvider =>
           val caseInsensitiveOptions = new CaseInsensitiveMap(options)
           val paths = {
-            val patternPath = new Path(caseInsensitiveOptions("path"))
-            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+            if (caseInsensitiveOptions.contains("paths") &&
+              caseInsensitiveOptions.contains("path")) {
+              throw new AnalysisException(s"Both path and paths options are present.")
+            }
+            caseInsensitiveOptions.get("paths")
+              .map(_.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ',')))
+              .getOrElse(Array(caseInsensitiveOptions("path")))
+              .flatMap{ pathString =>
+                val hdfsPath = new Path(pathString)
+                val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+                val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+                SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString)
+              }
           }
           dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
         case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
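
On the read side, ResolvedDataSource splits the "paths" option on commas that are not preceded by a backslash and unescapes each entry before globbing. A round-trip sketch of that decoding (the option value below is hypothetical and matches the encoding shown earlier):

    import org.apache.hadoop.util.StringUtils

    // Value of the "paths" option, with the comma in "dir,1" escaped.
    val joined = "/data/dir\\,1,/data/dir2"

    // Split only on unescaped commas, then strip the escape characters,
    // mirroring the regex and unEscapeString calls above.
    val restored = joined.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ','))
    // restored.toSeq == Seq("/data/dir,1", "/data/dir2")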

http://git-wip-us.apache.org/repos/asf/spark/blob/57f83e36/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d919877..832ea02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -890,6 +890,24 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       .collect()
   }
 
+  test("SPARK-10185: Read multiple Hadoop Filesystem paths and paths with a comma in it") {
+    withTempDir { dir =>
+      val df1 = Seq((1, 22)).toDF("a", "b")
+      val dir1 = new File(dir, "dir,1").getCanonicalPath
+      df1.write.format("json").save(dir1)
+
+      val df2 = Seq((2, 23)).toDF("a", "b")
+      val dir2 = new File(dir, "dir2").getCanonicalPath
+      df2.write.format("json").save(dir2)
+
+      checkAnswer(sqlContext.read.format("json").load(Array(dir1, dir2)),
+        Row(1, 22) :: Row(2, 23) :: Nil)
+
+      checkAnswer(sqlContext.read.format("json").load(dir1),
+        Row(1, 22) :: Nil)
+    }
+  }
+
   test("SPARK-10034: Sort on Aggregate with aggregation expression named 'aggOrdering'") {
     val df = Seq(1 -> 2).toDF("i", "j")
     val query = df.groupBy('i)

