You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/04/28 01:15:45 UTC
[spark] branch master updated: [SPARK-35236][SQL] Support archive
files as resources for CREATE FUNCTION USING syntax
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new abb1f0c [SPARK-35236][SQL] Support archive files as resources for CREATE FUNCTION USING syntax
abb1f0c is described below
commit abb1f0c5d7e78b06dd5f2bf6856d2baf97f95b10
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Wed Apr 28 10:15:21 2021 +0900
[SPARK-35236][SQL] Support archive files as resources for CREATE FUNCTION USING syntax
### What changes were proposed in this pull request?
This PR proposes to make `CREATE FUNCTION USING` syntax can take archives as resources.
### Why are the changes needed?
It would be useful.
`CREATE FUNCTION USING` syntax doesn't support archives as resources because archives were not supported in Spark SQL.
Now Spark SQL supports archives so I think we can support them for the syntax.
### Does this PR introduce _any_ user-facing change?
Yes. Users can specify archives for `CREATE FUNCTION USING` syntax.
### How was this patch tested?
New test.
Closes #32359 from sarutak/load-function-using-archive.
Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
Signed-off-by: hyukjinkwon <gu...@apache.org>
---
.../apache/spark/sql/internal/SessionState.scala | 5 +-
.../spark/sql/hive/execution/HiveUDFSuite.scala | 53 +++++++++++++++++++++-
2 files changed, 53 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 319b226..79fbca6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -150,10 +150,7 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoade
resource.resourceType match {
case JarResource => addJar(resource.uri)
case FileResource => session.sparkContext.addFile(resource.uri)
- case ArchiveResource =>
- throw new AnalysisException(
- "Archive is not allowed to be loaded. If YARN mode is used, " +
- "please use --archives options while calling spark-submit.")
+ case ArchiveResource => session.sparkContext.addArchive(resource.uri)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 9e8046b9..f988287 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.hive.execution
import java.io.{DataInput, DataOutput, File, PrintWriter}
import java.util.{ArrayList, Arrays, Properties}
+import scala.collection.JavaConverters._
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType}
@@ -30,6 +32,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectIns
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.io.{LongWritable, Writable}
+import org.apache.spark.{SparkFiles, TestUtils}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.command.FunctionsCommand
@@ -676,6 +679,47 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
assert(msg2.contains(s"No handler for UDF/UDAF/UDTF '${classOf[ArraySumUDF].getName}'"))
}
}
+
+ test("SPARK-35236: CREATE FUNCTION should take an archive in USING clause") {
+ withTempDir { dir =>
+ withUserDefinedFunction("testListFiles1" -> false) {
+ val text1 = File.createTempFile("test1_", ".txt", dir)
+ val json1 = File.createTempFile("test1_", ".json", dir)
+ val zipFile1 = File.createTempFile("test1_", ".zip", dir)
+ TestUtils.createJar(Seq(text1, json1), zipFile1)
+
+ sql(s"CREATE FUNCTION testListFiles1 AS '${classOf[ListFiles].getName}' " +
+ s"USING ARCHIVE '${zipFile1.getAbsolutePath}'")
+ val df1 = sql(s"SELECT testListFiles1('${SparkFiles.get(zipFile1.getName)}')")
+ val fileList1 =
+ df1.collect().map(_.getList[String](0)).head.asScala.filter(_ != "META-INF")
+
+ assert(fileList1.length === 2)
+ assert(fileList1.contains(text1.getName))
+ assert(fileList1.contains(json1.getName))
+ }
+
+ // Test for file#alias style archive registration.
+ withUserDefinedFunction("testListFiles2" -> false) {
+ val text2 = File.createTempFile("test2_", ".txt", dir)
+ val json2 = File.createTempFile("test2_", ".json", dir)
+ val csv2 = File.createTempFile("test2", ".csv", dir)
+ val zipFile2 = File.createTempFile("test2_", ".zip", dir)
+ TestUtils.createJar(Seq(text2, json2, csv2), zipFile2)
+
+ sql(s"CREATE FUNCTION testListFiles2 AS '${classOf[ListFiles].getName}' " +
+ s"USING ARCHIVE '${zipFile2.getAbsolutePath}#foo'")
+ val df2 = sql(s"SELECT testListFiles2('${SparkFiles.get("foo")}')")
+ val fileList2 =
+ df2.collect().map(_.getList[String](0)).head.asScala.filter(_ != "META-INF")
+
+ assert(fileList2.length === 3)
+ assert(fileList2.contains(text2.getName))
+ assert(fileList2.contains(json2.getName))
+ assert(fileList2.contains(csv2.getName))
+ }
+ }
+ }
}
class TestPair(x: Int, y: Int) extends Writable with Serializable {
@@ -761,7 +805,6 @@ class StatelessUDF extends UDF {
}
class ArraySumUDF extends UDF {
- import scala.collection.JavaConverters._
def evaluate(values: java.util.List[java.lang.Double]): java.lang.Double = {
var r = 0d
for (v <- values.asScala) {
@@ -770,3 +813,11 @@ class ArraySumUDF extends UDF {
r
}
}
+
+class ListFiles extends UDF {
+ import java.util.{ArrayList, Arrays, List => JList}
+ def evaluate(path: String): JList[String] = {
+ val fileArray = new File(path).list()
+ if (fileArray != null) Arrays.asList(fileArray: _*) else new ArrayList[String]()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org