Posted to commits@spark.apache.org by do...@apache.org on 2023/03/02 07:13:47 UTC

[spark] branch master updated: [SPARK-42639][CONNECT] Add createDataFrame/createDataset methods

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a9626d578eb [SPARK-42639][CONNECT] Add createDataFrame/createDataset methods
a9626d578eb is described below

commit a9626d578ebe970f45c218701743d6eb3ad75340
Author: Herman van Hovell <he...@databricks.com>
AuthorDate: Wed Mar 1 23:13:33 2023 -0800

    [SPARK-42639][CONNECT] Add createDataFrame/createDataset methods
    
    ### What changes were proposed in this pull request?
    This PR adds all of the `SparkSession.createDataFrame(..)` and `SparkSession.createDataset(..)` methods that we can support in Spark Connect. It also adds the implicit conversion that relies on these methods.
    
    I moved the `ArrowWriter` class from sql/core to sql/catalyst to support the Arrow writing.
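
    For illustration only, here is a minimal usage sketch of the new APIs (assuming a connected `spark: SparkSession`; `Person` is a hypothetical case class, not part of this patch):

    ```scala
    import spark.implicits._

    // Hypothetical case class, used only for this sketch.
    case class Person(name: String, age: Long)

    // New in this PR: build Datasets/DataFrames from local data.
    val ds = spark.createDataset(Seq(Person("Michael", 29), Person("Andy", 30)))
    val df = spark.createDataFrame(Seq(Person("Justin", 19)))

    // New implicit conversion from a local Seq.
    val ints = Seq(1, 2, 3).toDS()
    ```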
    
    ### Why are the changes needed?
    API parity with the existing SQL APIs.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it adds new `createDataFrame`/`createDataset` methods and implicit conversions to the Spark Connect Scala client.
    
    ### How was this patch tested?
    I have added a number of tests to `ClientE2ETestSuite`.
    
    Closes #40242 from hvanhovell/SPARK-42639.
    
    Authored-by: Herman van Hovell <he...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../sql/{package.scala => DatasetHolder.scala}     |  25 ++++-
 .../scala/org/apache/spark/sql/SQLImplicits.scala  |  11 +-
 .../scala/org/apache/spark/sql/SparkSession.scala  | 118 ++++++++++++++++++++-
 .../sql/connect/client/util/ConvertToArrow.scala   |  72 +++++++++++++
 .../main/scala/org/apache/spark/sql/package.scala  |   6 ++
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  72 +++++++++++--
 .../sql/connect/client/CompatibilitySuite.scala    |  18 ++--
 .../spark/sql/execution/arrow/ArrowWriter.scala    |   0
 8 files changed, 297 insertions(+), 25 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
similarity index 50%
copy from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
copy to connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
index ada94b76fcb..66f591bf1fb 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
@@ -14,9 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.spark.sql
 
-package org.apache.spark
+/**
+ * A container for a [[Dataset]], used for implicit conversions in Scala.
+ *
+ * To use this, import implicit conversions in SQL:
+ * {{{
+ *   val spark: SparkSession = ...
+ *   import spark.implicits._
+ * }}}
+ *
+ * @since 3.4.0
+ */
+case class DatasetHolder[T] private[sql] (private val ds: Dataset[T]) {
+
+  // This is declared with parentheses to prevent the Scala compiler from treating
+  // `seq.toDS("1")` as invoking this toDS and then calling apply on the returned Dataset.
+  def toDS(): Dataset[T] = ds
+
+  // This is declared with parentheses to prevent the Scala compiler from treating
+  // `seq.toDF("1")` as invoking this toDF and then calling apply on the returned DataFrame.
+  def toDF(): DataFrame = ds.toDF()
 
-package object sql {
-  type DataFrame = Dataset[Row]
+  def toDF(colNames: String*): DataFrame = ds.toDF(colNames: _*)
 }
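
As a hedged sketch (not part of the patch), here is what `DatasetHolder`, together with the new `localSeqToDatasetHolder` implicit added to `SQLImplicits` below, enables once a connected `spark: SparkSession` is in scope:

{{{
import spark.implicits._

// localSeqToDatasetHolder wraps the local Seq in a DatasetHolder, so toDS()/toDF() apply directly.
val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
val df: DataFrame = Seq(("a", 1), ("b", 2)).toDF("key", "value")
}}}
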
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 8f429541def..6c626fd716d 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql
 
+import scala.collection.Map
 import scala.language.implicitConversions
 import scala.reflect.classTag
 import scala.reflect.runtime.universe.TypeTag
@@ -30,7 +31,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
  *
  * @since 3.4.0
  */
-abstract class SQLImplicits extends LowPrioritySQLImplicits {
+abstract class SQLImplicits private[sql] (session: SparkSession) extends LowPrioritySQLImplicits {
 
   /**
    * Converts $"col name" into a [[Column]].
@@ -263,6 +264,14 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
   implicit def newProductArrayEncoder[A <: Product: TypeTag]: Encoder[Array[A]] = {
     newArrayEncoder(ScalaReflection.encoderFor[A])
   }
+
+  /**
+   * Creates a [[Dataset]] from a local Seq.
+   * @since 3.4.0
+   */
+  implicit def localSeqToDatasetHolder[T: Encoder](s: Seq[T]): DatasetHolder[T] = {
+    DatasetHolder(session.createDataset(s))
+  }
 }
 
 /**
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 48b86474b9e..d463af68832 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -21,16 +21,19 @@ import java.util.concurrent.TimeUnit._
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
+import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.arrow.memory.RootAllocator
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
+import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BoxedLongEncoder, UnboundRowEncoder}
 import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkResult}
-import org.apache.spark.sql.connect.client.util.Cleaner
+import org.apache.spark.sql.connect.client.util.{Cleaner, ConvertToArrow}
+import org.apache.spark.sql.types.StructType
 
 /**
  * The entry point to programming Spark with the Dataset and DataFrame API.
@@ -93,6 +96,115 @@ class SparkSession private[sql] (
     ret
   }
 
+  /**
+   * Returns a `DataFrame` with no rows or columns.
+   *
+   * @since 3.4.0
+   */
+  @transient
+  val emptyDataFrame: DataFrame = emptyDataset(UnboundRowEncoder)
+
+  /**
+   * Creates a new [[Dataset]] of type T containing zero elements.
+   *
+   * @since 3.4.0
+   */
+  def emptyDataset[T: Encoder]: Dataset[T] = createDataset[T](Nil)
+
+  private def createDataset[T](encoder: AgnosticEncoder[T], data: Iterator[T]): Dataset[T] = {
+    newDataset(encoder) { builder =>
+      val localRelationBuilder = builder.getLocalRelationBuilder
+        .setSchema(encoder.schema.catalogString)
+      if (data.nonEmpty) {
+        val timeZoneId = conf.get("spark.sql.session.timeZone")
+        val arrowData = ConvertToArrow(encoder, data, timeZoneId, allocator)
+        localRelationBuilder.setData(arrowData)
+      }
+    }
+  }
+
+  /**
+   * Creates a `DataFrame` from a local Seq of Product.
+   *
+   * @since 3.4.0
+   */
+  def createDataFrame[A <: Product: TypeTag](data: Seq[A]): DataFrame = {
+    createDataset(ScalaReflection.encoderFor[A], data.iterator).toDF()
+  }
+
+  /**
+   * :: DeveloperApi :: Creates a `DataFrame` from a `java.util.List` containing [[Row]]s using
+   * the given schema. It is important to make sure that the structure of every [[Row]] of the
+   * provided List matches the provided schema. Otherwise, a runtime exception will be thrown.
+   *
+   * @since 3.4.0
+   */
+  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
+    createDataset(RowEncoder.encoderFor(schema), rows.iterator().asScala).toDF()
+  }
+
+  /**
+   * Applies a schema to a List of Java Beans.
+   *
+   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries
+   * will return the columns in an undefined order.
+   * @since 3.4.0
+   */
+  def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = {
+    val encoder = JavaTypeInference.encoderFor(beanClass.asInstanceOf[Class[Any]])
+    createDataset(encoder, data.iterator().asScala).toDF()
+  }
+
+  /**
+   * Creates a [[Dataset]] from a local Seq of data of a given type. This method requires an
+   * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL
+   * representation) that is generally created automatically through implicits from a
+   * `SparkSession`, or can be created explicitly by calling static methods on [[Encoders]].
+   *
+   * ==Example==
+   *
+   * {{{
+   *
+   *   import spark.implicits._
+   *   case class Person(name: String, age: Long)
+   *   val data = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19))
+   *   val ds = spark.createDataset(data)
+   *
+   *   ds.show()
+   *   // +-------+---+
+   *   // |   name|age|
+   *   // +-------+---+
+   *   // |Michael| 29|
+   *   // |   Andy| 30|
+   *   // | Justin| 19|
+   *   // +-------+---+
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = {
+    createDataset(encoderFor[T], data.iterator)
+  }
+
+  /**
+   * Creates a [[Dataset]] from a `java.util.List` of a given type. This method requires an
+   * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL
+   * representation) that is generally created automatically through implicits from a
+   * `SparkSession`, or can be created explicitly by calling static methods on [[Encoders]].
+   *
+   * ==Java Example==
+   *
+   * {{{
+   *     List<String> data = Arrays.asList("hello", "world");
+   *     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  def createDataset[T: Encoder](data: java.util.List[T]): Dataset[T] = {
+    createDataset(data.asScala.toSeq)
+  }
+
   /**
    * Executes a SQL query substituting named parameters by the given arguments, returning the
    * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not for SELECT queries.
@@ -227,7 +339,7 @@ class SparkSession private[sql] (
    *
    * @since 3.4.0
    */
-  object implicits extends SQLImplicits
+  object implicits extends SQLImplicits(this)
   // scalastyle:on
 
   def newSession(): SparkSession = {
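
As a hedged usage sketch for the new createDataFrame/createDataset variants above (column names borrowed from the new tests; assuming a connected `spark: SparkSession`; not part of the patch):

{{{
import java.util.Arrays
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Every Row must match the schema, otherwise a runtime exception is thrown.
val rows = Arrays.asList(Row("bob", 99), Row("club", 5))
val schema = new StructType().add("key", "string").add("value", "int")
val df = spark.createDataFrame(rows, schema)

// Typed variant, using the session implicits to supply the Encoder.
import spark.implicits._
val ds = spark.createDataset(Seq("hello", "world"))
}}}
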
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/util/ConvertToArrow.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/util/ConvertToArrow.scala
new file mode 100644
index 00000000000..d124870e162
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/util/ConvertToArrow.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client.util
+
+import java.nio.channels.Channels
+
+import com.google.protobuf.ByteString
+import org.apache.arrow.memory.BufferAllocator
+import org.apache.arrow.vector.{VectorSchemaRoot, VectorUnloader}
+import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel}
+import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
+
+import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, ExpressionEncoder}
+import org.apache.spark.sql.execution.arrow.ArrowWriter
+import org.apache.spark.sql.util.ArrowUtils
+
+/**
+ * Utility for converting common Scala objects into an Arrow IPC stream.
+ */
+private[sql] object ConvertToArrow {
+
+  /**
+   * Converts an iterator of common Scala objects into a single Arrow IPC stream.
+   */
+  def apply[T](
+      encoder: AgnosticEncoder[T],
+      data: Iterator[T],
+      timeZoneId: String,
+      bufferAllocator: BufferAllocator): ByteString = {
+    val arrowSchema = ArrowUtils.toArrowSchema(encoder.schema, timeZoneId)
+    val root = VectorSchemaRoot.create(arrowSchema, bufferAllocator)
+    val writer: ArrowWriter = ArrowWriter.create(root)
+    val unloader = new VectorUnloader(root)
+    val bytes = ByteString.newOutput()
+    val channel = new WriteChannel(Channels.newChannel(bytes))
+
+    try {
+      // Convert and write the data to the vector root.
+      val serializer = ExpressionEncoder(encoder).createSerializer()
+      data.foreach(o => writer.write(serializer(o)))
+      writer.finish()
+
+      // Write the IPC Stream
+      MessageSerializer.serialize(channel, root.getSchema)
+      val batch = unloader.getRecordBatch
+      try MessageSerializer.serialize(channel, batch)
+      finally {
+        batch.close()
+      }
+      ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT)
+
+      // Done
+      bytes.toByteString
+    } finally {
+      root.close()
+    }
+  }
+}
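
To make the IPC framing above concrete (schema message, a single record batch, then an end-of-stream marker), here is a hypothetical read-back sketch using Arrow's stream reader; `bytes` stands for the ByteString returned by ConvertToArrow, and none of this is part of the patch:

{{{
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowStreamReader

val allocator = new RootAllocator()
val reader = new ArrowStreamReader(bytes.newInput(), allocator)
try {
  // The stream starts with the schema, followed by the single record batch written above.
  while (reader.loadNextBatch()) {
    println(s"rows in this batch: ${reader.getVectorSchemaRoot.getRowCount}")
  }
} finally {
  reader.close()
  allocator.close()
}
}}}
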
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
index ada94b76fcb..556b472283a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
@@ -17,6 +17,12 @@
 
 package org.apache.spark
 
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+
 package object sql {
   type DataFrame = Dataset[Row]
+
+  private[sql] def encoderFor[E: Encoder]: AgnosticEncoder[E] = {
+    implicitly[Encoder[E]].asInstanceOf[AgnosticEncoder[E]]
+  }
 }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 274424f0f0d..ffbf3cee025 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -20,7 +20,6 @@ import java.io.{ByteArrayOutputStream, PrintStream}
 import java.nio.file.Files
 
 import scala.collection.JavaConverters._
-import scala.reflect.runtime.universe.TypeTag
 
 import io.grpc.StatusRuntimeException
 import org.apache.commons.io.FileUtils
@@ -28,7 +27,6 @@ import org.apache.commons.io.output.TeeOutputStream
 import org.scalactic.TolerantNumerics
 
 import org.apache.spark.SPARK_VERSION
-import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
 import org.apache.spark.sql.functions.{aggregate, array, col, count, lit, rand, sequence, shuffle, struct, transform, udf}
 import org.apache.spark.sql.types._
@@ -410,12 +408,10 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     assert(spark.range(10).count() === 10)
   }
 
-  // We can remove this as soon this is added to SQLImplicits.
-  private implicit def newProductEncoder[T <: Product: TypeTag]: Encoder[T] =
-    ScalaReflection.encoderFor[T]
-
   test("Dataset collect tuple") {
-    val result = spark
+    val session = spark
+    import session.implicits._
+    val result = session
       .range(3)
       .select(col("id"), (col("id") % 2).cast("int").as("a"), (col("id") / lit(10.0d)).as("b"))
       .as[(Long, Int, Double)]
@@ -442,7 +438,9 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset collect complex type") {
-    val result = spark
+    val session = spark
+    import session.implicits._
+    val result = session
       .range(3)
       .select(generateMyTypeColumns: _*)
       .as[MyType]
@@ -456,7 +454,9 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset typed select - complex column") {
-    val ds = spark
+    val session = spark
+    import session.implicits._
+    val ds = session
       .range(3)
       .select(struct(generateMyTypeColumns: _*).as[MyType])
     validateMyTypeResult(ds.collect())
@@ -537,6 +537,60 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   test("SparkVersion") {
     assert(!spark.version.isEmpty)
   }
+
+  private def checkSameResult[E](expected: scala.collection.Seq[E], dataset: Dataset[E]): Unit = {
+    dataset.withResult { result =>
+      assert(expected === result.iterator.asScala.toBuffer)
+    }
+  }
+
+  test("Local Relation implicit conversion") {
+    val session = spark
+    import session.implicits._
+
+    val simpleValues = Seq(1, 24, 3)
+    checkSameResult(simpleValues, simpleValues.toDS())
+    checkSameResult(simpleValues.map(v => Row(v)), simpleValues.toDF())
+
+    val complexValues = Seq((5, "a"), (6, "b"))
+    checkSameResult(complexValues, complexValues.toDS())
+    checkSameResult(
+      complexValues.map(kv => KV(kv._2, kv._1)),
+      complexValues.toDF("value", "key").as[KV])
+  }
+
+  test("SparkSession.createDataFrame - row") {
+    val rows = java.util.Arrays.asList(Row("bob", 99), Row("Club", 5), Row("Bag", 5))
+    val schema = new StructType().add("key", "string").add("value", "int")
+    checkSameResult(rows.asScala, spark.createDataFrame(rows, schema))
+  }
+
+  test("SparkSession.createDataFrame - bean") {
+    def bean(v: String): SimpleBean = {
+      val bean = new SimpleBean
+      bean.setValue(v)
+      bean
+    }
+    val beans = java.util.Arrays.asList(bean("x"), bean("s"), bean("d"))
+    checkSameResult(
+      beans.asScala.map(b => Row(b.value)),
+      spark.createDataFrame(beans, classOf[SimpleBean]))
+  }
+
+  test("SparkSession typed createDataSet/createDataframe") {
+    val session = spark
+    import session.implicits._
+    val list = java.util.Arrays.asList(KV("bob", 99), KV("Club", 5), KV("Bag", 5))
+    checkSameResult(list.asScala, session.createDataset(list))
+    checkSameResult(
+      list.asScala.map(kv => Row(kv.key, kv.value)),
+      session.createDataFrame(list.asScala.toSeq))
+  }
 }
 
 private[sql] case class MyType(id: Long, a: Double, b: Double)
+private[sql] case class KV(key: String, value: Int)
+private[sql] class SimpleBean {
+  @scala.beans.BeanProperty
+  var value: String = _
+}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 8728c351780..a4cb2a13e5b 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -82,7 +82,9 @@ class CompatibilitySuite extends ConnectFunSuite {
       IncludeByName("org.apache.spark.sql.RelationalGroupedDataset.*"),
       IncludeByName("org.apache.spark.sql.SparkSession.*"),
       IncludeByName("org.apache.spark.sql.RuntimeConfig.*"),
-      IncludeByName("org.apache.spark.sql.TypedColumn.*"))
+      IncludeByName("org.apache.spark.sql.TypedColumn.*"),
+      IncludeByName("org.apache.spark.sql.SQLImplicits.*"),
+      IncludeByName("org.apache.spark.sql.DatasetHolder.*"))
     val excludeRules = Seq(
       // Filter unsupported rules:
       // Note when muting errors for a method, checks on all overloading methods are also muted.
@@ -146,7 +148,6 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.this"),
 
       // SparkSession
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sparkContext"),
@@ -157,9 +158,6 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.experimental"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.udf"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.streams"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.newSession"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.emptyDataFrame"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.emptyDataset"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataFrame"),
       ProblemFilters.exclude[Problem](
         "org.apache.spark.sql.SparkSession.baseRelationToDataFrame"),
@@ -168,15 +166,17 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.readStream"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"),
 
       // RuntimeConfig
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.RuntimeConfig.this"),
 
       // TypedColumn
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"))
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
+
+      // SQLImplicits
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.this"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits._sqlContext"))
     val problems = allProblems
       .filter { p =>
         includedRules.exists(rule => rule(p))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
similarity index 100%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org