Posted to commits@spark.apache.org by do...@apache.org on 2023/03/02 07:13:47 UTC
[spark] branch master updated: [SPARK-42639][CONNECT] Add createDataFrame/createDataset methods
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a9626d578eb [SPARK-42639][CONNECT] Add createDataFrame/createDataset methods
a9626d578eb is described below
commit a9626d578ebe970f45c218701743d6eb3ad75340
Author: Herman van Hovell <he...@databricks.com>
AuthorDate: Wed Mar 1 23:13:33 2023 -0800
[SPARK-42639][CONNECT] Add createDataFrame/createDataset methods
### What changes were proposed in this pull request?
This PR adds all the `SparkSession.createDataFrame(..)` and `SparkSession.createDataset(..)` methods we can support in Connect. The implicit conversion that uses these methods is also added.
I moved the `ArrowWriter` class from sql/core to sql/catalyst for the Arrow writing.
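For illustration, a usage sketch of the new methods, assuming `spark` is an already-built Spark Connect `SparkSession` (the session setup and the `Person` case class are illustrative):
```scala
// Sketch only; `spark` is assumed to be a Spark Connect SparkSession.
import spark.implicits._

case class Person(name: String, age: Long)

// Typed Dataset from a local Seq (new createDataset overload).
val people = spark.createDataset(Seq(Person("Michael", 29), Person("Andy", 30)))

// DataFrame from a local Seq of Products (new createDataFrame overload).
val df = spark.createDataFrame(Seq(Person("Justin", 19)))

// The new localSeqToDatasetHolder implicit enables toDS()/toDF() on local Seqs.
val numbers = Seq(1, 2, 3).toDS()
val pairs = Seq((1, "a"), (2, "b")).toDF("id", "value")
```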
### Why are the changes needed?
API parity with the existing SQL APIs.
### Does this PR introduce _any_ user-facing change?
Yes.
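The Spark Connect Scala client gains the `createDataFrame`/`createDataset` overloads and the `localSeqToDatasetHolder` implicit. A minimal sketch of the `java.util.List`-based variants (values are illustrative):
```scala
import java.util.Arrays
import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.types.StructType

// Row-based overload: each Row must match the given schema, otherwise a runtime exception occurs.
val rows = Arrays.asList(Row("bob", 99), Row("club", 5))
val schema = new StructType().add("key", "string").add("value", "int")
val rowDf = spark.createDataFrame(rows, schema)

// java.util.List overload with an explicit encoder, mirroring the existing SQL API.
val strings = spark.createDataset(Arrays.asList("hello", "world"))(Encoders.STRING)
```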
### How was this patch tested?
I have added a number of tests to `ClientE2ETestSuite`.
Closes #40242 from hvanhovell/SPARK-42639.
Authored-by: Herman van Hovell <he...@databricks.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../sql/{package.scala => DatasetHolder.scala} | 25 ++++-
.../scala/org/apache/spark/sql/SQLImplicits.scala | 11 +-
.../scala/org/apache/spark/sql/SparkSession.scala | 118 ++++++++++++++++++++-
.../sql/connect/client/util/ConvertToArrow.scala | 72 +++++++++++++
.../main/scala/org/apache/spark/sql/package.scala | 6 ++
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 72 +++++++++++--
.../sql/connect/client/CompatibilitySuite.scala | 18 ++--
.../spark/sql/execution/arrow/ArrowWriter.scala | 0
8 files changed, 297 insertions(+), 25 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
similarity index 50%
copy from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
copy to connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
index ada94b76fcb..66f591bf1fb 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
@@ -14,9 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.spark.sql
-package org.apache.spark
+/**
+ * A container for a [[Dataset]], used for implicit conversions in Scala.
+ *
+ * To use this, import implicit conversions in SQL:
+ * {{{
+ * val spark: SparkSession = ...
+ * import spark.implicits._
+ * }}}
+ *
+ * @since 3.4.0
+ */
+case class DatasetHolder[T] private[sql] (private val ds: Dataset[T]) {
+
+ // This is declared with parentheses to prevent the Scala compiler from treating
+ // `rdd.toDS("1")` as invoking this toDS and then apply on the returned Dataset.
+ def toDS(): Dataset[T] = ds
+
+ // This is declared with parentheses to prevent the Scala compiler from treating
+ // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
+ def toDF(): DataFrame = ds.toDF()
-package object sql {
- type DataFrame = Dataset[Row]
+ def toDF(colNames: String*): DataFrame = ds.toDF(colNames: _*)
}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 8f429541def..6c626fd716d 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql
+import scala.collection.Map
import scala.language.implicitConversions
import scala.reflect.classTag
import scala.reflect.runtime.universe.TypeTag
@@ -30,7 +31,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
*
* @since 3.4.0
*/
-abstract class SQLImplicits extends LowPrioritySQLImplicits {
+abstract class SQLImplicits private[sql] (session: SparkSession) extends LowPrioritySQLImplicits {
/**
* Converts $"col name" into a [[Column]].
@@ -263,6 +264,14 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
implicit def newProductArrayEncoder[A <: Product: TypeTag]: Encoder[Array[A]] = {
newArrayEncoder(ScalaReflection.encoderFor[A])
}
+
+ /**
+ * Creates a [[Dataset]] from a local Seq.
+ * @since 3.4.0
+ */
+ implicit def localSeqToDatasetHolder[T: Encoder](s: Seq[T]): DatasetHolder[T] = {
+ DatasetHolder(session.createDataset(s))
+ }
}
/**
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 48b86474b9e..d463af68832 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -21,16 +21,19 @@ import java.util.concurrent.TimeUnit._
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
+import scala.reflect.runtime.universe.TypeTag
import org.apache.arrow.memory.RootAllocator
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
+import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BoxedLongEncoder, UnboundRowEncoder}
import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkResult}
-import org.apache.spark.sql.connect.client.util.Cleaner
+import org.apache.spark.sql.connect.client.util.{Cleaner, ConvertToArrow}
+import org.apache.spark.sql.types.StructType
/**
* The entry point to programming Spark with the Dataset and DataFrame API.
@@ -93,6 +96,115 @@ class SparkSession private[sql] (
ret
}
+ /**
+ * Returns a `DataFrame` with no rows or columns.
+ *
+ * @since 3.4.0
+ */
+ @transient
+ val emptyDataFrame: DataFrame = emptyDataset(UnboundRowEncoder)
+
+ /**
+ * Creates a new [[Dataset]] of type T containing zero elements.
+ *
+ * @since 3.4.0
+ */
+ def emptyDataset[T: Encoder]: Dataset[T] = createDataset[T](Nil)
+
+ private def createDataset[T](encoder: AgnosticEncoder[T], data: Iterator[T]): Dataset[T] = {
+ newDataset(encoder) { builder =>
+ val localRelationBuilder = builder.getLocalRelationBuilder
+ .setSchema(encoder.schema.catalogString)
+ if (data.nonEmpty) {
+ val timeZoneId = conf.get("spark.sql.session.timeZone")
+ val arrowData = ConvertToArrow(encoder, data, timeZoneId, allocator)
+ localRelationBuilder.setData(arrowData)
+ }
+ }
+ }
+
+ /**
+ * Creates a `DataFrame` from a local Seq of Product.
+ *
+ * @since 3.4.0
+ */
+ def createDataFrame[A <: Product: TypeTag](data: Seq[A]): DataFrame = {
+ createDataset(ScalaReflection.encoderFor[A], data.iterator).toDF()
+ }
+
+ /**
+ * :: DeveloperApi :: Creates a `DataFrame` from a `java.util.List` containing [[Row]]s using
+ * the given schema. It is important to make sure that the structure of every [[Row]] of the
+ * provided List matches the provided schema. Otherwise, there will be a runtime exception.
+ *
+ * @since 3.4.0
+ */
+ def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
+ createDataset(RowEncoder.encoderFor(schema), rows.iterator().asScala).toDF()
+ }
+
+ /**
+ * Applies a schema to a List of Java Beans.
+ *
+ * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries
+ * will return the columns in an undefined order.
+ * @since 3.4.0
+ */
+ def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = {
+ val encoder = JavaTypeInference.encoderFor(beanClass.asInstanceOf[Class[Any]])
+ createDataset(encoder, data.iterator().asScala).toDF()
+ }
+
+ /**
+ * Creates a [[Dataset]] from a local Seq of data of a given type. This method requires an
+ * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL
+ * representation) that is generally created automatically through implicits from a
+ * `SparkSession`, or can be created explicitly by calling static methods on [[Encoders]].
+ *
+ * ==Example==
+ *
+ * {{{
+ *
+ * import spark.implicits._
+ * case class Person(name: String, age: Long)
+ * val data = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19))
+ * val ds = spark.createDataset(data)
+ *
+ * ds.show()
+ * // +-------+---+
+ * // | name|age|
+ * // +-------+---+
+ * // |Michael| 29|
+ * // | Andy| 30|
+ * // | Justin| 19|
+ * // +-------+---+
+ * }}}
+ *
+ * @since 3.4.0
+ */
+ def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = {
+ createDataset(encoderFor[T], data.iterator)
+ }
+
+ /**
+ * Creates a [[Dataset]] from a `java.util.List` of a given type. This method requires an
+ * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL
+ * representation) that is generally created automatically through implicits from a
+ * `SparkSession`, or can be created explicitly by calling static methods on [[Encoders]].
+ *
+ * ==Java Example==
+ *
+ * {{{
+ * List<String> data = Arrays.asList("hello", "world");
+ * Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
+ * }}}
+ *
+ * @since 3.4.0
+ */
+ def createDataset[T: Encoder](data: java.util.List[T]): Dataset[T] = {
+ createDataset(data.asScala.toSeq)
+ }
+
/**
* Executes a SQL query substituting named parameters by the given arguments, returning the
* result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not for SELECT queries.
@@ -227,7 +339,7 @@ class SparkSession private[sql] (
*
* @since 3.4.0
*/
- object implicits extends SQLImplicits
+ object implicits extends SQLImplicits(this)
// scalastyle:on
def newSession(): SparkSession = {
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/util/ConvertToArrow.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/util/ConvertToArrow.scala
new file mode 100644
index 00000000000..d124870e162
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/util/ConvertToArrow.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client.util
+
+import java.nio.channels.Channels
+
+import com.google.protobuf.ByteString
+import org.apache.arrow.memory.BufferAllocator
+import org.apache.arrow.vector.{VectorSchemaRoot, VectorUnloader}
+import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel}
+import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
+
+import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, ExpressionEncoder}
+import org.apache.spark.sql.execution.arrow.ArrowWriter
+import org.apache.spark.sql.util.ArrowUtils
+
+/**
+ * Utility for converting common Scala objects into an Arrow IPC Stream.
+ */
+private[sql] object ConvertToArrow {
+
+ /**
+ * Convert an iterator of common Scala objects into a single Arrow IPC Stream.
+ */
+ def apply[T](
+ encoder: AgnosticEncoder[T],
+ data: Iterator[T],
+ timeZoneId: String,
+ bufferAllocator: BufferAllocator): ByteString = {
+ val arrowSchema = ArrowUtils.toArrowSchema(encoder.schema, timeZoneId)
+ val root = VectorSchemaRoot.create(arrowSchema, bufferAllocator)
+ val writer: ArrowWriter = ArrowWriter.create(root)
+ val unloader = new VectorUnloader(root)
+ val bytes = ByteString.newOutput()
+ val channel = new WriteChannel(Channels.newChannel(bytes))
+
+ try {
+ // Convert and write the data to the vector root.
+ val serializer = ExpressionEncoder(encoder).createSerializer()
+ data.foreach(o => writer.write(serializer(o)))
+ writer.finish()
+
+ // Write the IPC Stream
+ MessageSerializer.serialize(channel, root.getSchema)
+ val batch = unloader.getRecordBatch
+ try MessageSerializer.serialize(channel, batch)
+ finally {
+ batch.close()
+ }
+ ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT)
+
+ // Done
+ bytes.toByteString
+ } finally {
+ root.close()
+ }
+ }
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
index ada94b76fcb..556b472283a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
@@ -17,6 +17,12 @@
package org.apache.spark
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+
package object sql {
type DataFrame = Dataset[Row]
+
+ private[sql] def encoderFor[E: Encoder]: AgnosticEncoder[E] = {
+ implicitly[Encoder[E]].asInstanceOf[AgnosticEncoder[E]]
+ }
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 274424f0f0d..ffbf3cee025 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -20,7 +20,6 @@ import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.file.Files
import scala.collection.JavaConverters._
-import scala.reflect.runtime.universe.TypeTag
import io.grpc.StatusRuntimeException
import org.apache.commons.io.FileUtils
@@ -28,7 +27,6 @@ import org.apache.commons.io.output.TeeOutputStream
import org.scalactic.TolerantNumerics
import org.apache.spark.SPARK_VERSION
-import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
import org.apache.spark.sql.functions.{aggregate, array, col, count, lit, rand, sequence, shuffle, struct, transform, udf}
import org.apache.spark.sql.types._
@@ -410,12 +408,10 @@ class ClientE2ETestSuite extends RemoteSparkSession {
assert(spark.range(10).count() === 10)
}
- // We can remove this as soon this is added to SQLImplicits.
- private implicit def newProductEncoder[T <: Product: TypeTag]: Encoder[T] =
- ScalaReflection.encoderFor[T]
-
test("Dataset collect tuple") {
- val result = spark
+ val session = spark
+ import session.implicits._
+ val result = session
.range(3)
.select(col("id"), (col("id") % 2).cast("int").as("a"), (col("id") / lit(10.0d)).as("b"))
.as[(Long, Int, Double)]
@@ -442,7 +438,9 @@ class ClientE2ETestSuite extends RemoteSparkSession {
}
test("Dataset collect complex type") {
- val result = spark
+ val session = spark
+ import session.implicits._
+ val result = session
.range(3)
.select(generateMyTypeColumns: _*)
.as[MyType]
@@ -456,7 +454,9 @@ class ClientE2ETestSuite extends RemoteSparkSession {
}
test("Dataset typed select - complex column") {
- val ds = spark
+ val session = spark
+ import session.implicits._
+ val ds = session
.range(3)
.select(struct(generateMyTypeColumns: _*).as[MyType])
validateMyTypeResult(ds.collect())
@@ -537,6 +537,60 @@ class ClientE2ETestSuite extends RemoteSparkSession {
test("SparkVersion") {
assert(!spark.version.isEmpty)
}
+
+ private def checkSameResult[E](expected: scala.collection.Seq[E], dataset: Dataset[E]): Unit = {
+ dataset.withResult { result =>
+ assert(expected === result.iterator.asScala.toBuffer)
+ }
+ }
+
+ test("Local Relation implicit conversion") {
+ val session = spark
+ import session.implicits._
+
+ val simpleValues = Seq(1, 24, 3)
+ checkSameResult(simpleValues, simpleValues.toDS())
+ checkSameResult(simpleValues.map(v => Row(v)), simpleValues.toDF())
+
+ val complexValues = Seq((5, "a"), (6, "b"))
+ checkSameResult(complexValues, complexValues.toDS())
+ checkSameResult(
+ complexValues.map(kv => KV(kv._2, kv._1)),
+ complexValues.toDF("value", "key").as[KV])
+ }
+
+ test("SparkSession.createDataFrame - row") {
+ val rows = java.util.Arrays.asList(Row("bob", 99), Row("Club", 5), Row("Bag", 5))
+ val schema = new StructType().add("key", "string").add("value", "int")
+ checkSameResult(rows.asScala, spark.createDataFrame(rows, schema))
+ }
+
+ test("SparkSession.createDataFrame - bean") {
+ def bean(v: String): SimpleBean = {
+ val bean = new SimpleBean
+ bean.setValue(v)
+ bean
+ }
+ val beans = java.util.Arrays.asList(bean("x"), bean("s"), bean("d"))
+ checkSameResult(
+ beans.asScala.map(b => Row(b.value)),
+ spark.createDataFrame(beans, classOf[SimpleBean]))
+ }
+
+ test("SparkSession typed createDataSet/createDataframe") {
+ val session = spark
+ import session.implicits._
+ val list = java.util.Arrays.asList(KV("bob", 99), KV("Club", 5), KV("Bag", 5))
+ checkSameResult(list.asScala, session.createDataset(list))
+ checkSameResult(
+ list.asScala.map(kv => Row(kv.key, kv.value)),
+ session.createDataFrame(list.asScala.toSeq))
+ }
}
private[sql] case class MyType(id: Long, a: Double, b: Double)
+private[sql] case class KV(key: String, value: Int)
+private[sql] class SimpleBean {
+ @scala.beans.BeanProperty
+ var value: String = _
+}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 8728c351780..a4cb2a13e5b 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -82,7 +82,9 @@ class CompatibilitySuite extends ConnectFunSuite {
IncludeByName("org.apache.spark.sql.RelationalGroupedDataset.*"),
IncludeByName("org.apache.spark.sql.SparkSession.*"),
IncludeByName("org.apache.spark.sql.RuntimeConfig.*"),
- IncludeByName("org.apache.spark.sql.TypedColumn.*"))
+ IncludeByName("org.apache.spark.sql.TypedColumn.*"),
+ IncludeByName("org.apache.spark.sql.SQLImplicits.*"),
+ IncludeByName("org.apache.spark.sql.DatasetHolder.*"))
val excludeRules = Seq(
// Filter unsupported rules:
// Note when muting errors for a method, checks on all overloading methods are also muted.
@@ -146,7 +148,6 @@ class CompatibilitySuite extends ConnectFunSuite {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.this"),
// SparkSession
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sparkContext"),
@@ -157,9 +158,6 @@ class CompatibilitySuite extends ConnectFunSuite {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.experimental"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.udf"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.streams"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.newSession"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.emptyDataFrame"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.emptyDataset"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataFrame"),
ProblemFilters.exclude[Problem](
"org.apache.spark.sql.SparkSession.baseRelationToDataFrame"),
@@ -168,15 +166,17 @@ class CompatibilitySuite extends ConnectFunSuite {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.readStream"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"),
// RuntimeConfig
ProblemFilters.exclude[Problem]("org.apache.spark.sql.RuntimeConfig.this"),
// TypedColumn
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"))
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
+
+ // SQLImplicits
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.this"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits._sqlContext"))
val problems = allProblems
.filter { p =>
includedRules.exists(rule => rule(p))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
similarity index 100%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala