You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@kyuubi.apache.org by "lsm1 (via GitHub)" <gi...@apache.org> on 2023/10/31 15:12:21 UTC

[PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

lsm1 opened a new pull request, #5591:
URL: https://github.com/apache/kyuubi/pull/5591

   <!--
   Thanks for sending a pull request!
   
   Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/CONTRIBUTING.html
     2. If the PR is related to an issue in https://github.com/apache/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
   -->
   
   ### _Why are the changes needed?_
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you add a feature, you can talk about the use case of it.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   close #5377
   
   ### _How was this patch tested?_
   - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
   
   - [ ] Add screenshots for manual tests if appropriate
   
   - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request
   
   
   ### _Was this patch authored or co-authored using generative AI tooling?_
   <!--
   If a generative AI tooling has been used in the process of authoring this patch, please include
   phrase 'Generated-by: ' followed by the name of the tool and its version.
   If no, write 'No'.
   Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
   -->
   NO


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "wForget (via GitHub)" <gi...@apache.org>.
wForget commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1387351552


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -159,6 +160,25 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = spark.conf.getOption(OPERATION_RESULT_SAVE_TO_FILE.key).map(_.toBoolean)
+        .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE))
+      lazy val threshold =
+        session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {

Review Comment:
   `shouldSaveResultToHdfs` is inferred based on the execution plan and may not be accurate. Should we change it to `sparkSave || shouldSaveResultToHdfs`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418539461


##########
kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala:
##########
@@ -1893,6 +1893,28 @@ object KyuubiConf {
       .intConf
       .createWithDefault(0)
 
+  val OPERATION_RESULT_SAVE_TO_FILE: ConfigEntry[Boolean] =
+    buildConf("kyuubi.operation.result.save.to.file")

Review Comment:
   ```
   kyuubi.operation.result.saveToFile.enabled
   kyuubi.operation.result.saveToFile.dir
   kyuubi.operation.result.saveToFile.minSize
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "turboFei (via GitHub)" <gi...@apache.org>.
turboFei commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1853089095

   Some question:
   
   I wonder that, If the result is order needed, if we save the result into files and then read from when client fetching result, the result return to users is not ordered as expected. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "wForget (via GitHub)" <gi...@apache.org>.
wForget commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1387342443


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -159,6 +160,25 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = spark.conf.getOption(OPERATION_RESULT_SAVE_TO_FILE.key).map(_.toBoolean)
+        .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE))
+      lazy val threshold =
+        session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        val fileName = s"$savePath/$engineId/$sessionId/$statementId"
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write

Review Comment:
   Use `resultDF` instead of `result`. Also, is `toDF(colName: _*)` necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "wForget (via GitHub)" <gi...@apache.org>.
wForget commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1811744755

   > we still call `result.collect()` later, if the result is too large,we can not avoid driver OOM
   
   Can we combine incremental collection mode?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1844923893

   @lsm1 I left some comments about the user-facing parts, I think it's good to go once those comments are addressed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "wForget (via GitHub)" <gi...@apache.org>.
wForget commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1811835864

   > Maybe you can test the scenario of generating multiple files
   
   The output seems to be in order even when outputting multiple files.
   
   ```
   set spark.sql.files.maxRecordsPerFile=10000;
   
   set kyuubi.operation.language=scala;
   
   spark.range(0, 1000000, 1, numPartitions = 10)
     .selectExpr("id", "cast(id as string) as name")
     .createOrReplaceTempView("wangzhen_test_20231115_tmp1")
   
   val df = spark.sql("select * from wangzhen_test_20231115_tmp1 order by id limit 100000");
   df.write.format("parquet").save("hdfs://XXX/result");
   
   spark.sql("set kyuubi.operation.language=sql");
   select * from `parquet`.`hdfs://XXX/result`;
   ```
   
   ![image](https://github.com/apache/kyuubi/assets/17894939/65dd77bf-86f3-4740-a9f1-3523db54e0f5)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "cxzl25 (via GitHub)" <gi...@apache.org>.
cxzl25 commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1810141459

   > It might be simpler for us to make changes in the `executeStatement` method, like:
   
   > result = spark.read.load(resultPath)
   
   This may lose the ordering of the query data, e.g. `order by xx limit 100`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418578943


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+import org.apache.kyuubi.util.reflect.DynConstructors
+
+class FetchOrcStatement(spark: SparkSession) {
+
+  var orcIter: OrcFileIterator = _
+  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val savePath = new Path(path)
+    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+    val list = new ListBuffer[LocatedFileStatus]
+    while (fsIterator.hasNext) {
+      val file = fsIterator.next()
+      if (file.getPath.getName.endsWith(".orc")) {
+        list += file
+      }
+    }
+    val toRowConverter: InternalRow => Row = {
+      CatalystTypeConverters.createToScalaConverter(orcSchema)
+        .asInstanceOf[InternalRow => Row]
+    }
+    val colId = range(0, orcSchema.size)
+    val fullSchema = orcSchema.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    val deserializer = getOrcDeserializer(orcSchema, colId)
+    orcIter = new OrcFileIterator(list)
+    val iterRow = orcIter.map(value =>
+      unsafeProjection(deserializer.deserialize(value)))
+      .map(value => toRowConverter(value))
+    new IterableFetchIterator[Row](iterRow.toIterable)
+  }
+
+  def close(): Unit = {
+    orcIter.close()
+  }
+
+  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+    try {
+      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
+        // https://issues.apache.org/jira/browse/SPARK-34535

Review Comment:
   ```suggestion
           // SPARK-34535 changed the constructor signature of OrcDeserializer
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady closed pull request #5591: [KYUUBI #5377] Spark engine query result save to file
URL: https://github.com/apache/kyuubi/pull/5591


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "wForget (via GitHub)" <gi...@apache.org>.
wForget commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1811711951

   > This may lose the ordering of the query data, e.g. `order by xx limit 100`
   
   I did a simple test and the results were as expected. (Test Env: Kyuubi 1.8.0 + Spark 3.5.0)
   
   ```
   create table wangzhen_test_20231115_t1(id bigint, name string) stored as parquet;
   insert into wangzhen_test_20231115_t1 values (1, 'a');
   insert into wangzhen_test_20231115_t1 values (2, 'b');
   insert into wangzhen_test_20231115_t1 values (3, 'c');
   
   set kyuubi.operation.language=scala;
   
   val df = spark.sql("select * from wangzhen_test_20231115_t1 order by id limit 2");
   df.write.format("parquet").save("hfds://XXX/result.parquet");
   
   spark.sql("set kyuubi.operation.language=sql");
   select * from `parquet`.`hfds://XXX/result.parquet`;
   ```
   
   ```
   == Physical Plan ==
   Execute InsertIntoHadoopFsRelationCommand (5)
   +- WriteFiles (4)
      +- TakeOrderedAndProject (3)
         +- * ColumnarToRow (2)
            +- Scan parquet spark_catalog.XXX.wangzhen_test_20231115_t1 (1)
   ```
   ![image](https://github.com/apache/kyuubi/assets/17894939/63220d12-95e8-46f3-9cc1-9ff4313c0c06)
   
   ![image](https://github.com/apache/kyuubi/assets/17894939/7a9b3fda-8dcc-45e0-a2bd-6ad89a098d31)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1420257763


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+import org.apache.kyuubi.util.reflect.DynConstructors
+
+class FetchOrcStatement(spark: SparkSession) {
+
+  var orcIter: OrcFileIterator = _
+  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val savePath = new Path(path)
+    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+    val list = new ListBuffer[LocatedFileStatus]
+    while (fsIterator.hasNext) {
+      val file = fsIterator.next()
+      if (file.getPath.getName.endsWith(".orc")) {
+        list += file
+      }
+    }
+    val toRowConverter: InternalRow => Row = {
+      CatalystTypeConverters.createToScalaConverter(orcSchema)
+        .asInstanceOf[InternalRow => Row]
+    }
+    val colId = range(0, orcSchema.size)
+    val fullSchema = orcSchema.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    val deserializer = getOrcDeserializer(orcSchema, colId)
+    orcIter = new OrcFileIterator(list)
+    val iterRow = orcIter.map(value =>
+      unsafeProjection(deserializer.deserialize(value)))
+      .map(value => toRowConverter(value))
+    new IterableFetchIterator[Row](iterRow.toIterable)
+  }
+
+  def close(): Unit = {
+    orcIter.close()
+  }
+
+  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+    try {
+      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
+        // SPARK-34535 changed the constructor signature of OrcDeserializer
+        DynConstructors.builder()
+          .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
+          .build[OrcDeserializer]()
+          .newInstance(
+            orcSchema,
+            colId)
+      } else {
+        DynConstructors.builder()
+          .impl(
+            classOf[OrcDeserializer],
+            classOf[StructType],
+            classOf[StructType],
+            classOf[Array[Int]])
+          .build[OrcDeserializer]()
+          .newInstance(
+            new StructType,
+            orcSchema,
+            colId)
+      }
+    } catch {
+      case e: Throwable =>
+        throw new KyuubiException("Failed to create OrcDeserializer", e)
+    }
+  }
+}
+
+class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
+
+  private val iters = fileList.map(x => getOrcFileIterator(x))
+
+  var idx = 0
+
+  override def hasNext: Boolean = {
+    if (iters(idx).hasNext) {
+      true
+    } else {
+      iters(idx).close()
+      idx < iters.size - 1
+    }
+  }
+
+  override def next(): OrcStruct = {
+    if (iters(idx).hasNext) {
+      iters(idx).next()
+    } else {
+      idx = idx + 1
+      iters(idx).next()

Review Comment:
   The index for `iters` has been adjusted from `idx` to `idx + 1` after the check condition `iters(idx).hasNext`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418548278


##########
kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala:
##########
@@ -1893,6 +1893,28 @@ object KyuubiConf {
       .intConf
       .createWithDefault(0)
 
+  val OPERATION_RESULT_SAVE_TO_FILE: ConfigEntry[Boolean] =
+    buildConf("kyuubi.operation.result.save.to.file")
+      .doc("The switch for Spark query result save to hdfs file")
+      .version("1.9.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val OPERATION_RESULT_SAVE_TO_FILE_PATH: ConfigEntry[String] =
+    buildConf("kyuubi.operation.result.save.to.file.path")
+      .doc("The hdfs path of Spark query result save")
+      .version("1.9.0")
+      .stringConf
+      .createWithDefault("/tmp/kyuubi/tmp_kyuubi_result")
+
+  val OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD: ConfigEntry[Int] =
+    buildConf("kyuubi.operation.result.save.to.file.threshold")
+      .doc("The threshold of Spark result save to hdfs file, default value is 200 MB")

Review Comment:
   let's add a brief description of the size calculation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418546755


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala:
##########
@@ -278,4 +279,32 @@ object SparkDatasetHelper extends Logging {
     val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
   }
+
+  def shouldSaveResultToHdfs(resultMaxRows: Int, threshold: Int, result: DataFrame): Boolean = {

Review Comment:
   ```suggestion
     def shouldSaveResultToFs(resultMaxRows: Int, threshold: Int, result: DataFrame): Boolean = {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1432442569


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -158,6 +171,24 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
+      lazy val threshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD, spark)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write
+            .option("compression", "zstd").format("orc").save(saveFileName.get)

Review Comment:
   maybe it's time to have a discussion for dropping support of Spark 3.1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "lsm1 (via GitHub)" <gi...@apache.org>.
lsm1 commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1853401462

   > Some question:
   > 
   > I wonder that, If the result is order needed, if we save the result into files and then read from when client fetching result, the result returned to users is not ordered as expected.
   
   1. spark save ordered result to multiple `part-X` files in the filesystem, in order of the keys
    https://github.com/lsm1/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala#L47
     ```
     org.apache.spark.rdd.OrderedRDDFunctions#sortByKey
      /**
     	* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
     	* `collect` or `save` on the resulting RDD will return or output an ordered list of records
     	* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
     	* order of the keys).  
     */
     ```
   2. When fetchOrcStatement read file, it will be sorted by file name, so it will return ordered result


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "turboFei (via GitHub)" <gi...@apache.org>.
turboFei commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1432908204


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -158,6 +171,24 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
+      lazy val threshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD, spark)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write
+            .option("compression", "zstd").format("orc").save(saveFileName.get)

Review Comment:
   😅we are still using spark 3.1, It need time to migrate to new spark version



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "wForget (via GitHub)" <gi...@apache.org>.
wForget commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1393565693


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala:
##########
@@ -101,6 +113,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
         exec,
         Duration(60, TimeUnit.SECONDS))
     })
+    engineSavePath.foreach { p =>

Review Comment:
   > `engineSavePath` can be HDFS as the storage path, so it is not the same as tmp dir.
   
   got it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "cxzl25 (via GitHub)" <gi...@apache.org>.
cxzl25 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1410140914


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+
+class FetchOrcStatement(spark: SparkSession) {
+  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val savePath = new Path(path)
+    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+    val list = new ListBuffer[LocatedFileStatus]
+    while (fsIterator.hasNext) {
+      val file = fsIterator.next()
+      if (file.getPath.getName.endsWith(".orc")) {
+        list += file
+      }
+    }
+    val toRowConverter: InternalRow => Row = {
+      CatalystTypeConverters.createToScalaConverter(orcSchema)
+        .asInstanceOf[InternalRow => Row]
+    }
+    val colId = range(0, orcSchema.size)
+    val fullSchema = orcSchema.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    val deserializer = getOrcDeserializer(orcSchema, colId)
+    val iter = new OrcFileIterator(list)
+    val iterRow = iter.map(value =>
+      unsafeProjection(deserializer.deserialize(value)))
+      .map(value => toRowConverter(value))
+    new IterableFetchIterator[Row](iterRow.toIterable)
+  }
+
+  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+    try {
+      val cls = Class.forName("org.apache.spark.sql.execution.datasources.orc.OrcDeserializer")
+      val constructor = cls.getDeclaredConstructors.apply(0)
+      if (constructor.getParameterCount == 3) {
+        constructor.newInstance(new StructType, orcSchema, colId).asInstanceOf[OrcDeserializer]
+      } else {
+        constructor.newInstance(orcSchema, colId).asInstanceOf[OrcDeserializer]
+      }

Review Comment:
   ```suggestion
         if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
           // https://issues.apache.org/jira/browse/SPARK-34535
           DynConstructors.builder()
             .impl(
               classOf[OrcDeserializer],
               classOf[StructType],
               classOf[Array[Int]])
             .build[OrcDeserializer]()
             .newInstance(
               new StructType,
               orcSchema,
               colId)
         } else {
           DynConstructors.builder()
             .impl(
               classOf[OrcDeserializer],
               classOf[StructType],
               classOf[StructType],
               classOf[Array[Int]])
             .build[OrcDeserializer]()
             .newInstance(
               new StructType,
               orcSchema,
               colId)
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418539461


##########
kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala:
##########
@@ -1893,6 +1893,28 @@ object KyuubiConf {
       .intConf
       .createWithDefault(0)
 
+  val OPERATION_RESULT_SAVE_TO_FILE: ConfigEntry[Boolean] =
+    buildConf("kyuubi.operation.result.save.to.file")

Review Comment:
   ```
   kyuubi.operation.result.saveToFile.enabled
   kyuubi.operation.result.saveToFile.dir
   kyuubi.operation.result.saveToFile.minSize  (we may want to support row num based threshold in the future)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418725236


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -158,6 +171,24 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
+      lazy val threshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD, spark)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write
+            .option("compression", "zstd").format("orc").save(saveFileName.get)

Review Comment:
   I prefer to keep it internally, it may be out of control if we expose the internal implementation early



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "lsm1 (via GitHub)" <gi...@apache.org>.
lsm1 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418849915


##########
kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala:
##########
@@ -1893,6 +1893,28 @@ object KyuubiConf {
       .intConf
       .createWithDefault(0)
 
+  val OPERATION_RESULT_SAVE_TO_FILE: ConfigEntry[Boolean] =
+    buildConf("kyuubi.operation.result.save.to.file")
+      .doc("The switch for Spark query result save to hdfs file")
+      .version("1.9.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val OPERATION_RESULT_SAVE_TO_FILE_PATH: ConfigEntry[String] =
+    buildConf("kyuubi.operation.result.save.to.file.path")
+      .doc("The hdfs path of Spark query result save")
+      .version("1.9.0")
+      .stringConf
+      .createWithDefault("/tmp/kyuubi/tmp_kyuubi_result")
+
+  val OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD: ConfigEntry[Int] =
+    buildConf("kyuubi.operation.result.save.to.file.threshold")
+      .doc("The threshold of Spark result save to hdfs file, default value is 200 MB")
+      .version("1.9.0")
+      .intConf
+      .checkValue(_ > 0, "must be positive value")
+      .createWithDefault(209715200)

Review Comment:
   `kyuubiConf` doesn't have `byteConf` like spark, I can submit another pr if need



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1853430414

   thanks all. merging to master(v1.9.0)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "cxzl25 (via GitHub)" <gi...@apache.org>.
cxzl25 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1432281147


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -158,6 +171,24 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
+      lazy val threshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD, spark)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write
+            .option("compression", "zstd").format("orc").save(saveFileName.get)

Review Comment:
   [SPARK-33978][SQL] Support ZSTD compression in ORC data source
   https://issues.apache.org/jira/browse/SPARK-33978
   Fix Version/s: 3.2.0
   
   Maybe we need a configuration item, or the Spark version less than 3.2.0 is compressed with zlib.
   
   3.1.1 bin/spark-shell
   ```scala
   scala> spark.range(10).write.option("compression", "zstd").orc("/tmp/zstd")
   java.lang.IllegalArgumentException: Codec [zstd] is not available. Available codecs are uncompressed, lzo, snappy, zlib, none.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418540310


##########
kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala:
##########
@@ -1893,6 +1893,28 @@ object KyuubiConf {
       .intConf
       .createWithDefault(0)
 
+  val OPERATION_RESULT_SAVE_TO_FILE: ConfigEntry[Boolean] =
+    buildConf("kyuubi.operation.result.save.to.file")
+      .doc("The switch for Spark query result save to hdfs file")

Review Comment:
   the dir is not limited to HDFS



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1420227178


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+import org.apache.kyuubi.util.reflect.DynConstructors
+
+class FetchOrcStatement(spark: SparkSession) {
+
+  var orcIter: OrcFileIterator = _
+  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val savePath = new Path(path)
+    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+    val list = new ListBuffer[LocatedFileStatus]
+    while (fsIterator.hasNext) {
+      val file = fsIterator.next()
+      if (file.getPath.getName.endsWith(".orc")) {
+        list += file
+      }
+    }
+    val toRowConverter: InternalRow => Row = {
+      CatalystTypeConverters.createToScalaConverter(orcSchema)
+        .asInstanceOf[InternalRow => Row]
+    }
+    val colId = range(0, orcSchema.size)
+    val fullSchema = orcSchema.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    val deserializer = getOrcDeserializer(orcSchema, colId)
+    orcIter = new OrcFileIterator(list)
+    val iterRow = orcIter.map(value =>
+      unsafeProjection(deserializer.deserialize(value)))
+      .map(value => toRowConverter(value))
+    new IterableFetchIterator[Row](iterRow.toIterable)
+  }
+
+  def close(): Unit = {
+    orcIter.close()
+  }
+
+  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+    try {
+      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
+        // SPARK-34535 changed the constructor signature of OrcDeserializer
+        DynConstructors.builder()
+          .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
+          .build[OrcDeserializer]()
+          .newInstance(
+            orcSchema,
+            colId)
+      } else {
+        DynConstructors.builder()
+          .impl(
+            classOf[OrcDeserializer],
+            classOf[StructType],
+            classOf[StructType],
+            classOf[Array[Int]])
+          .build[OrcDeserializer]()
+          .newInstance(
+            new StructType,
+            orcSchema,
+            colId)
+      }
+    } catch {
+      case e: Throwable =>
+        throw new KyuubiException("Failed to create OrcDeserializer", e)
+    }
+  }
+}
+
+class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
+
+  private val iters = fileList.map(x => getOrcFileIterator(x))
+
+  var idx = 0
+
+  override def hasNext: Boolean = {
+    if (iters(idx).hasNext) {
+      true
+    } else {
+      iters(idx).close()
+      idx < iters.size - 1
+    }
+  }
+
+  override def next(): OrcStruct = {
+    if (iters(idx).hasNext) {
+      iters(idx).next()
+    } else {
+      idx = idx + 1
+      iters(idx).next()

Review Comment:
   Is it possible that the `iters(idx)` can be empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1420233336


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala:
##########
@@ -184,6 +186,12 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
       info("Session stopped due to shared level is Connection.")
       stopSession()
     }
+    if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {

Review Comment:
   nit: only cleanup for the operation `ExecuteStatement`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "lsm1 (via GitHub)" <gi...@apache.org>.
lsm1 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1420277031


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+import org.apache.kyuubi.util.reflect.DynConstructors
+
+class FetchOrcStatement(spark: SparkSession) {
+
+  var orcIter: OrcFileIterator = _
+  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val savePath = new Path(path)
+    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+    val list = new ListBuffer[LocatedFileStatus]
+    while (fsIterator.hasNext) {
+      val file = fsIterator.next()
+      if (file.getPath.getName.endsWith(".orc")) {
+        list += file
+      }
+    }
+    val toRowConverter: InternalRow => Row = {
+      CatalystTypeConverters.createToScalaConverter(orcSchema)
+        .asInstanceOf[InternalRow => Row]
+    }
+    val colId = range(0, orcSchema.size)
+    val fullSchema = orcSchema.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    val deserializer = getOrcDeserializer(orcSchema, colId)
+    orcIter = new OrcFileIterator(list)
+    val iterRow = orcIter.map(value =>
+      unsafeProjection(deserializer.deserialize(value)))
+      .map(value => toRowConverter(value))
+    new IterableFetchIterator[Row](iterRow.toIterable)
+  }
+
+  def close(): Unit = {
+    orcIter.close()
+  }
+
+  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+    try {
+      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
+        // SPARK-34535 changed the constructor signature of OrcDeserializer
+        DynConstructors.builder()
+          .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
+          .build[OrcDeserializer]()
+          .newInstance(
+            orcSchema,
+            colId)
+      } else {
+        DynConstructors.builder()
+          .impl(
+            classOf[OrcDeserializer],
+            classOf[StructType],
+            classOf[StructType],
+            classOf[Array[Int]])
+          .build[OrcDeserializer]()
+          .newInstance(
+            new StructType,
+            orcSchema,
+            colId)
+      }
+    } catch {
+      case e: Throwable =>
+        throw new KyuubiException("Failed to create OrcDeserializer", e)
+    }
+  }
+}
+
+class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
+
+  private val iters = fileList.map(x => getOrcFileIterator(x))
+
+  var idx = 0
+
+  override def hasNext: Boolean = {
+    if (iters(idx).hasNext) {
+      true
+    } else {
+      iters(idx).close()
+      idx < iters.size - 1
+    }
+  }
+
+  override def next(): OrcStruct = {
+    if (iters(idx).hasNext) {
+      iters(idx).next()
+    } else {
+      idx = idx + 1
+      iters(idx).next()

Review Comment:
   hasnext make sure `idx < iters.size - 1`, `idx + 1` would not out of bounds?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "cxzl25 (via GitHub)" <gi...@apache.org>.
cxzl25 commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1811780430

   > I did a simple test and the results were as expected
   
   Maybe you can test the scenario of generating multiple files


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "lsm1 (via GitHub)" <gi...@apache.org>.
lsm1 commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1811737443

   > ```
   > if (saveResultToPath) {
   >   spark.sql(statement).write.format(format).save(resultPath)
   >   result = spark.read.load(resultPath)
   > } else {
   >   result = spark.sql(statement)
   > }
   > ```
   we still call `result.collect()` later, if the result is too large,we can not avoid driver OOM
    
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "wForget (via GitHub)" <gi...@apache.org>.
wForget commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1811800164

   > Maybe you can test the scenario of generating multiple files
   
   Do you mean a large data set or multiple tasks?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1393645302


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala:
##########
@@ -278,4 +279,34 @@ object SparkDatasetHelper extends Logging {
     val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
   }
+
+  def shouldSaveResultToHdfs(resultMaxRows: Int, threshold: Int, result: DataFrame): Boolean = {
+    lazy val limit = result.queryExecution.executedPlan match {
+      case plan if isCommandExec(plan.nodeName) => 0
+      case collectLimit: CollectLimitExec => collectLimit.limit
+      case _ => resultMaxRows
+    }
+    lazy val stats = if (limit > 0) {
+      limit * EstimationUtils.getSizePerRow(
+        result.queryExecution.executedPlan.output)
+    } else {
+      result.queryExecution.optimizedPlan.stats.sizeInBytes
+    }
+    lazy val isSort = result.queryExecution.sparkPlan match {
+      case SortExec(_, global, _, _) if global => true

Review Comment:
   is it binary-compatible across different spark versions? we must ensure that engine jar built against spark 3.4 also works on spark 3.1 ~ 3.5 runtimes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418562380


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -158,6 +171,24 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
+      lazy val threshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD, spark)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write
+            .option("compression", "zstd").format("orc").save(saveFileName.get)

Review Comment:
   IIRC, the zstd implementation of ORC is powered by aircompressor, I'm not sure if it's powerful as zstd-jni does. Anyway, it's internal implementation detail, we can change it latter if we have better solution



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418722393


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -159,6 +160,25 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = spark.conf.getOption(OPERATION_RESULT_SAVE_TO_FILE.key).map(_.toBoolean)
+        .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE))
+      lazy val threshold =
+        session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        val fileName = s"$savePath/$engineId/$sessionId/$statementId"
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write

Review Comment:
   there is another known issue as I mentioned in the issue comment
   
   > directly call `df.write` will introduce an extra shuffle for the outermost limit, and hurt performance
   
   I think we should also add this known issue to the comment and create a new ticket to track this issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "lsm1 (via GitHub)" <gi...@apache.org>.
lsm1 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1420244650


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+import org.apache.kyuubi.util.reflect.DynConstructors
+
+class FetchOrcStatement(spark: SparkSession) {
+
+  var orcIter: OrcFileIterator = _
+  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val savePath = new Path(path)
+    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+    val list = new ListBuffer[LocatedFileStatus]
+    while (fsIterator.hasNext) {
+      val file = fsIterator.next()
+      if (file.getPath.getName.endsWith(".orc")) {
+        list += file
+      }
+    }
+    val toRowConverter: InternalRow => Row = {
+      CatalystTypeConverters.createToScalaConverter(orcSchema)
+        .asInstanceOf[InternalRow => Row]
+    }
+    val colId = range(0, orcSchema.size)
+    val fullSchema = orcSchema.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    val deserializer = getOrcDeserializer(orcSchema, colId)
+    orcIter = new OrcFileIterator(list)
+    val iterRow = orcIter.map(value =>
+      unsafeProjection(deserializer.deserialize(value)))
+      .map(value => toRowConverter(value))
+    new IterableFetchIterator[Row](iterRow.toIterable)
+  }
+
+  def close(): Unit = {
+    orcIter.close()
+  }
+
+  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+    try {
+      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
+        // SPARK-34535 changed the constructor signature of OrcDeserializer
+        DynConstructors.builder()
+          .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
+          .build[OrcDeserializer]()
+          .newInstance(
+            orcSchema,
+            colId)
+      } else {
+        DynConstructors.builder()
+          .impl(
+            classOf[OrcDeserializer],
+            classOf[StructType],
+            classOf[StructType],
+            classOf[Array[Int]])
+          .build[OrcDeserializer]()
+          .newInstance(
+            new StructType,
+            orcSchema,
+            colId)
+      }
+    } catch {
+      case e: Throwable =>
+        throw new KyuubiException("Failed to create OrcDeserializer", e)
+    }
+  }
+}
+
+class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
+
+  private val iters = fileList.map(x => getOrcFileIterator(x))
+
+  var idx = 0
+
+  override def hasNext: Boolean = {
+    if (iters(idx).hasNext) {
+      true
+    } else {
+      iters(idx).close()
+      idx < iters.size - 1
+    }
+  }
+
+  override def next(): OrcStruct = {
+    if (iters(idx).hasNext) {
+      iters(idx).next()
+    } else {
+      idx = idx + 1
+      iters(idx).next()

Review Comment:
   `hasNext()` make sure `idx` < iters.size



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "lsm1 (via GitHub)" <gi...@apache.org>.
lsm1 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1388071558


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -159,6 +160,25 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = spark.conf.getOption(OPERATION_RESULT_SAVE_TO_FILE.key).map(_.toBoolean)
+        .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE))
+      lazy val threshold =
+        session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        val fileName = s"$savePath/$engineId/$sessionId/$statementId"
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write

Review Comment:
   If the result has duplicate columns, we can not write it to file, so we rename all col name to avoid this case
   ```
   spark.sql("select 1 as a,2 as a").write("/filepath")
   
   org.apache.spark.sql.AnalysisException: Found duplicate column(s) when inserting into hdfs
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "wForget (via GitHub)" <gi...@apache.org>.
wForget commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1387347284


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+
+class FetchOrcStatement(spark: SparkSession) {
+  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val savePath = new Path(path)
+    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+    val list = new ListBuffer[LocatedFileStatus]
+    while (fsIterator.hasNext) {
+      val file = fsIterator.next()
+      if (file.getPath.getName.endsWith(".orc")) {
+        list += file
+      }
+    }
+    val toRowConverter: InternalRow => Row = {
+      CatalystTypeConverters.createToScalaConverter(orcSchema)
+        .asInstanceOf[InternalRow => Row]
+    }
+    val colId = range(0, orcSchema.size)
+    val fullSchema = orcSchema.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    val deserializer = getOrcDeserializer(orcSchema, colId)
+    val iter = new OrcFileIterator(list)
+    val iterRow = iter.map(value =>
+      unsafeProjection(deserializer.deserialize(value)))
+      .map(value => toRowConverter(value))
+    new IterableFetchIterator[Row](iterRow.toIterable)
+  }
+
+  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+    try {
+      val cls = Class.forName("org.apache.spark.sql.execution.datasources.orc.OrcDeserializer")
+      val constructor = cls.getDeclaredConstructors.apply(0)
+      if (constructor.getParameterCount == 3) {
+        constructor.newInstance(new StructType, orcSchema, colId).asInstanceOf[OrcDeserializer]
+      } else {
+        constructor.newInstance(orcSchema, colId).asInstanceOf[OrcDeserializer]
+      }
+    } catch {
+      case e: Throwable =>
+        throw new Exception("Failed to create OrcDeserializer", e)
+    }
+  }
+}
+
+class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
+
+  val iters = fileList.map(x => getOrcFileIterator(x))
+
+  var idx = 0
+
+  override def hasNext: Boolean = {
+    if (iters(idx).hasNext) {
+      true
+    } else {
+      iters(idx).close()
+      idx < iters.size - 1
+    }
+  }
+
+  override def next(): OrcStruct = {
+    if (iters(idx).hasNext) {

Review Comment:
   `iters(idx).hasNext)` has been called in the `hasNext` method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "lsm1 (via GitHub)" <gi...@apache.org>.
lsm1 commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1824278574

   > > we still call `result.collect()` later, if the result is too large,we can not avoid driver OOM
   > 
   > Can we combine incremental collection mode?
   
   When we use incremental collection mode, it may significantly impact performance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418545577


##########
kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala:
##########
@@ -1893,6 +1893,28 @@ object KyuubiConf {
       .intConf
       .createWithDefault(0)
 
+  val OPERATION_RESULT_SAVE_TO_FILE: ConfigEntry[Boolean] =
+    buildConf("kyuubi.operation.result.save.to.file")
+      .doc("The switch for Spark query result save to hdfs file")
+      .version("1.9.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val OPERATION_RESULT_SAVE_TO_FILE_PATH: ConfigEntry[String] =
+    buildConf("kyuubi.operation.result.save.to.file.path")
+      .doc("The hdfs path of Spark query result save")
+      .version("1.9.0")
+      .stringConf
+      .createWithDefault("/tmp/kyuubi/tmp_kyuubi_result")
+
+  val OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD: ConfigEntry[Int] =
+    buildConf("kyuubi.operation.result.save.to.file.threshold")
+      .doc("The threshold of Spark result save to hdfs file, default value is 200 MB")
+      .version("1.9.0")
+      .intConf
+      .checkValue(_ > 0, "must be positive value")
+      .createWithDefault(209715200)

Review Comment:
   possible to support human-readable strings? e.g. 200MiB, 200MB, 200m, 1G



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418550682


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+import org.apache.kyuubi.util.reflect.DynConstructors
+
+class FetchOrcStatement(spark: SparkSession) {
+
+  var orcIter: OrcFileIterator = _
+  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val savePath = new Path(path)
+    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+    val list = new ListBuffer[LocatedFileStatus]
+    while (fsIterator.hasNext) {
+      val file = fsIterator.next()
+      if (file.getPath.getName.endsWith(".orc")) {
+        list += file
+      }
+    }
+    val toRowConverter: InternalRow => Row = {
+      CatalystTypeConverters.createToScalaConverter(orcSchema)
+        .asInstanceOf[InternalRow => Row]
+    }
+    val colId = range(0, orcSchema.size)
+    val fullSchema = orcSchema.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    val deserializer = getOrcDeserializer(orcSchema, colId)
+    orcIter = new OrcFileIterator(list)
+    val iterRow = orcIter.map(value =>
+      unsafeProjection(deserializer.deserialize(value)))
+      .map(value => toRowConverter(value))
+    new IterableFetchIterator[Row](iterRow.toIterable)
+  }
+
+  def close(): Unit = {
+    orcIter.close()
+  }
+
+  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+    try {
+      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
+        // https://issues.apache.org/jira/browse/SPARK-34535
+        DynConstructors.builder()
+          .impl(
+            classOf[OrcDeserializer],
+            classOf[StructType],
+            classOf[Array[Int]])
+          .build[OrcDeserializer]()
+          .newInstance(
+            orcSchema,
+            colId)

Review Comment:
   ```suggestion
             .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
             .build[OrcDeserializer]()
             .newInstance(orcSchema,colId)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418552399


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -159,6 +160,25 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = spark.conf.getOption(OPERATION_RESULT_SAVE_TO_FILE.key).map(_.toBoolean)
+        .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE))
+      lazy val threshold =
+        session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {

Review Comment:
   I prefer to keep it as-is, we need a configuration to disable this feature globally



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418725236


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -158,6 +171,24 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
+      lazy val threshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD, spark)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write
+            .option("compression", "zstd").format("orc").save(saveFileName.get)

Review Comment:
   I prefer to keep it internally



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1420208385


##########
docs/configuration/settings.md:
##########
@@ -390,6 +390,9 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.operation.result.arrow.timestampAsString  | false                                                                           | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission.                                                                                                                                                                                                                                                                                                                                                                                                       | boolean  | 1.7.0 |
 | kyuubi.operation.result.format                   | thrift                                                                          | Specify the result format, available configs are: <ul> <li>THRIFT: the result will convert to TRow at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.</li></ul>                                                                                                                                                 | string   | 1.7.0 |
 | kyuubi.operation.result.max.rows                 | 0                                                                               | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit.                                                                                                                                                                                                                                                                                                                                                                    | int      | 1.6.0 |
+| kyuubi.operation.result.saveToFile.dir           | /tmp/kyuubi/tmp_kyuubi_result                                                   | The Spark query result save dir, it should be a public accessible to every engine. Results are saved in ORC format, and the directory structure is `/null/engineId/sessionId/statementId`. Each query result will delete when query finished.                                                                                                                                                                                                                                                            | string   | 1.9.0 |

Review Comment:
   `null`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "lsm1 (via GitHub)" <gi...@apache.org>.
lsm1 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1420271398


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala:
##########
@@ -184,6 +186,12 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
       info("Session stopped due to shared level is Connection.")
       stopSession()
     }
+    if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {

Review Comment:
   There is no simple way to determine whether the session has executed `ExecuteStatement`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "cxzl25 (via GitHub)" <gi...@apache.org>.
cxzl25 commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1837993078

   > The output seems to be in order even when outputting multiple files.
   
   When Spark reads datasource, it will be sorted by file length, so there is no guarantee.
   
   org.apache.spark.sql.execution.datasources.v2.FileScan#partitions
   ```scala
         partition.files.flatMap { file =>
           PartitionedFileUtil.splitFiles(
             file = file,
             isSplitable = isSplitable(file.getPath),
             maxSplitBytes = maxSplitBytes,
             partitionValues = partitionValues
           )
         }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
   ```
   
   
   https://github.com/apache/spark/blob/4398bb5d37328e2f3594302d98f98803a379a2e9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala#L146-L160


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "cxzl25 (via GitHub)" <gi...@apache.org>.
cxzl25 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1391203988


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala:
##########
@@ -101,6 +113,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
         exec,
         Duration(60, TimeUnit.SECONDS))
     })
+    engineSavePath.foreach { p =>

Review Comment:
   `engineSavePath` can be HDFS as the storage path, so it is not the same as tmp dir.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418544744


##########
kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala:
##########
@@ -1893,6 +1893,28 @@ object KyuubiConf {
       .intConf
       .createWithDefault(0)
 
+  val OPERATION_RESULT_SAVE_TO_FILE: ConfigEntry[Boolean] =
+    buildConf("kyuubi.operation.result.save.to.file")
+      .doc("The switch for Spark query result save to hdfs file")
+      .version("1.9.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val OPERATION_RESULT_SAVE_TO_FILE_PATH: ConfigEntry[String] =
+    buildConf("kyuubi.operation.result.save.to.file.path")
+      .doc("The hdfs path of Spark query result save")

Review Comment:
   can we have some description of:
   
   how does it organize the data under this dir?
   who is responsible for cleaning up the data?
   who is the producer/consumer of the data, and what permission should the folder be guaranteed?
   
   I guess the user may ask the above questions when they explore this feature, would be nice if you can document those information



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418546755


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala:
##########
@@ -278,4 +279,32 @@ object SparkDatasetHelper extends Logging {
     val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
   }
+
+  def shouldSaveResultToHdfs(resultMaxRows: Int, threshold: Int, result: DataFrame): Boolean = {

Review Comment:
   ```suggestion
     def shouldSaveResultToFs(resultMaxRows: Int, minSize: Int, result: DataFrame): Boolean = {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "cfmcgrady (via GitHub)" <gi...@apache.org>.
cfmcgrady commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418723767


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -158,6 +171,24 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
+      lazy val threshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD, spark)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write
+            .option("compression", "zstd").format("orc").save(saveFileName.get)

Review Comment:
   should we also make the written format configurable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1418556967


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -159,6 +160,25 @@ class ExecuteStatement(
         override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
       })
     } else {
+      val sparkSave = spark.conf.getOption(OPERATION_RESULT_SAVE_TO_FILE.key).map(_.toBoolean)
+        .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE))
+      lazy val threshold =
+        session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD)
+      if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
+        val sessionId = session.handle.identifier.toString
+        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_PATH)
+        val fileName = s"$savePath/$engineId/$sessionId/$statementId"
+        val colName = range(0, result.schema.size).map(x => "col" + x)
+        if (resultMaxRows > 0) {
+          result.toDF(colName: _*).limit(resultMaxRows).write

Review Comment:
   @lsm1 let's add such information to the comments



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "cxzl25 (via GitHub)" <gi...@apache.org>.
cxzl25 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1403471225


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala:
##########
@@ -278,4 +279,34 @@ object SparkDatasetHelper extends Logging {
     val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
   }
+
+  def shouldSaveResultToHdfs(resultMaxRows: Int, threshold: Int, result: DataFrame): Boolean = {
+    lazy val limit = result.queryExecution.executedPlan match {
+      case plan if isCommandExec(plan.nodeName) => 0
+      case collectLimit: CollectLimitExec => collectLimit.limit
+      case _ => resultMaxRows
+    }
+    lazy val stats = if (limit > 0) {
+      limit * EstimationUtils.getSizePerRow(
+        result.queryExecution.executedPlan.output)
+    } else {
+      result.queryExecution.optimizedPlan.stats.sizeInBytes
+    }
+    lazy val isSort = result.queryExecution.sparkPlan match {
+      case SortExec(_, global, _, _) if global => true

Review Comment:
   ```suggestion
         case s: SortExec => s.global
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query result save to file [kyuubi]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1853204403

   @turboFei from the context, I think the implementation already reserves the global order, @cxzl25 could you please clarify it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1787654415

   ## [Codecov](https://app.codecov.io/gh/apache/kyuubi/pull/5591?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#5591](https://app.codecov.io/gh/apache/kyuubi/pull/5591?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (f8574f9) into [master](https://app.codecov.io/gh/apache/kyuubi/commit/e707bbccc5b411f0c23e76bfa4a1618fdfd886f7?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (e707bbc) will **increase** coverage by `0.29%`.
   > Report is 9 commits behind head on master.
   > The diff coverage is `19.79%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #5591      +/-   ##
   ============================================
   + Coverage     61.43%   61.73%   +0.29%     
     Complexity       23       23              
   ============================================
     Files           601      604       +3     
     Lines         34313    36253    +1940     
     Branches       4499     5022     +523     
   ============================================
   + Hits          21080    22379    +1299     
   - Misses        11098    11447     +349     
   - Partials       2135     2427     +292     
   ```
   
   
   | [Files](https://app.codecov.io/gh/apache/kyuubi/pull/5591?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...in/scala/org/apache/kyuubi/config/KyuubiConf.scala](https://app.codecov.io/gh/apache/kyuubi/pull/5591?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-a3l1dWJpLWNvbW1vbi9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9jb25maWcvS3l1dWJpQ29uZi5zY2FsYQ==) | `97.28% <100.00%> (+0.02%)` | :arrow_up: |
   | [.../engine/spark/session/SparkSQLSessionManager.scala](https://app.codecov.io/gh/apache/kyuubi/pull/5591?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-ZXh0ZXJuYWxzL2t5dXViaS1zcGFyay1zcWwtZW5naW5lL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUva3l1dWJpL2VuZ2luZS9zcGFyay9zZXNzaW9uL1NwYXJrU1FMU2Vzc2lvbk1hbmFnZXIuc2NhbGE=) | `74.71% <0.00%> (-4.56%)` | :arrow_down: |
   | [...g/apache/spark/sql/kyuubi/SparkDatasetHelper.scala](https://app.codecov.io/gh/apache/kyuubi/pull/5591?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-ZXh0ZXJuYWxzL2t5dXViaS1zcGFyay1zcWwtZW5naW5lL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvc3Bhcmsvc3FsL2t5dXViaS9TcGFya0RhdGFzZXRIZWxwZXIuc2NhbGE=) | `79.36% <0.00%> (-4.67%)` | :arrow_down: |
   | [...rg/apache/kyuubi/engine/spark/SparkSQLEngine.scala](https://app.codecov.io/gh/apache/kyuubi/pull/5591?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-ZXh0ZXJuYWxzL2t5dXViaS1zcGFyay1zcWwtZW5naW5lL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUva3l1dWJpL2VuZ2luZS9zcGFyay9TcGFya1NRTEVuZ2luZS5zY2FsYQ==) | `61.90% <18.18%> (-2.19%)` | :arrow_down: |
   | [...uubi/engine/spark/operation/ExecuteStatement.scala](https://app.codecov.io/gh/apache/kyuubi/pull/5591?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-ZXh0ZXJuYWxzL2t5dXViaS1zcGFyay1zcWwtZW5naW5lL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUva3l1dWJpL2VuZ2luZS9zcGFyay9vcGVyYXRpb24vRXhlY3V0ZVN0YXRlbWVudC5zY2FsYQ==) | `71.05% <13.33%> (-8.75%)` | :arrow_down: |
   | [...ubi/engine/spark/operation/FetchOrcStatement.scala](https://app.codecov.io/gh/apache/kyuubi/pull/5591?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-ZXh0ZXJuYWxzL2t5dXViaS1zcGFyay1zcWwtZW5naW5lL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUva3l1dWJpL2VuZ2luZS9zcGFyay9vcGVyYXRpb24vRmV0Y2hPcmNTdGF0ZW1lbnQuc2NhbGE=) | `0.00% <0.00%> (ø)` | |
   
   ... and [18 files with indirect coverage changes](https://app.codecov.io/gh/apache/kyuubi/pull/5591/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "cxzl25 (via GitHub)" <gi...@apache.org>.
cxzl25 commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1410153092


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+
+class FetchOrcStatement(spark: SparkSession) {
+  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val savePath = new Path(path)
+    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+    val list = new ListBuffer[LocatedFileStatus]
+    while (fsIterator.hasNext) {
+      val file = fsIterator.next()
+      if (file.getPath.getName.endsWith(".orc")) {
+        list += file
+      }
+    }
+    val toRowConverter: InternalRow => Row = {
+      CatalystTypeConverters.createToScalaConverter(orcSchema)
+        .asInstanceOf[InternalRow => Row]
+    }
+    val colId = range(0, orcSchema.size)
+    val fullSchema = orcSchema.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    val deserializer = getOrcDeserializer(orcSchema, colId)
+    val iter = new OrcFileIterator(list)
+    val iterRow = iter.map(value =>
+      unsafeProjection(deserializer.deserialize(value)))
+      .map(value => toRowConverter(value))
+    new IterableFetchIterator[Row](iterRow.toIterable)
+  }
+
+  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+    try {
+      val cls = Class.forName("org.apache.spark.sql.execution.datasources.orc.OrcDeserializer")
+      val constructor = cls.getDeclaredConstructors.apply(0)
+      if (constructor.getParameterCount == 3) {
+        constructor.newInstance(new StructType, orcSchema, colId).asInstanceOf[OrcDeserializer]
+      } else {
+        constructor.newInstance(orcSchema, colId).asInstanceOf[OrcDeserializer]
+      }
+    } catch {
+      case e: Throwable =>
+        throw new KyuubiException("Failed to create OrcDeserializer", e)
+    }
+  }
+}
+
+class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
+
+  val iters = fileList.map(x => getOrcFileIterator(x))
+
+  var idx = 0
+
+  override def hasNext: Boolean = {
+    if (iters(idx).hasNext) {
+      true
+    } else {
+      iters(idx).close()

Review Comment:
   `ExecuteStatement` needs to implement the close method to close all readers of iters to prevent the client from not pulling all the results and causing reader leak.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "wForget (via GitHub)" <gi...@apache.org>.
wForget commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1387334211


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala:
##########
@@ -101,6 +113,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
         exec,
         Duration(60, TimeUnit.SECONDS))
     })
+    engineSavePath.foreach { p =>

Review Comment:
   Since `engineSavePath` is a temporary directory, why not use `spark tmp dir` directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "wForget (via GitHub)" <gi...@apache.org>.
wForget commented on code in PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#discussion_r1387345294


##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.Array._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
+
+class FetchOrcStatement(spark: SparkSession) {
+  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val savePath = new Path(path)
+    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
+    val list = new ListBuffer[LocatedFileStatus]
+    while (fsIterator.hasNext) {
+      val file = fsIterator.next()
+      if (file.getPath.getName.endsWith(".orc")) {
+        list += file
+      }
+    }
+    val toRowConverter: InternalRow => Row = {
+      CatalystTypeConverters.createToScalaConverter(orcSchema)
+        .asInstanceOf[InternalRow => Row]
+    }
+    val colId = range(0, orcSchema.size)
+    val fullSchema = orcSchema.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    val deserializer = getOrcDeserializer(orcSchema, colId)
+    val iter = new OrcFileIterator(list)
+    val iterRow = iter.map(value =>
+      unsafeProjection(deserializer.deserialize(value)))
+      .map(value => toRowConverter(value))
+    new IterableFetchIterator[Row](iterRow.toIterable)
+  }
+
+  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
+    try {
+      val cls = Class.forName("org.apache.spark.sql.execution.datasources.orc.OrcDeserializer")
+      val constructor = cls.getDeclaredConstructors.apply(0)
+      if (constructor.getParameterCount == 3) {
+        constructor.newInstance(new StructType, orcSchema, colId).asInstanceOf[OrcDeserializer]
+      } else {
+        constructor.newInstance(orcSchema, colId).asInstanceOf[OrcDeserializer]
+      }
+    } catch {
+      case e: Throwable =>
+        throw new Exception("Failed to create OrcDeserializer", e)

Review Comment:
   use `KyuubiSQLException`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org


Re: [PR] [KYUUBI #5377] Spark engine query results support reading from HDFS [kyuubi]

Posted by "wForget (via GitHub)" <gi...@apache.org>.
wForget commented on PR #5591:
URL: https://github.com/apache/kyuubi/pull/5591#issuecomment-1803001687

   It might be simpler for us to make changes in the executeStatement method, like:
   
   change
   
   ```
   result = spark.sql(statement)
   ```
   
   to
   
   ```
   if (saveResultToPath) {
     spark.sql(statement).write.format(format).save(resultPath)
     result = spark.read.load(resultPath)
   } else {
     result = spark.sql(statement)
   }
   ```
   
   WDYT? cc @pan3793 @cxzl25 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org