Posted to commits@kylin.apache.org by xx...@apache.org on 2023/02/27 08:00:59 UTC

[kylin] 19/34: KYLIN-5454 Downloading the async query result may cause OOM

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit db624c3e89e5444a3462893e6c86973f3fb72790
Author: fanshu.kong <17...@qq.com>
AuthorDate: Wed Nov 23 15:11:15 2022 +0800

    KYLIN-5454 Downloading the async query result may cause OOM
    
    Co-authored-by: Dorris Zhang <ru...@kyligence.io>
---
 .../org/apache/kylin/rest/request/SQLRequest.java  |   2 +
 .../java/org/apache/kylin/common/QueryContext.java |   1 +
 .../common/exception/code/ErrorCodeServer.java     |   6 +
 .../org/apache/kylin/common/msg/CnMessage.java     |  10 -
 .../java/org/apache/kylin/common/msg/Message.java  |  11 +-
 .../resources/kylin_error_msg_conf_cn.properties   |   6 +-
 .../resources/kylin_error_msg_conf_en.properties   |   8 +
 .../main/resources/kylin_errorcode_conf.properties |   6 +
 .../apache/kylin/query/util/AsyncQueryUtil.java    |   9 +-
 .../rest/controller/NAsyncQueryController.java     |  39 +-
 .../rest/controller/NAsyncQueryControllerV2.java   |  24 +-
 .../rest/controller/NAsyncQueryControllerTest.java |  98 ++--
 .../controller/NAsyncQueryControllerV2Test.java    |  40 +-
 .../kylin/rest/request/AsyncQuerySQLRequestV2.java |   3 +
 .../kylin/rest/service/AsyncQueryService.java      |  83 +---
 .../org/apache/kylin/rest/service/CSVWriter.java   | 120 -----
 .../apache/kylin/rest/service/XLSXExcelWriter.java | 155 -------
 .../kylin/rest/service/AysncQueryServiceTest.java  | 496 ++++++++++++---------
 .../kylin/query/pushdown/SparkSqlClient.scala      |  51 ++-
 .../kylin/query/runtime/plan/ResultPlan.scala      | 214 +++++++--
 20 files changed, 629 insertions(+), 753 deletions(-)
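
Background on the fix: the OOM in KYLIN-5454 came from materializing the whole
result set in driver memory before writing it to the HTTP response; the
CSVWriter and XLSXExcelWriter removed below both load every row through
Dataset.collectAsList(). The commit instead decides header inclusion at submit
time (QueryContext now carries includeHeader, with the heavy lifting in
ResultPlan.scala per the diffstat), so the download path can stream the stored
files as-is. A minimal sketch of the memory difference, assuming a running
SparderEnv session; the path and separator are illustrative, not the patched
code:

    import java.io.IOException;
    import java.io.Writer;
    import java.util.Iterator;

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparderEnv;

    class StreamingDownloadSketch {
        void copyResult(Writer writer, String path, String separator) throws IOException {
            // OOM-prone (what the removed CSVWriter did): collectAsList()
            // pulls every row of the result into the driver heap at once.
            //   List<Row> rows = SparderEnv.getSparkSession().read().parquet(path).collectAsList();

            // Constant-memory alternative: fetch rows partition by partition
            // and write them straight through to the response.
            Iterator<Row> rows = SparderEnv.getSparkSession().read().parquet(path).toLocalIterator();
            while (rows.hasNext()) {
                writer.write(rows.next().mkString(separator));
                writer.write("\n");
            }
        }
    }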

diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
index 597ebd52c6..4bc4ce91d8 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
@@ -69,6 +69,8 @@ public class SQLRequest implements Serializable, ProjectInsensitiveRequest, Vali
     @JsonProperty("file_name")
     private String fileName = "result";
     private Integer forcedToTieredStorage;  //0:CH->DFS; 1:CH->pushDown; 2:CH->return error
+    @JsonProperty("include_header")
+    private boolean includeHeader;
 
     private Map<String, String> backdoorToggles;
 
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 551b66d90f..d65396e95b 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -360,6 +360,7 @@ public class QueryContext implements Closeable {
         private String fileName;
         private String separator;
         private boolean isRefused;
+        private boolean includeHeader;
     }
 
     @Getter
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
index 54f81183e7..a51a2f54ef 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
@@ -132,6 +132,12 @@ public enum ErrorCodeServer implements ErrorCodeProducer {
     USER_GROUP_NOT_EXIST("KE-010043220"),
     REPEATED_PARAMETER("KE-010043221"),
 
+    // 100313xx async query
+    ASYNC_QUERY_RESULT_NOT_FOUND("KE-010031301"),
+    ASYNC_QUERY_PROJECT_NAME_EMPTY("KE-010031302"),
+    ASYNC_QUERY_TIME_FORMAT_ERROR("KE-010031303"),
+    ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY("KE-010031304"),
+
     // 400272XX resource group
     RESOURCE_GROUP_DISABLE_FAILED("KE-040027201"),
     RESOURCE_GROUP_ENABLE_FAILED("KE-040027202"),
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
index 6e099d683b..1f7923622b 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
@@ -423,16 +423,6 @@ public class CnMessage extends Message {
         return "当前无法清理文件夹。请确保相关 HDFS 文件可以正常访问。";
     }
 
-    @Override
-    public String getAsyncQueryTimeFormatError() {
-        return "无效的时间格式。请按 “yyyy-MM-dd HH:mm:ss” 格式填写。";
-    }
-
-    @Override
-    public String getAsyncQueryProjectNameEmpty() {
-        return "项目名称不能为空。请检查后重试。";
-    }
-
     @Override
     public String getUserNotFound() {
         return "找不到用户 '%s'";
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
index 8f349bd31f..805caa5688 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
@@ -60,8 +60,7 @@ public class Message {
     private static final String LICENSE_MISMATCH_LICENSE = "The license doesn’t match the current cluster information. Please upload a new license, or contact Kyligence.";
     private static final String LICENSE_NOT_EFFECTIVE = "License is not effective yet, please apply for a new license.";
     private static final String LICENSE_EXPIRED = "The license has expired. Please upload a new license, or contact Kyligence.";
-    private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, only support single `create view`, `drop "
-        + "view`,  `alter view`, `show create table`";
+    private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, only support single `create view`, `drop view`, `alter view`, `show create table`";
     private static final String DDL_VIEW_NAME_ERROR = "View names need to start with KE_";
     private static final String DDL_VIEW_NAME_DUPLICATE_ERROR = "Logical View names is duplicate";
     private static final String DDL_DROP_ERROR = "Only support drop view";
@@ -522,14 +521,6 @@ public class Message {
         return "Can’t clean file folder at the moment. Please ensure that the related file on HDFS could be accessed.";
     }
 
-    public String getAsyncQueryTimeFormatError() {
-        return "The time format is invalid. Please enter the date in the format “yyyy-MM-dd HH:mm:ss”.";
-    }
-
-    public String getAsyncQueryProjectNameEmpty() {
-        return "The project name can’t be empty. Please check and try again.";
-    }
-
     public String getUserNotFound() {
         return "User '%s' not found.";
     }
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
index 2768232b9d..7697a01c8f 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
@@ -134,7 +134,6 @@ KE-010043219=请使用中文、英文、空格命名用户名和公司。
 KE-010043220=找不到用户组 “%s”。请检查后重试。
 KE-010043221=参数 “%s” 已存在。请检查后重试。
 
-
 ## Streaming
 KE-010035202=使用解析器 “%s” 解析Topic “%s” 的消息时发生异常,请检查后重试。
 KE-010035215=无法正确读取 Kafka 认证文件,请检查后再试。
@@ -160,6 +159,11 @@ KE-010042214=Jar文件 “%s” 不存在。
 KE-010042215=解析器 “%s” 已存在。
 KE-010042216=Jar文件 “%s” 已存在。
 
+## 100313xx async query
+KE-010031301=该项目下无法找到该 Query ID 对应的异步查询。请检查后重试。
+KE-010031302=项目名称不能为空。请检查后重试。
+KE-010031303=无效的时间格式。请按 “yyyy-MM-dd HH:mm:ss” 格式填写。
+KE-010031304=在当前版本中,“include_header” 参数已被移至提交异步查询的 API,因此下载结果时传入的 “include_header” 参数将不起作用。详情请参考产品手册。
 
 # System
 ## 400052XX password
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
index fc75aa0610..6acc8373fc 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
@@ -156,6 +156,14 @@ KE-010042214=Jar "%s" does not exist.
 KE-010042215=Parser "%s" already exists.
 KE-010042216=Jar "%s" already exists.
 
+## 100313xx async query
+KE-010031301=Can’t find the query by this query ID in this project. Please check and try again.
+KE-010031302=The project name can’t be empty. Please check and try again.
+KE-010031303=The time format is invalid. Please enter the date in the format “yyyy-MM-dd HH:mm:ss”.
+KE-010031304=Notice: The "include_header" parameter has been moved to the Submit Async Query API, so the parameter here doesn't take effect. Please refer to the user manual for details.
+
+
+### batch 3
 # System
 ## 400052XX password
 KE-040005201=Can't find PASSWORD ENCODER. Please check configuration item kylin.security.user-password-encoder.
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf.properties b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
index 976e342cc9..8fb380af3f 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
@@ -102,6 +102,12 @@ KE-010032221
 KE-010031201
 KE-010031202
 
+## 100313xx async query
+KE-010031301
+KE-010031302
+KE-010031303
+KE-010031304
+
 ## 100102XX computed column
 KE-010010201
 KE-010010202
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
index b7deb09e12..09e05a4e53 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.query.util;
 
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
+
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
@@ -30,10 +32,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
-import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +80,7 @@ public class AsyncQueryUtil {
                 osw.write(metaString);
             }
         } else {
-            throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+            throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
         }
     }
 
@@ -96,7 +97,7 @@ public class AsyncQueryUtil {
                 osw.write(separator);
             }
         } else {
-            throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+            throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
         }
     }
 
diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
index 30cdb088fe..3be9d9d753 100644
--- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
+++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
@@ -17,9 +17,12 @@
  */
 package org.apache.kylin.rest.controller;
 
-import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
 import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
 import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON;
+import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_TIME_FORMAT_ERROR;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -40,16 +43,15 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.QueryErrorCode;
 import org.apache.kylin.common.msg.Message;
 import org.apache.kylin.common.msg.MsgPicker;
-import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
 import org.apache.kylin.query.util.AsyncQueryUtil;
 import org.apache.kylin.rest.exception.ForbiddenException;
+import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
+import org.apache.kylin.rest.response.AsyncQueryResponse;
 import org.apache.kylin.rest.response.EnvelopeResponse;
 import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.service.AsyncQueryService;
 import org.apache.kylin.rest.service.QueryService;
 import org.apache.kylin.rest.util.AclEvaluate;
-import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
-import org.apache.kylin.rest.response.AsyncQueryResponse;
-import org.apache.kylin.rest.service.AsyncQueryService;
 import org.apache.spark.sql.SparderEnv;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -136,6 +138,7 @@ public class NAsyncQueryController extends NBasicController {
             queryContext.getQueryTagInfo().setFileEncode(encode);
             queryContext.getQueryTagInfo().setFileName(sqlRequest.getFileName());
             queryContext.getQueryTagInfo().setSeparator(sqlRequest.getSeparator());
+            queryContext.getQueryTagInfo().setIncludeHeader(sqlRequest.isIncludeHeader());
             queryContext.setProject(sqlRequest.getProject());
             logger.info("Start a new async query with queryId: {}", queryContext.getQueryId());
             String queryId = queryContext.getQueryId();
@@ -203,8 +206,8 @@ public class NAsyncQueryController extends NBasicController {
                         MsgPicker.getMsg().getCleanFolderFail());
             }
         } catch (ParseException e) {
-            logger.error(MsgPicker.getMsg().getAsyncQueryTimeFormatError(), e);
-            throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryTimeFormatError());
+            logger.error(ASYNC_QUERY_TIME_FORMAT_ERROR.getMsg(), e);
+            throw new KylinException(ASYNC_QUERY_TIME_FORMAT_ERROR);
         }
     }
 
@@ -216,7 +219,7 @@ public class NAsyncQueryController extends NBasicController {
             @RequestParam(value = "project", required = false) String project) throws IOException {
         if (project == null) {
             if (sqlRequest == null) {
-                throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+                throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
             }
             project = sqlRequest.getProject();
         }
@@ -242,7 +245,7 @@ public class NAsyncQueryController extends NBasicController {
             throws IOException {
         if (project == null) {
             if (sqlRequest == null) {
-                throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+                throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
             }
             project = sqlRequest.getProject();
         }
@@ -283,7 +286,7 @@ public class NAsyncQueryController extends NBasicController {
             @RequestParam(value = "project", required = false) String project) throws IOException {
         if (project == null) {
             if (sqlRequest == null) {
-                throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+                throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
             }
             project = sqlRequest.getProject();
         }
@@ -306,7 +309,7 @@ public class NAsyncQueryController extends NBasicController {
             throws IOException {
         if (project == null) {
             if (sqlRequest == null) {
-                throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+                throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
             }
             project = sqlRequest.getProject();
         }
@@ -323,16 +326,19 @@ public class NAsyncQueryController extends NBasicController {
     @GetMapping(value = "/async_query/{query_id:.+}/result_download")
     @ResponseBody
     public void downloadQueryResult(@PathVariable("query_id") String queryId,
-            @RequestParam(value = "include_header", required = false, defaultValue = "false") boolean include_header,
-            @RequestParam(value = "includeHeader", required = false, defaultValue = "false") boolean includeHeader,
+            @RequestParam(value = "include_header", required = false) Boolean oldIncludeHeader,
+            @RequestParam(value = "includeHeader", required = false) Boolean includeHeader,
             @Valid @RequestBody(required = false) final AsyncQuerySQLRequest sqlRequest, HttpServletResponse response,
             @RequestParam(value = "project", required = false) String project) throws IOException {
         if (project == null) {
             if (sqlRequest == null) {
-                throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+                throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
             }
             project = sqlRequest.getProject();
         }
+        if (oldIncludeHeader != null || includeHeader != null) {
+            throw new KylinException(ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY);
+        }
         aclEvaluate.checkProjectQueryPermission(project);
         checkProjectName(project);
         KylinConfig config = queryService.getConfig();
@@ -356,8 +362,7 @@ public class NAsyncQueryController extends NBasicController {
             response.setContentType("application/" + format + ";charset=" + encode);
         }
         response.setHeader("Content-Disposition", "attachment; filename=\"" + fileName + "." + format + "\"");
-        asyncQueryService.retrieveSavedQueryResult(project, queryId, includeHeader || include_header, response, format,
-                encode, fileInfo.getSeparator());
+        asyncQueryService.retrieveSavedQueryResult(project, queryId, response, format, encode);
     }
 
     @ApiOperation(value = "async query result path", tags = { "QE" })
@@ -368,7 +373,7 @@ public class NAsyncQueryController extends NBasicController {
             @RequestParam(value = "project", required = false) String project) throws IOException {
         if (project == null) {
             if (sqlRequest == null) {
-                throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+                throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
             }
             project = sqlRequest.getProject();
         }
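
With this change the download endpoint no longer accepts an include-header
flag: if either includeHeader or the legacy include_header query parameter is
present, the request fails fast with KE-010031304 instead of re-reading the
result to prepend a header. Header inclusion is decided once, at submit time.
A hypothetical client flow under the new contract (the host, port, submit path
and omitted authentication are assumptions, not taken from this diff):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class AsyncQueryClientSketch {
        public static void main(String[] args) throws Exception {
            String base = "http://localhost:7070/kylin/api"; // assumed prefix
            HttpClient client = HttpClient.newHttpClient();

            // 1) Submit: the header flag now travels with the query itself.
            String body = "{\"project\":\"default\",\"sql\":\"select 1\","
                    + "\"format\":\"csv\",\"include_header\":true}";
            HttpRequest submit = HttpRequest.newBuilder(URI.create(base + "/async_query"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body)).build();
            String queryId = client.send(submit, HttpResponse.BodyHandlers.ofString())
                    .body(); // extract query_id from the JSON in practice

            // 2) Download: pass no include-header parameter; sending one now
            //    fails with KE-010031304.
            HttpRequest download = HttpRequest.newBuilder(
                    URI.create(base + "/async_query/" + queryId + "/result_download")).GET().build();
            client.send(download, HttpResponse.BodyHandlers.ofString());
        }
    }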
diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java
index 498f154904..190de73c9b 100644
--- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java
+++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java
@@ -19,17 +19,21 @@
 package org.apache.kylin.rest.controller;
 
 import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V2_JSON;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
 
+import java.io.IOException;
+import java.util.List;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.validation.Valid;
+
+import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
 import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2;
 import org.apache.kylin.rest.response.AsyncQueryResponse;
 import org.apache.kylin.rest.response.AsyncQueryResponseV2;
-import org.apache.kylin.rest.service.AsyncQueryService;
-import io.swagger.annotations.ApiOperation;
-import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.common.msg.MsgPicker;
-import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
 import org.apache.kylin.rest.response.EnvelopeResponse;
+import org.apache.kylin.rest.service.AsyncQueryService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -41,10 +45,7 @@ import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestController;
 
-import javax.servlet.http.HttpServletResponse;
-import javax.validation.Valid;
-import java.io.IOException;
-import java.util.List;
+import io.swagger.annotations.ApiOperation;
 
 
 @RestController
@@ -68,6 +69,7 @@ public class NAsyncQueryControllerV2 extends NBasicController {
         sqlRequest.setProject(asyncQuerySQLRequest.getProject());
         sqlRequest.setSql(asyncQuerySQLRequest.getSql());
         sqlRequest.setSeparator(asyncQuerySQLRequest.getSeparator());
+        sqlRequest.setIncludeHeader(asyncQuerySQLRequest.isIncludeHeader());
         sqlRequest.setFormat("csv");
         sqlRequest.setEncode("utf-8");
         sqlRequest.setFileName("result");
@@ -112,7 +114,7 @@ public class NAsyncQueryControllerV2 extends NBasicController {
     @GetMapping(value = "/async_query/{query_id:.+}/result_download")
     @ResponseBody
     public void downloadQueryResult(@PathVariable("query_id") String queryId,
-                                    @RequestParam(value = "includeHeader", required = false, defaultValue = "false") boolean includeHeader,
+                                    @RequestParam(value = "includeHeader", required = false) Boolean includeHeader,
                                     HttpServletResponse response) throws IOException {
         asyncQueryController.downloadQueryResult(queryId, includeHeader, includeHeader, null, response, searchProject(queryId));
     }
@@ -120,7 +122,7 @@ public class NAsyncQueryControllerV2 extends NBasicController {
     private String searchProject(String queryId) throws IOException {
         String project = asyncQueryService.searchQueryResultProject(queryId);
         if (project == null) {
-            throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+            throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
         }
         return project;
     }
diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
index 093c5e9c48..65d74a56f0 100644
--- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
+++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
@@ -19,11 +19,14 @@
 package org.apache.kylin.rest.controller;
 
 import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
+import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_TIME_FORMAT_ERROR;
 import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.FAILED;
 import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.MISS;
 import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.RUNNING;
 import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.SUCCESS;
-import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -33,10 +36,13 @@ import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
+import org.apache.kylin.rest.response.AsyncQueryResponse;
 import org.apache.kylin.rest.response.EnvelopeResponse;
 import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.service.AsyncQueryService;
 import org.apache.kylin.rest.service.QueryService;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.junit.After;
@@ -56,11 +62,6 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
 import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
-import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
-import org.apache.kylin.rest.response.AsyncQueryResponse;
-import org.apache.kylin.rest.service.AsyncQueryService;
-
 public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
 
     private static final String PROJECT = "default";
@@ -108,6 +109,7 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
         asyncQuerySQLRequest.setProject(PROJECT);
         asyncQuerySQLRequest.setSql("select PART_DT from KYLIN_SALES limit 500");
         asyncQuerySQLRequest.setSeparator(",");
+        asyncQuerySQLRequest.setIncludeHeader(false);
         return asyncQuerySQLRequest;
     }
 
@@ -216,8 +218,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
                 .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(MockMvcResultMatchers.status().isInternalServerError());
 
-        Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(),
-                Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any());
     }
 
     @Test
@@ -225,8 +227,7 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
         Mockito.doReturn(true).when(asyncQueryService).hasPermission(Mockito.anyString(), Mockito.anyString());
         Mockito.doThrow(new IOException()).when(asyncQueryService).getFileInfo(Mockito.anyString(),
                 Mockito.anyString());
-        Mockito.doThrow(new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound()))
-                .when(asyncQueryService)
+        Mockito.doThrow(new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND)).when(asyncQueryService)
                 .checkStatus(Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString());
 
         mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/result_download", "123")
@@ -238,8 +239,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
                             "Can’t find the query by this query ID in this project. Please check and try again."));
                 });
 
-        Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(),
-                Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any());
     }
 
     @Test
@@ -407,11 +408,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
         mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query").param("project", PROJECT)
                 .param("older_than", "2011-11/11 11:11:11")
                 .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))).andExpect(result -> {
-                    Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
-                    Assert.assertEquals("KE-020040001",
-                            ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
-                                    .getCodeString());
-                    Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryTimeFormatError(),
+                    Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+                    Assert.assertEquals("KE-010031303",
+                            ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+                    Assert.assertEquals(ASYNC_QUERY_TIME_FORMAT_ERROR.getMsg(),
                             result.getResolvedException().getMessage());
                 });
     }
@@ -586,8 +586,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
                             resolvedException.getMessage());
                 });
 
-        Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(),
-                Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any());
     }
 
     @Test
@@ -603,8 +603,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
                 .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(MockMvcResultMatchers.status().isOk());
 
-        Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(),
-                Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any());
     }
 
     @Test
@@ -638,11 +638,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
         mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/{query_id}", "123")
                 .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(result -> {
-                    Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
-                    Assert.assertEquals("KE-020040001",
-                            ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
-                                    .getCodeString());
-                    Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+                    Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+                    Assert.assertEquals("KE-010031302",
+                            ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+                    Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
                             result.getResolvedException().getMessage());
                 });
     }
@@ -652,11 +651,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
         mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/status", "123")
                 .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(result -> {
-                    Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
-                    Assert.assertEquals("KE-020040001",
-                            ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
-                                    .getCodeString());
-                    Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+                    Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+                    Assert.assertEquals("KE-010031302",
+                            ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+                    Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
                             result.getResolvedException().getMessage());
                 });
     }
@@ -666,11 +664,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
         mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/file_status", "123")
                 .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(result -> {
-                    Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
-                    Assert.assertEquals("KE-020040001",
-                            ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
-                                    .getCodeString());
-                    Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+                    Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+                    Assert.assertEquals("KE-010031302",
+                            ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+                    Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
                             result.getResolvedException().getMessage());
                 });
     }
@@ -680,11 +677,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
         mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/metadata", "123")
                 .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(result -> {
-                    Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
-                    Assert.assertEquals("KE-020040001",
-                            ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
-                                    .getCodeString());
-                    Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+                    Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+                    Assert.assertEquals("KE-010031302",
+                            ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+                    Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
                             result.getResolvedException().getMessage());
                 });
     }
@@ -694,11 +690,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
         mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/result_download", "123")
                 .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(result -> {
-                    Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
-                    Assert.assertEquals("KE-020040001",
-                            ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
-                                    .getCodeString());
-                    Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+                    Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+                    Assert.assertEquals("KE-010031302",
+                            ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+                    Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
                             result.getResolvedException().getMessage());
                 });
     }
@@ -708,11 +703,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
         mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id}/result_path", "123")
                 .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(result -> {
-                    Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
-                    Assert.assertEquals("KE-020040001",
-                            ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
-                                    .getCodeString());
-                    Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+                    Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+                    Assert.assertEquals("KE-010031302",
+                            ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+                    Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
                             result.getResolvedException().getMessage());
                 });
     }
diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java
index e570c0fd3e..d33135cc6f 100644
--- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java
+++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java
@@ -19,22 +19,27 @@
 package org.apache.kylin.rest.controller;
 
 import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V2_JSON;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY;
 import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.FAILED;
 import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.MISS;
 import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.RUNNING;
 import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.SUCCESS;
 
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
-import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2;
-import org.apache.kylin.rest.service.AsyncQueryService;
+import java.io.IOException;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2;
 import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.service.AsyncQueryService;
 import org.apache.kylin.rest.service.QueryService;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InjectMocks;
@@ -50,9 +55,6 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
 import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 
-
-import java.io.IOException;
-
 public class NAsyncQueryControllerV2Test extends NLocalFileMetadataTestCase {
 
     private static final String PROJECT = "default";
@@ -80,8 +82,8 @@ public class NAsyncQueryControllerV2Test extends NLocalFileMetadataTestCase {
     public void setup() throws IOException {
         MockitoAnnotations.initMocks(this);
 
-        mockMvc = MockMvcBuilders.standaloneSetup(nAsyncQueryControllerV2).defaultRequest(MockMvcRequestBuilders.get("/"))
-                .build();
+        mockMvc = MockMvcBuilders.standaloneSetup(nAsyncQueryControllerV2)
+                .defaultRequest(MockMvcRequestBuilders.get("/")).build();
 
         SecurityContextHolder.getContext().setAuthentication(authentication);
         createTestMetadata();
@@ -237,7 +239,27 @@ public class NAsyncQueryControllerV2Test extends NLocalFileMetadataTestCase {
                 .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V2_JSON)))
                 .andExpect(MockMvcResultMatchers.status().isOk());
 
-        Mockito.verify(nAsyncQueryControllerV2).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(), Mockito.any());
+        Mockito.verify(nAsyncQueryControllerV2).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any());
+    }
+
+    @Test
+    public void testDownloadQueryResultNotIncludeHeader() throws Exception {
+        Mockito.doReturn(true).when(asyncQueryService).hasPermission(Mockito.anyString(), Mockito.anyString());
+        AsyncQueryService.FileInfo fileInfo = new AsyncQueryService.FileInfo("csv", "gbk", "result");
+        Mockito.doReturn(fileInfo).when(asyncQueryService).getFileInfo(Mockito.anyString(), Mockito.anyString());
+        Mockito.doReturn(KylinConfig.getInstanceFromEnv()).when(kapQueryService).getConfig();
+
+        mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/result_download", "123")
+                .param("includeHeader", "false").contentType(MediaType.APPLICATION_JSON)
+                .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V2_JSON)))
+                .andExpect(MockMvcResultMatchers.status().isInternalServerError()).andExpect(result -> {
+                    Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+                    Assert.assertEquals(ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY.getMsg(),
+                            result.getResolvedException().getMessage());
+                });
+
+        Mockito.verify(nAsyncQueryControllerV2).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any());
     }
 
 }
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java b/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java
index 5e16dde7b1..eb8e11bd07 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.rest.request;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
@@ -35,5 +36,7 @@ public class AsyncQuerySQLRequestV2 implements Serializable, ProjectInsensitiveR
     private String separator = ",";
     private Integer offset = 0;
     private Integer limit = 0;
+    @JsonProperty("include_header")
+    private boolean includeHeader;
 
 }
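
Since AsyncQuerySQLRequestV2 now maps "include_header" from the V2 request
body, a V2 submit can carry the header flag directly. A small sketch, assuming
the Lombok-generated setters implied by the @Setter annotation on the class
above:

    import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2;

    class RequestSketch {
        static AsyncQuerySQLRequestV2 sampleRequest() {
            AsyncQuerySQLRequestV2 request = new AsyncQuerySQLRequestV2();
            request.setSql("select PART_DT from KYLIN_SALES limit 500");
            request.setSeparator(",");
            request.setIncludeHeader(true); // serialized as "include_header"
            return request;
        }
    }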
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java
index 55faca587a..71f8ca7e9a 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.rest.service;
 
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
 import static org.apache.kylin.query.util.AsyncQueryUtil.getUserFileName;
 import static org.apache.kylin.rest.util.AclPermissionUtil.isAdmin;
 
@@ -47,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.msg.Message;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.metadata.project.NProjectManager;
@@ -54,10 +56,6 @@ import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
 import org.apache.kylin.query.util.AsyncQueryUtil;
 import org.apache.kylin.rest.exception.NotFoundException;
-import org.apache.poi.ss.usermodel.Sheet;
-import org.apache.poi.ss.usermodel.Workbook;
-import org.apache.poi.xssf.usermodel.XSSFWorkbook;
-import org.apache.spark.sql.SparderEnv;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.core.context.SecurityContextHolder;
@@ -114,8 +112,8 @@ public class AsyncQueryService extends BasicService {
         }
     }
 
-    public void retrieveSavedQueryResult(String project, String queryId, boolean includeHeader,
-            HttpServletResponse response, String fileFormat, String encode, String separator) throws IOException {
+    public void retrieveSavedQueryResult(String project, String queryId, HttpServletResponse response,
+            String fileFormat, String encode) throws IOException {
         checkStatus(queryId, QueryStatus.SUCCESS, project, MsgPicker.getMsg().getQueryResultNotFound());
 
         FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
@@ -126,37 +124,15 @@ public class AsyncQueryService extends BasicService {
         }
 
         try (ServletOutputStream outputStream = response.getOutputStream()) {
-            String columnNames = null;
-            if (includeHeader) {
-                columnNames = processHeader(fileSystem, dataPath);
-                if (columnNames != null) {
-                    logger.debug("Query:{}, columnMeta:{}", columnNames, columnNames);
-                    if (!columnNames.endsWith(IOUtils.LINE_SEPARATOR_UNIX)) {
-                        columnNames = columnNames + IOUtils.LINE_SEPARATOR_UNIX;
-                    }
-                } else {
-                    logger.error("Query:{}, no columnMeta found", queryId);
-                }
-            }
             switch (fileFormat) {
             case "csv":
-                CSVWriter csvWriter = new CSVWriter();
-                processCSV(outputStream, dataPath, includeHeader, columnNames, csvWriter, separator);
-                break;
-            case "json":
-                processJSON(outputStream, dataPath, encode);
-                break;
             case "xlsx":
-                if (!includeHeader) {
-                    processFile(outputStream, dataPath);
-                } else {
-                    XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter();
-                    processXLSX(outputStream, dataPath, includeHeader, columnNames, xlsxExcelWriter);
-                }
-                break;
             case "parquet":
                 processFile(outputStream, dataPath);
                 break;
+            case "json":
+                processJSON(outputStream, dataPath, encode);
+                break;
             default:
                 logger.info("Query:{}, processed", queryId);
             }
@@ -281,7 +257,7 @@ public class AsyncQueryService extends BasicService {
     public boolean deleteByQueryId(String project, String queryId) throws IOException {
         Path resultDir = getAsyncQueryResultDir(project, queryId);
         if (queryStatus(project, queryId) == QueryStatus.MISS) {
-            throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+            throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
         }
         logger.info("clean async query result for query id [{}]", queryId);
         return AsyncQueryUtil.getFileSystem().delete(resultDir, true);
@@ -324,7 +300,7 @@ public class AsyncQueryService extends BasicService {
 
     public String asyncQueryResultPath(String project, String queryId) throws IOException {
         if (queryStatus(project, queryId) == QueryStatus.MISS) {
-            throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+            throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
         }
         return getAsyncQueryResultDir(project, queryId).toString();
     }
@@ -344,28 +320,6 @@ public class AsyncQueryService extends BasicService {
         return new Path(KapConfig.getInstanceFromEnv().getAsyncResultBaseDir(project), queryId);
     }
 
-    private String processHeader(FileSystem fileSystem, Path dataPath) throws IOException {
-
-        FileStatus[] fileStatuses = fileSystem.listStatus(dataPath);
-        for (FileStatus header : fileStatuses) {
-            if (header.getPath().getName().equals(AsyncQueryUtil.getMetaDataFileName())) {
-                try (FSDataInputStream inputStream = fileSystem.open(header.getPath());
-                        BufferedReader bufferedReader = new BufferedReader(
-                                new InputStreamReader(inputStream, Charset.defaultCharset()))) {
-                    return bufferedReader.readLine();
-                }
-            }
-        }
-        return null;
-    }
-
-    private void processCSV(OutputStream outputStream, Path dataPath, boolean includeHeader, String columnNames,
-            CSVWriter excelWriter, String separator) throws IOException {
-        FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
-        FileStatus[] fileStatuses = fileSystem.listStatus(dataPath);
-        excelWriter.writeData(fileStatuses, outputStream, columnNames, separator, includeHeader);
-    }
-
     private void processJSON(OutputStream outputStream, Path dataPath, String encode) throws IOException {
         FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
         FileStatus[] fileStatuses = fileSystem.listStatus(dataPath);
@@ -395,25 +349,6 @@ public class AsyncQueryService extends BasicService {
         }
     }
 
-    private void processXLSX(OutputStream outputStream, Path dataPath, boolean includeHeader, String columnNames, XLSXExcelWriter excelWriter)
-            throws IOException {
-        FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
-        FileStatus[] fileStatuses = fileSystem.listStatus(dataPath);
-        try (Workbook wb = new XSSFWorkbook()) {
-            Sheet sheet = wb.createSheet("query_result");
-            // Apply column names
-            if (includeHeader && columnNames != null) {
-                org.apache.poi.ss.usermodel.Row excelRow = sheet.createRow(0);
-                String[] columnNameArray = columnNames.split(SparderEnv.getSeparator());
-                for (int i = 0; i < columnNameArray.length; i++) {
-                    excelRow.createCell(i).setCellValue(columnNameArray[i]);
-                }
-            }
-            excelWriter.writeData(fileStatuses, sheet);
-            wb.write(outputStream);
-        }
-    }
-
     public enum QueryStatus {
         RUNNING, FAILED, SUCCESS, MISS
     }
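
Note how retrieveSavedQueryResult now treats csv, xlsx and parquet uniformly:
the stored result files are shipped to the client byte-for-byte, with the
header (if requested) already written at query time. The body of processFile
sits outside this hunk; a minimal sketch of that streaming shape, assuming
commons-io and the Hadoop FileSystem API:

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.kylin.query.util.AsyncQueryUtil;

    class FileCopySketch {
        static void streamResultFiles(OutputStream out, Path dataPath) throws IOException {
            FileSystem fs = AsyncQueryUtil.getFileSystem();
            for (FileStatus status : fs.listStatus(dataPath)) {
                // skip markers such as _SUCCESS; copy data files unchanged
                if (!status.getPath().getName().startsWith("_")) {
                    try (FSDataInputStream in = fs.open(status.getPath())) {
                        IOUtils.copy(in, out); // constant-memory byte copy
                    }
                }
            }
        }
    }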
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/CSVWriter.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/CSVWriter.java
deleted file mode 100644
index a17dcf318c..0000000000
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/CSVWriter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.rest.service;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.spark.sql.SparderEnv;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.collection.JavaConverters;
-
-public class CSVWriter {
-
-    private static final Logger logger = LoggerFactory.getLogger("query");
-
-    private static final String QUOTE_CHAR = "\"";
-    private static final String END_OF_LINE_SYMBOLS = IOUtils.LINE_SEPARATOR_UNIX;
-
-    public void writeData(FileStatus[] fileStatuses, OutputStream outputStream,
-                          String columnNames, String separator, boolean includeHeaders) throws IOException {
-
-        try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) {
-            if (includeHeaders) {
-                writer.write(columnNames.replace(",", separator));
-                writer.flush();
-            }
-            for (FileStatus fileStatus : fileStatuses) {
-                if (!fileStatus.getPath().getName().startsWith("_")) {
-                    if (fileStatus.getPath().getName().endsWith("parquet")) {
-                        writeDataByParquet(fileStatus, writer, separator);
-                    } else {
-                        writeDataByCsv(fileStatus, writer, separator);
-                    }
-                }
-            }
-
-            writer.flush();
-        }
-    }
-
-    public static void writeCsv(Iterator<List<Object>> rows, Writer writer, String separator) {
-        rows.forEachRemaining(row -> {
-            StringBuilder builder = new StringBuilder();
-
-            for (int i = 0; i < row.size(); i++) {
-                Object cell = row.get(i);
-                String column = cell == null ? "" : cell.toString();
-
-                if (i > 0) {
-                    builder.append(separator);
-                }
-
-                final String escapedCsv = encodeCell(column, separator);
-                builder.append(escapedCsv);
-            }
-            builder.append(END_OF_LINE_SYMBOLS); // EOL
-            try {
-                writer.write(builder.toString());
-            } catch (IOException e) {
-                logger.error("Failed to download asyncQueryResult csvExcel by parquet", e);
-            }
-        });
-    }
-
-    private void writeDataByParquet(FileStatus fileStatus, Writer writer, String separator) {
-        List<org.apache.spark.sql.Row> rowList = SparderEnv.getSparkSession().read()
-                .parquet(fileStatus.getPath().toString()).collectAsList();
-        writeCsv(rowList.stream().map(row -> JavaConverters.seqAsJavaList(row.toSeq())).iterator(), writer, separator);
-    }
-
-    // the encode logic is copied from org.supercsv.encoder.DefaultCsvEncoder.encode
-    private static String encodeCell(String cell, String separator) {
-
-        boolean needQuote = cell.contains(separator) || cell.contains("\r") || cell.contains("\n");
-
-        if (cell.contains(QUOTE_CHAR)) {
-            needQuote = true;
-            // escape
-            cell = cell.replace(QUOTE_CHAR, QUOTE_CHAR + QUOTE_CHAR);
-        }
-
-        if (needQuote) {
-            return QUOTE_CHAR + cell + QUOTE_CHAR;
-        } else {
-            return cell;
-        }
-    }
-
-    private void writeDataByCsv(FileStatus fileStatus, Writer writer, String separator) {
-        List<org.apache.spark.sql.Row> rowList = SparderEnv.getSparkSession().read()
-                .csv(fileStatus.getPath().toString()).collectAsList();
-        writeCsv(rowList.stream().map(row -> JavaConverters.seqAsJavaList(row.toSeq())).iterator(), writer, separator);
-    }
-
-}
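
For reference, the quoting rule that the deleted encodeCell above implemented (double any embedded quote character, then wrap the whole cell in quotes whenever it contains the separator, a quote, CR, or LF) can be restated as a standalone Java sketch; this is illustrative only, not code from the commit:

    public final class CsvEscapeSketch {
        private static final String QUOTE = "\"";

        // Same rule as the removed encodeCell: escape quotes by doubling them,
        // then quote the cell if it contains the separator, a quote, CR, or LF.
        static String encodeCell(String cell, String separator) {
            boolean needQuote = cell.contains(separator) || cell.contains("\r") || cell.contains("\n");
            if (cell.contains(QUOTE)) {
                needQuote = true;
                cell = cell.replace(QUOTE, QUOTE + QUOTE);
            }
            return needQuote ? QUOTE + cell + QUOTE : cell;
        }

        public static void main(String[] args) {
            System.out.println(encodeCell("123\"", ",")); // prints "123""" , matching the test expectations below
        }
    }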
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/XLSXExcelWriter.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/XLSXExcelWriter.java
deleted file mode 100644
index e54678f375..0000000000
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/XLSXExcelWriter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.rest.service;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.query.util.AsyncQueryUtil;
-import org.apache.poi.ss.usermodel.CellType;
-import org.apache.poi.ss.usermodel.Row;
-import org.apache.poi.ss.usermodel.Sheet;
-import org.apache.poi.xssf.usermodel.XSSFCell;
-import org.apache.poi.xssf.usermodel.XSSFRow;
-import org.apache.poi.xssf.usermodel.XSSFSheet;
-import org.apache.poi.xssf.usermodel.XSSFWorkbook;
-import org.apache.spark.sql.SparderEnv;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.clearspring.analytics.util.Lists;
-
-import lombok.val;
-
-public class XLSXExcelWriter {
-
-    private static final Logger logger = LoggerFactory.getLogger("query");
-
-    public void writeData(FileStatus[] fileStatuses, Sheet sheet) {
-        for (FileStatus fileStatus : fileStatuses) {
-            if (!fileStatus.getPath().getName().startsWith("_")) {
-                if (fileStatus.getPath().getName().endsWith("parquet")) {
-                    writeDataByParquet(fileStatus, sheet);
-                } else if (fileStatus.getPath().getName().endsWith("xlsx")) {
-                    writeDataByXlsx(fileStatus, sheet);
-                } else {
-                    writeDataByCsv(fileStatus, sheet);
-                }
-            }
-        }
-    }
-
-    private void writeDataByXlsx(FileStatus f, Sheet sheet) {
-        boolean createTempFileStatus = false;
-        File file = new File("temp.xlsx");
-        try {
-            createTempFileStatus = file.createNewFile();
-            FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
-            fileSystem.copyToLocalFile(f.getPath(), new Path(file.getPath()));
-        } catch (Exception e) {
-            logger.error("Export excel writeDataByXlsx create exception f:{} createTempFileStatus:{} ",
-                    f.getPath(), createTempFileStatus, e);
-        }
-        try (InputStream is = new FileInputStream(file.getAbsolutePath());
-             XSSFWorkbook sheets = new XSSFWorkbook(is)) {
-            final AtomicInteger offset = new AtomicInteger(sheet.getPhysicalNumberOfRows());
-            XSSFSheet sheetAt = sheets.getSheetAt(0);
-            for (int i = 0; i < sheetAt.getPhysicalNumberOfRows(); i++) {
-                XSSFRow row = sheetAt.getRow(i);
-                org.apache.poi.ss.usermodel.Row excelRow = sheet.createRow(offset.get());
-                offset.incrementAndGet();
-                for (int index = 0; index < row.getPhysicalNumberOfCells(); index++) {
-                    XSSFCell cell = row.getCell(index);
-                    excelRow.createCell(index).setCellValue(getString(cell));
-                }
-            }
-            Files.delete(file.toPath());
-        } catch (Exception e) {
-            logger.error("Export excel writeDataByXlsx handler exception f:{} createTempFileStatus:{} ",
-                    f.getPath(), createTempFileStatus, e);
-        }
-    }
-
-    private static String getString(XSSFCell xssfCell) {
-        if (xssfCell == null) {
-            return "";
-        }
-        if (xssfCell.getCellType() == CellType.NUMERIC) {
-            return String.valueOf(xssfCell.getNumericCellValue());
-        } else if (xssfCell.getCellType() == CellType.BOOLEAN) {
-            return String.valueOf(xssfCell.getBooleanCellValue());
-        } else {
-            return xssfCell.getStringCellValue();
-        }
-    }
-
-    private void writeDataByParquet(FileStatus fileStatus, Sheet sheet) {
-        final AtomicInteger offset = new AtomicInteger(sheet.getPhysicalNumberOfRows());
-        List<org.apache.spark.sql.Row> rowList = SparderEnv.getSparkSession().read()
-                .parquet(fileStatus.getPath().toString()).collectAsList();
-        rowList.stream().forEach(row -> {
-            org.apache.poi.ss.usermodel.Row excelRow = sheet.createRow(offset.get());
-            offset.incrementAndGet();
-            val list = row.toSeq().toList();
-            for (int i = 0; i < list.size(); i++) {
-                Object cell = list.apply(i);
-                String column = cell == null ? "" : cell.toString();
-                excelRow.createCell(i).setCellValue(column);
-            }
-        });
-    }
-
-    public void writeDataByCsv(FileStatus fileStatus, Sheet sheet) {
-        FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
-        List<String> rowResults = Lists.newArrayList();
-        List<String[]> results = Lists.newArrayList();
-        final AtomicInteger offset = new AtomicInteger(sheet.getPhysicalNumberOfRows());
-        try (FSDataInputStream inputStream = fileSystem.open(fileStatus.getPath())) {
-            BufferedReader bufferedReader = new BufferedReader(
-                    new InputStreamReader(inputStream, StandardCharsets.UTF_8));
-            rowResults.addAll(Lists.newArrayList(bufferedReader.lines().collect(Collectors.toList())));
-            for (String row : rowResults) {
-                results.add(row.split(SparderEnv.getSeparator()));
-            }
-            for (int i = 0; i < results.size(); i++) {
-                Row row = sheet.createRow(offset.get());
-                offset.incrementAndGet();
-                String[] rowValues = results.get(i);
-                for (int j = 0; j < rowValues.length; j++) {
-                    row.createCell(j).setCellValue(rowValues[j]);
-                }
-            }
-        } catch (IOException e) {
-            logger.error("Failed to download asyncQueryResult xlsxExcel by csv", e);
-        }
-    }
-}
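
Both deleted writers pulled the complete result set onto the driver with collectAsList() before writing a single byte, which is the OOM path named in the commit title. The direction of the fix, streaming rows instead of collecting them, can be shown with a minimal Java sketch; the SparkSession and result directory here are hypothetical, and the commit's actual replacement is the streaming path added to ResultPlan.scala further below:

    import java.util.Iterator;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class StreamingResultSketch {
        public static void stream(SparkSession spark, String resultDir) {
            Dataset<Row> df = spark.read().parquet(resultDir);
            // collectAsList() materializes every row in driver memory at once;
            // toLocalIterator() fetches one partition at a time instead.
            Iterator<Row> rows = df.toLocalIterator();
            while (rows.hasNext()) {
                Row row = rows.next();
                // encode and write the row to the output stream here
            }
        }
    }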
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java
index 282824ac65..241ffd3d10 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java
@@ -37,10 +37,10 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStreamWriter;
-import java.io.StringWriter;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -58,10 +58,14 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.metadata.query.QueryMetricsContext;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
+import org.apache.kylin.query.engine.QueryExec;
 import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
 import org.apache.kylin.query.pushdown.SparkSqlClient;
+import org.apache.kylin.query.runtime.plan.ResultPlan;
 import org.apache.kylin.query.util.AsyncQueryUtil;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.poi.ss.usermodel.CellType;
@@ -87,6 +91,7 @@ import org.supercsv.io.ICsvListWriter;
 import org.supercsv.prefs.CsvPreference;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import lombok.val;
 
@@ -163,38 +168,23 @@ public class AysncQueryServiceTest extends ServiceTestBase {
     }
 
     @Test
-    public void testAsyncQueryWithParquetSpecialCharacters() throws IOException {
+    public void testAsyncQueryAndDownloadCsvResultNotIncludeHeader() throws IOException {
         QueryContext queryContext = QueryContext.current();
         String queryId = queryContext.getQueryId();
         mockMetadata(queryId, true);
         queryContext.getQueryTagInfo().setAsyncQuery(true);
-        queryContext.getQueryTagInfo().setFileFormat("CSV");
+        queryContext.getQueryTagInfo().setFileFormat("csv");
         queryContext.getQueryTagInfo().setFileEncode("utf-8");
-        String sql = "select '\\(123\\)','123'";
-        queryContext.setProject(PROJECT);
+        queryContext.getQueryTagInfo().setSeparator(",");
+        queryContext.getQueryTagInfo().setIncludeHeader(false);
 
-        ss.sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false");
-        SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
-
-        await().atMost(60000, TimeUnit.MILLISECONDS).until(
-                () -> AsyncQueryService.QueryStatus.SUCCESS.equals(asyncQueryService.queryStatus(PROJECT, queryId)));
-        HttpServletResponse response = mock(HttpServletResponse.class);
-        ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        when(response.getOutputStream()).thenReturn(servletOutputStream);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                Object[] arguments = invocationOnMock.getArguments();
-                baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
-                return null;
-            }
-        }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
+        String sql = "select '123\"','123'";
+        queryContext.setProject(PROJECT);
+        ResultPlan.getResult(ss.sql(sql), null);
+        assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
 
-        SparderEnv.getSparkSession().sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false");
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "csv", encodeDefault, ",");
         List<org.apache.spark.sql.Row> rowList = ss.read()
-                .parquet(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
+                .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
         List<String> result = Lists.newArrayList();
         rowList.stream().forEach(row -> {
             val list = row.toSeq().toList();
@@ -204,35 +194,35 @@ public class AysncQueryServiceTest extends ServiceTestBase {
                 result.add(column);
             }
         });
-        assertEquals("(123)" + "123", result.get(0) + result.get(1));
+        assertEquals("123\"\"" + "123", result.get(0) + result.get(1));
+
+        // download asyncQuery result
+        HttpServletResponse response = mock(HttpServletResponse.class);
+        ByteArrayOutputStream baos = mockOutputStream(response);
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+        Assert.assertEquals("\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name()));
     }
 
     @Test
-    public void testAsyncQueryDownCsvResultByParquet() throws IOException {
+    public void testAsyncQueryAndDownloadCsvResultIncludeHeader() throws IOException, SQLException {
         QueryContext queryContext = QueryContext.current();
         String queryId = queryContext.getQueryId();
         mockMetadata(queryId, true);
         queryContext.getQueryTagInfo().setAsyncQuery(true);
         queryContext.getQueryTagInfo().setFileFormat("csv");
         queryContext.getQueryTagInfo().setFileEncode("utf-8");
+        queryContext.getQueryTagInfo().setSeparator(",");
+        queryContext.getQueryTagInfo().setIncludeHeader(true);
+
         String sql = "select '123\"','123'";
         queryContext.setProject(PROJECT);
-        SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
+
+        new QueryExec(PROJECT, getTestConfig()).executeQuery(sql);
+
         assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
-        HttpServletResponse response = mock(HttpServletResponse.class);
-        ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        when(response.getOutputStream()).thenReturn(servletOutputStream);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                Object[] arguments = invocationOnMock.getArguments();
-                baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
-                return null;
-            }
-        }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "csv", encodeDefault, ",");
-        List<org.apache.spark.sql.Row> rowList = ss.read().csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
+
+        List<org.apache.spark.sql.Row> rowList = ss.read()
+                .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
         List<String> result = Lists.newArrayList();
         rowList.stream().forEach(row -> {
             val list = row.toSeq().toList();
@@ -242,130 +232,146 @@ public class AysncQueryServiceTest extends ServiceTestBase {
                 result.add(column);
             }
         });
-        assertEquals("123\"" + "123", result.get(0) + result.get(1));
+        assertEquals("EXPR$0" + "EXPR$1", result.get(0) + result.get(1));
+        assertEquals("123\"\"" + "123", result.get(2) + result.get(3));
+
+        // download asyncQuery result
+        HttpServletResponse response = mock(HttpServletResponse.class);
+        ByteArrayOutputStream baos = mockOutputStream(response);
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+        Assert.assertEquals("EXPR$0,EXPR$1\n\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name()));
     }
 
     @Test
-    public void testSuccessQueryAndDownloadXlsxWriter() throws IOException {
+    public void testAsyncQueryPushDownAndDownloadCsvResultNotIncludeHeader() throws IOException {
         QueryContext queryContext = QueryContext.current();
         String queryId = queryContext.getQueryId();
         mockMetadata(queryId, true);
         queryContext.getQueryTagInfo().setAsyncQuery(true);
-        queryContext.getQueryTagInfo().setFileFormat("xlsx");
+        queryContext.getQueryTagInfo().setFileFormat("csv");
         queryContext.getQueryTagInfo().setFileEncode("utf-8");
-        String sql = "select '123\"' as col1,'123' as col2";
+        queryContext.getQueryTagInfo().setSeparator(",");
+        queryContext.getQueryTagInfo().setIncludeHeader(false);
+
+        String sql = "select '123\"','123'";
         queryContext.setProject(PROJECT);
+
         SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
         assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
-        HttpServletResponse response = mock(HttpServletResponse.class);
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
-        when(response.getOutputStream()).thenReturn(servletOutputStream);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                Object[] arguments = invocationOnMock.getArguments();
-                baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
-                return null;
+
+        List<org.apache.spark.sql.Row> rowList = ss.read()
+                .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
+        List<String> result = Lists.newArrayList();
+        rowList.stream().forEach(row -> {
+            val list = row.toSeq().toList();
+            for (int i = 0; i < list.size(); i++) {
+                Object cell = list.apply(i);
+                String column = cell == null ? "" : cell.toString();
+                result.add(column);
             }
-        }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "xlsx", encodeDefault, ",");
-        FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
-        FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
-        XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter();
-        XSSFWorkbook workbook = new XSSFWorkbook();
-        XSSFSheet sheet = workbook.createSheet();
-        xlsxExcelWriter.writeData(fileStatuses, sheet);
-        XSSFRow row = sheet.getRow(0);
-        assertEquals("123\",123", row.getCell(0) + "," + row.getCell(1));
-        assertEquals("[col1, col2]", QueryContext.current().getColumnNames().toString());
+        });
+        assertEquals("123\"\"" + "123", result.get(0) + result.get(1));
+
+        // download asyncQuery pushDown result
+        HttpServletResponse response = mock(HttpServletResponse.class);
+        ByteArrayOutputStream baos = mockOutputStream(response);
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+        Assert.assertEquals("\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name()));
     }
 
     @Test
-    public void testSuccessQueryAndDownloadCSV() throws IOException {
+    public void testAsyncQueryPushDownAndDownloadCsvResultIncludeHeader() throws IOException {
         QueryContext queryContext = QueryContext.current();
         String queryId = queryContext.getQueryId();
         mockMetadata(queryId, true);
         queryContext.getQueryTagInfo().setAsyncQuery(true);
         queryContext.getQueryTagInfo().setFileFormat("csv");
         queryContext.getQueryTagInfo().setFileEncode("utf-8");
-        String sql = "select '123\"' as col1,'123' as col2";
+        queryContext.getQueryTagInfo().setSeparator(",");
+        queryContext.getQueryTagInfo().setIncludeHeader(true);
+
+        String sql = "select '123\"','123'";
         queryContext.setProject(PROJECT);
+
         SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
         assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
-        HttpServletResponse response = mock(HttpServletResponse.class);
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
-        when(response.getOutputStream()).thenReturn(servletOutputStream);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                Object[] arguments = invocationOnMock.getArguments();
-                baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
-                return null;
+
+        List<org.apache.spark.sql.Row> rowList = ss.read()
+                .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
+        List<String> result = Lists.newArrayList();
+        rowList.stream().forEach(row -> {
+            val list = row.toSeq().toList();
+            for (int i = 0; i < list.size(); i++) {
+                Object cell = list.apply(i);
+                String column = cell == null ? "" : cell.toString();
+                result.add(column);
             }
-        }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, true, response, "xlsx", encodeDefault, ",");
-        FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
-        FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
-        XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter();
-        XSSFWorkbook workbook = new XSSFWorkbook();
-        XSSFSheet sheet = workbook.createSheet();
-        xlsxExcelWriter.writeData(fileStatuses, sheet);
-        XSSFRow row = sheet.getRow(0);
-        assertEquals("\"123\\\"\",123", row.getCell(0) + "," + row.getCell(1));
-        assertEquals("[col1, col2]", QueryContext.current().getColumnNames().toString());
+        });
+        assertEquals("123\"" + "123", result.get(0) + result.get(1));
+        assertEquals("123\"\"" + "123", result.get(2) + result.get(3));
+
+        // download asyncQuery pushDown result
+        HttpServletResponse response = mock(HttpServletResponse.class);
+        ByteArrayOutputStream baos = mockOutputStream(response);
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+        Assert.assertEquals("123\",123\n\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name()));
     }
 
     @Test
-    public void testSuccessQueryAndDownloadCSVForDateFormat() throws IOException {
+    public void testAsyncQueryAndDownloadCsvResultSpecialSeparator() throws IOException, SQLException {
+        String separator = "\n";
         QueryContext queryContext = QueryContext.current();
         String queryId = queryContext.getQueryId();
         mockMetadata(queryId, true);
         queryContext.getQueryTagInfo().setAsyncQuery(true);
         queryContext.getQueryTagInfo().setFileFormat("csv");
         queryContext.getQueryTagInfo().setFileEncode("utf-8");
-        String sql = "select '123\"' as col1,'123' as col2, date'2021-02-01' as col3";
+        queryContext.getQueryTagInfo().setSeparator(separator);
+        queryContext.getQueryTagInfo().setIncludeHeader(false);
+
+        String sql = "select '123\"','123'";
         queryContext.setProject(PROJECT);
-        SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
+
+        new QueryExec(PROJECT, getTestConfig()).executeQuery(sql);
+
         assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
-        HttpServletResponse response = mock(HttpServletResponse.class);
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
-        when(response.getOutputStream()).thenReturn(servletOutputStream);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                Object[] arguments = invocationOnMock.getArguments();
-                baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
-                return null;
+
+        List<org.apache.spark.sql.Row> rowList = ss.read()
+                .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
+        List<String> result = Lists.newArrayList();
+        rowList.stream().forEach(row -> {
+            val list = row.toSeq().toList();
+            for (int i = 0; i < list.size(); i++) {
+                Object cell = list.apply(i);
+                String column = cell == null ? "" : cell.toString();
+                result.add(column);
             }
-        }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, true, response, "xlsx", encodeDefault, ",");
-        FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
-        FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
-        XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter();
-        XSSFWorkbook workbook = new XSSFWorkbook();
-        XSSFSheet sheet = workbook.createSheet();
-        xlsxExcelWriter.writeData(fileStatuses, sheet);
-        XSSFRow row = sheet.getRow(0);
-        assertEquals("\"123\\\"\",123,2021-02-01", row.getCell(0)
-                + "," + row.getCell(1) + "," + row.getCell(2));
-        assertEquals("[col1, col2, col3]", QueryContext.current().getColumnNames().toString());
+        });
+        assertEquals("123\"\"" + "123", result.get(0) + result.get(1));
+
+        // download asyncQuery result
+        HttpServletResponse response = mock(HttpServletResponse.class);
+        ByteArrayOutputStream baos = mockOutputStream(response);
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+        Assert.assertEquals("\"123\"\"\"\n" + "123\n", baos.toString(StandardCharsets.UTF_8.name()));
     }
 
     @Test
-    public void testSuccessQueryAndDownloadCSVNotIncludeHeader() throws IOException {
+    public void testAsyncQueryWithParquetSpecialCharacters() throws IOException {
         QueryContext queryContext = QueryContext.current();
         String queryId = queryContext.getQueryId();
         mockMetadata(queryId, true);
         queryContext.getQueryTagInfo().setAsyncQuery(true);
-        queryContext.getQueryTagInfo().setFileFormat("csv");
+        queryContext.getQueryTagInfo().setFileFormat("CSV");
         queryContext.getQueryTagInfo().setFileEncode("utf-8");
-        String sql = "select '123\"','123'";
+        String sql = "select '\\(123\\)','123'";
         queryContext.setProject(PROJECT);
+
+        ss.sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false");
         SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
-        assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
+
+        await().atMost(60000, TimeUnit.MILLISECONDS).until(
+                () -> AsyncQueryService.QueryStatus.SUCCESS.equals(asyncQueryService.queryStatus(PROJECT, queryId)));
         HttpServletResponse response = mock(HttpServletResponse.class);
         ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -378,9 +384,11 @@ public class AysncQueryServiceTest extends ServiceTestBase {
                 return null;
             }
         }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "csv", encodeDefault, "#");
-        List<org.apache.spark.sql.Row> rowList = ss.read().csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
-        Assert.assertEquals("\"123\"\"\"#123\n", baos.toString(StandardCharsets.UTF_8.name()));
+
+        SparderEnv.getSparkSession().sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false");
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+        List<org.apache.spark.sql.Row> rowList = ss.read()
+                .parquet(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
         List<String> result = Lists.newArrayList();
         rowList.stream().forEach(row -> {
             val list = row.toSeq().toList();
@@ -390,18 +398,19 @@ public class AysncQueryServiceTest extends ServiceTestBase {
                 result.add(column);
             }
         });
-        assertEquals("123\"" + "123", result.get(0) + result.get(1));
+        assertEquals("(123)" + "123", result.get(0) + result.get(1));
     }
 
     @Test
-    public void testSuccessQueryAndDownloadJSON() throws IOException {
+    public void testSuccessQueryAndDownloadCSVForDateFormat() throws IOException {
         QueryContext queryContext = QueryContext.current();
         String queryId = queryContext.getQueryId();
         mockMetadata(queryId, true);
         queryContext.getQueryTagInfo().setAsyncQuery(true);
-        queryContext.getQueryTagInfo().setFileFormat("json");
+        queryContext.getQueryTagInfo().setFileFormat("csv");
         queryContext.getQueryTagInfo().setFileEncode("utf-8");
-        String sql = "select '123\"' as col1,'123' as col2";
+        queryContext.getQueryTagInfo().setSeparator(",");
+        String sql = "select '123\"' as col1,'123' as col2, date'2021-02-01' as col3";
         queryContext.setProject(PROJECT);
         SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
         assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
@@ -417,25 +426,15 @@ public class AysncQueryServiceTest extends ServiceTestBase {
                 return null;
             }
         }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "json", encodeDefault, ",");
-        FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
-        FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
-        XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter();
-        XSSFWorkbook workbook = new XSSFWorkbook();
-        XSSFSheet sheet = workbook.createSheet();
-        xlsxExcelWriter.writeData(fileStatuses, sheet);
-        XSSFRow row = sheet.getRow(0);
-        assertEquals("{\"col1\":\"123\\\"\",\"col2\":\"123\"}", row.getCell(0) + "," + row.getCell(1));
-        assertEquals("[col1, col2]", QueryContext.current().getColumnNames().toString());
     }
 
     @Test
-    public void testSuccessQueryAndDownloadXlsxResultByParquet() throws IOException {
+    public void testSuccessQueryAndDownloadJSON() throws IOException {
         QueryContext queryContext = QueryContext.current();
         String queryId = queryContext.getQueryId();
         mockMetadata(queryId, true);
         queryContext.getQueryTagInfo().setAsyncQuery(true);
-        queryContext.getQueryTagInfo().setFileFormat("xlsx");
+        queryContext.getQueryTagInfo().setFileFormat("json");
         queryContext.getQueryTagInfo().setFileEncode("utf-8");
         String sql = "select '123\"' as col1,'123' as col2";
         queryContext.setProject(PROJECT);
@@ -453,44 +452,63 @@ public class AysncQueryServiceTest extends ServiceTestBase {
                 return null;
             }
         }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "xlsx", encodeDefault, ",");
-        FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
+    }
+
+    @Test
+    public void testSuccessQueryAndDownloadXlsxResultNotIncludeHeader() throws IOException {
+        QueryContext queryContext = QueryContext.current();
+        String queryId = queryContext.getQueryId();
+        mockMetadata(queryId, true);
+        queryContext.getQueryTagInfo().setAsyncQuery(true);
+        queryContext.getQueryTagInfo().setFileFormat("xlsx");
+        queryContext.getQueryTagInfo().setFileEncode("utf-8");
+        String sql = "select '123\"' as col1,'123' as col2";
+        queryContext.setProject(PROJECT);
+        SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
+        assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
+        HttpServletResponse response = mock(HttpServletResponse.class);
+        ByteArrayOutputStream outputStream = mockOutputStream(response);
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "xlsx", encodeDefault);
+
         File file = new File("result.xlsx");
         boolean createTempFileStatus = file.createNewFile();
-        ArrayList<String> list = new ArrayList<>();
-        FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
-        for (FileStatus f : fileStatuses) {
-            if (!f.getPath().getName().startsWith("_")) {
-                fileSystem.copyToLocalFile(f.getPath(), new Path(file.getPath()));
-                try(InputStream is = new FileInputStream(file.getAbsolutePath());
-                    XSSFWorkbook sheets = new XSSFWorkbook(is)) {
-                    XSSFSheet sheetAt = sheets.getSheetAt(0);
-                    for (int i = 0; i < sheetAt.getPhysicalNumberOfRows(); i++) {
-                        XSSFRow row = sheetAt.getRow(i);
-                        StringBuilder builder = new StringBuilder();
-                        for (int index = 0; index < row.getPhysicalNumberOfCells(); index++) {
-                            XSSFCell cell = row.getCell(index);
-                            if (index > 0) {
-                                builder.append(",");
-                            }
-                            builder.append(getString(cell));
-                        }
-                        list.add(builder.toString());
-                    }
-                }
-            }
-        }
+        List<String> list = getXlsxResult(queryId, file);
         Files.delete(file.toPath());
-        logger.info("Temp File status createTempFileStatus:{}",
-                createTempFileStatus);
+        logger.info("Temp File status createTempFileStatus:{}", createTempFileStatus);
         assertEquals("123\",123", list.get(0));
     }
 
+    @Test
+    public void testSuccessQueryAndDownloadXlsxResultIncludeHeader() throws IOException {
+        QueryContext queryContext = QueryContext.current();
+        String queryId = queryContext.getQueryId();
+        mockMetadata(queryId, true);
+        queryContext.getQueryTagInfo().setAsyncQuery(true);
+        queryContext.getQueryTagInfo().setFileFormat("xlsx");
+        queryContext.getQueryTagInfo().setFileEncode("utf-8");
+        queryContext.getQueryTagInfo().setIncludeHeader(true);
+        String sql = "select '123\"' as col1,'123' as col2";
+        queryContext.setProject(PROJECT);
+        SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
+        assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
+        HttpServletResponse response = mock(HttpServletResponse.class);
+        ByteArrayOutputStream outputStream = mockOutputStream(response);
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "xlsx", encodeDefault);
+
+        File file = new File("result.xlsx");
+        boolean createTempFileStatus = file.createNewFile();
+        List<String> list = getXlsxResult(queryId, file);
+        Files.delete(file.toPath());
+        logger.info("Temp File status createTempFileStatus:{}", createTempFileStatus);
+        assertEquals("col1,col2", list.get(0));
+        assertEquals("123\",123", list.get(1));
+    }
+
     private static String getString(XSSFCell xssfCell) {
         if (xssfCell == null) {
             return "";
         }
-        if (xssfCell.getCellType()== CellType.NUMERIC) {
+        if (xssfCell.getCellType() == CellType.NUMERIC) {
             return String.valueOf(xssfCell.getNumericCellValue());
         } else if (xssfCell.getCellType() == CellType.BOOLEAN) {
             return String.valueOf(xssfCell.getBooleanCellValue());
@@ -519,9 +537,9 @@ public class AysncQueryServiceTest extends ServiceTestBase {
             }
         }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
 
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, formatDefault, encodeDefault, ",");
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, formatDefault, encodeDefault);
 
-        assertEquals("a1,b1,c1\n" + "a2,b2,c2\n", baos.toString(StandardCharsets.UTF_8.name()));
+        assertEquals("a1,b1,c1\r\n" + "a2,b2,c2\r\n", baos.toString(StandardCharsets.UTF_8.name()));
     }
 
     @Test
@@ -542,9 +560,9 @@ public class AysncQueryServiceTest extends ServiceTestBase {
             return null;
         }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
 
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, true, response, formatDefault, encodeDefault, ",");
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, formatDefault, encodeDefault);
 
-        assertEquals("name,age,city\na1,b1,c1\n" + "a2,b2,c2\n", baos.toString(StandardCharsets.UTF_8.name()));
+        assertEquals("a1,b1,c1\r\n" + "a2,b2,c2\r\n", baos.toString(StandardCharsets.UTF_8.name()));
     }
 
     @Test
@@ -565,9 +583,9 @@ public class AysncQueryServiceTest extends ServiceTestBase {
             return null;
         }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
 
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, formatDefault, encodeDefault, ",");
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, formatDefault, encodeDefault);
 
-        assertEquals("a1,b1,c1\n" + "a2,b2,c2\n", baos.toString(StandardCharsets.UTF_8.name()));
+        assertEquals("a1,b1,c1\r\n" + "a2,b2,c2\r\n", baos.toString(StandardCharsets.UTF_8.name()));
     }
 
     @Test
@@ -590,35 +608,12 @@ public class AysncQueryServiceTest extends ServiceTestBase {
             }
         }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
 
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "json", encodeDefault, ",");
+        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "json", encodeDefault);
 
         assertEquals("[\"{'column1':'a1', 'column2':'b1'}\",\"{'column1':'a2', 'column2':'b2'}\"]",
                 baos.toString(StandardCharsets.UTF_8.name()));
     }
 
-    @Test
-    public void testSuccessQueryAndDownloadXlsxResult() throws IOException, InterruptedException {
-        SQLResponse sqlResponse = mock(SQLResponse.class);
-        when(sqlResponse.isException()).thenReturn(false);
-        String queryId = RandomUtil.randomUUIDStr();
-        mockResultFile(queryId, false, true);
-        assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
-        HttpServletResponse response = mock(HttpServletResponse.class);
-        ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        when(response.getOutputStream()).thenReturn(servletOutputStream);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                Object[] arguments = invocationOnMock.getArguments();
-                baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
-                return null;
-            }
-        }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
-
-        asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "xlsx", encodeDefault, ",");
-    }
-
     @Test
     public void testCleanFolder() throws IOException, InterruptedException {
         String queryId = RandomUtil.randomUUIDStr();
@@ -643,7 +638,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
         try {
             new Path(asyncQueryService.asyncQueryResultPath(PROJECT, queryId));
         } catch (Exception e) {
-            Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException);
+            Assert.assertTrue(e instanceof KylinException);
             Assert.assertEquals("Can’t find the query by this query ID in this project. Please check and try again.",
                     e.getMessage());
         }
@@ -654,7 +649,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
         try {
             asyncQueryService.deleteByQueryId(PROJECT, "123");
         } catch (Exception e) {
-            Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException);
+            Assert.assertTrue(e instanceof KylinException);
             Assert.assertEquals("Can’t find the query by this query ID in this project. Please check and try again.",
                     e.getMessage());
         }
@@ -678,7 +673,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
         try {
             new Path(asyncQueryService.asyncQueryResultPath(PROJECT, queryId));
         } catch (Exception e) {
-            Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException);
+            Assert.assertTrue(e instanceof KylinException);
             Assert.assertEquals("Can’t find the query by this query ID in this project. Please check and try again.",
                     e.getMessage());
         }
@@ -808,14 +803,14 @@ public class AysncQueryServiceTest extends ServiceTestBase {
         try {
             AsyncQueryUtil.saveMetaData(PROJECT, sqlResponse.getColumnMetas(), queryId);
         } catch (Exception e) {
-            Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException);
-            Assert.assertEquals("KE-020040001", ((NAsyncQueryIllegalParamException) e).getErrorCode().getCodeString());
+            Assert.assertTrue(e instanceof KylinException);
+            Assert.assertEquals("KE-010031301", ((KylinException) e).getErrorCode().getCodeString());
         }
         try {
             AsyncQueryUtil.saveFileInfo(PROJECT, formatDefault, encodeDefault, fileNameDefault, queryId, ",");
         } catch (Exception e) {
-            Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException);
-            Assert.assertEquals("KE-020040001", ((NAsyncQueryIllegalParamException) e).getErrorCode().getCodeString());
+            Assert.assertTrue(e instanceof KylinException);
+            Assert.assertEquals("KE-010031301", ((KylinException) e).getErrorCode().getCodeString());
         }
     }
 
@@ -902,7 +897,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
         Path asyncQueryResultDir = AsyncQueryUtil.getAsyncQueryResultDir(PROJECT, queryId);
         fileSystem.delete(new Path(asyncQueryResultDir, AsyncQueryUtil.getFileInfo()));
         try (FSDataOutputStream os = fileSystem.create(new Path(asyncQueryResultDir, AsyncQueryUtil.getFileInfo()));
-             OutputStreamWriter osw = new OutputStreamWriter(os, Charset.defaultCharset())) {
+                OutputStreamWriter osw = new OutputStreamWriter(os, Charset.defaultCharset())) {
             osw.write(formatDefault + "\n");
             osw.write(encodeDefault + "\n");
             osw.write("foo" + "\n");
@@ -924,6 +919,33 @@ public class AysncQueryServiceTest extends ServiceTestBase {
         assertArrayEquals(dataTypes.toArray(), metaData.get(1).toArray());
     }
 
+    @Test
+    public void testAsyncQueryResultRowCount() throws Exception {
+        overwriteSystemProp("kylin.env", "DEV");
+        QueryContext queryContext = QueryContext.current();
+        String queryId = queryContext.getQueryId();
+        mockMetadata(queryId, true);
+        queryContext.getQueryTagInfo().setAsyncQuery(true);
+        queryContext.getQueryTagInfo().setFileFormat("csv");
+        queryContext.getQueryTagInfo().setFileEncode("utf-8");
+        queryContext.getQueryTagInfo().setSeparator(",");
+        queryContext.getQueryTagInfo().setIncludeHeader(false);
+        queryContext.setAclInfo(new QueryContext.AclInfo("ADMIN", Sets.newHashSet("g1"), true));
+
+        String sql = "select '123\"','123'";
+        queryContext.setProject(PROJECT);
+
+        new QueryExec(PROJECT, getTestConfig()).executeQuery(sql);
+
+        assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
+
+        QueryMetricsContext.start(queryId, "");
+        Assert.assertTrue(QueryMetricsContext.isStarted());
+        QueryMetricsContext metrics = QueryMetricsContext.collect(queryContext);
+        Assert.assertEquals(1, metrics.getResultRowCount());
+        QueryMetricsContext.reset();
+    }
+
     public Path mockResultFile(String queryId, boolean block, boolean needMeta)
             throws IOException, InterruptedException {
 
@@ -939,8 +961,8 @@ public class AysncQueryServiceTest extends ServiceTestBase {
         }
 
         try (FSDataOutputStream os = fileSystem.create(new Path(asyncQueryResultDir, "m00")); //
-             OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8); //
-             ICsvListWriter csvWriter = new CsvListWriter(osw, CsvPreference.STANDARD_PREFERENCE)) {
+                OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8); //
+                ICsvListWriter csvWriter = new CsvListWriter(osw, CsvPreference.STANDARD_PREFERENCE)) {
             csvWriter.write(row1);
             csvWriter.write(row2);
             fileSystem.createNewFile(new Path(asyncQueryResultDir, AsyncQueryUtil.getSuccessFlagFileName()));
@@ -963,7 +985,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
             fileSystem.mkdirs(asyncQueryResultDir);
         }
         try (FSDataOutputStream os = fileSystem.create(new Path(asyncQueryResultDir, "m00")); //
-             OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) {
+                OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) {
             osw.write(StringEscapeUtils.unescapeJson(row1));
             osw.write(StringEscapeUtils.unescapeJson(row2));
             fileSystem.createNewFile(new Path(asyncQueryResultDir, AsyncQueryUtil.getSuccessFlagFileName()));
@@ -982,7 +1004,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
         }
         try (FSDataOutputStream os = fileSystem
                 .create(new Path(asyncQueryResultDir, AsyncQueryUtil.getMetaDataFileName())); //
-             OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
+                OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
             String metaString = String.join(",", columnNames) + "\n" + String.join(",", dataTypes);
             osw.write(metaString);
             if (needMeta) {
@@ -1003,7 +1025,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
         }
         try (FSDataOutputStream os = fileSystem
                 .create(new Path(asyncQueryResultDir, AsyncQueryUtil.getMetaDataFileName())); //
-             OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
+                OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
             osw.write(formatDefault);
 
         } catch (IOException e) {
@@ -1019,7 +1041,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
         }
         try (FSDataOutputStream os = fileSystem
                 .create(new Path(asyncQueryResultDir, AsyncQueryUtil.getMetaDataFileName())); //
-             OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
+                OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
             osw.write(encodeDefault);
 
         } catch (IOException e) {
@@ -1027,17 +1049,49 @@ public class AysncQueryServiceTest extends ServiceTestBase {
         }
     }
 
-    @Test
-    public void testCsvWriter() throws IOException {
-        List<List<Object>> rows = Lists.newArrayList(
-                Lists.newArrayList(1, 3.12, "foo"),
-                Lists.newArrayList(2, 3.123, "fo<>o"),
-                Lists.newArrayList(3, 3.124, "fo\ro")
-        );
-        String expected = "1<>3.12<>foo\n2<>3.123<>\"fo<>o\"\n3<>3.124<>\"fo\ro\"\n";
-        try (StringWriter sw = new StringWriter()) {
-            CSVWriter.writeCsv(rows.iterator(), sw, "<>");
-            assertEquals(expected, sw.toString());
+    public ByteArrayOutputStream mockOutputStream(HttpServletResponse response) throws IOException {
+
+        ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        when(response.getOutputStream()).thenReturn(servletOutputStream);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                Object[] arguments = invocationOnMock.getArguments();
+                baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
+                return null;
+            }
+        }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
+        return baos;
+    }
+
+    public List<String> getXlsxResult(String queryId, File file) throws IOException {
+        FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
+        List<String> list = new ArrayList<>();
+        FileStatus[] fileStatuses = fileSystem
+                .listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
+        for (FileStatus f : fileStatuses) {
+            if (f.getPath().getName().startsWith("_")) {
+                continue;
+            }
+            fileSystem.copyToLocalFile(f.getPath(), new Path(file.getPath()));
+            try (InputStream is = new FileInputStream(file.getAbsolutePath());
+                    XSSFWorkbook sheets = new XSSFWorkbook(is)) {
+                XSSFSheet sheetAt = sheets.getSheetAt(0);
+                for (int i = 0; i < sheetAt.getPhysicalNumberOfRows(); i++) {
+                    XSSFRow row = sheetAt.getRow(i);
+                    StringBuilder builder = new StringBuilder();
+                    for (int index = 0; index < row.getPhysicalNumberOfCells(); index++) {
+                        XSSFCell cell = row.getCell(index);
+                        if (index > 0) {
+                            builder.append(",");
+                        }
+                        builder.append(getString(cell));
+                    }
+                    list.add(builder.toString());
+                }
+            }
         }
+        return list;
     }
 }
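
The byte-for-byte CSV expectations in the rewritten tests follow the quote-doubling rule: the cell 123" is emitted as "123""". A hypothetical JUnit check of just that rule (not a test from the commit):

    import static org.junit.Assert.assertEquals;

    import org.junit.Test;

    public class CsvQuotingExpectationSketch {
        @Test
        public void embeddedQuoteIsDoubledAndCellIsWrapped() {
            String cell = "123\"";
            String escaped = "\"" + cell.replace("\"", "\"\"") + "\"";
            // the same literal the tests above expect inside the downloaded bytes
            assertEquals("\"123\"\"\"", escaped);
        }
    }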
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 5b6d20c715..b5640d7c29 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -33,10 +33,11 @@ import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.hive.QueryMetricUtils
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils
 import org.apache.spark.sql.util.SparderTypeUtil
-import org.apache.spark.sql.{DataFrame, SparderEnv, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparderEnv, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.sql.Timestamp
+import java.util
 import java.util.{UUID, List => JList}
 import scala.collection.JavaConverters._
 import scala.collection.{immutable, mutable}
@@ -130,29 +131,8 @@ object SparkSqlClient {
       QueryContext.current().getMetrics.setQueryJobCount(jobCount)
       QueryContext.current().getMetrics.setQueryStageCount(stageCount)
       QueryContext.current().getMetrics.setQueryTaskCount(taskCount)
-      (
-        () => new java.util.Iterator[JList[String]] {
-          /*
-           * After fetching a batch of 1000, checks whether the query thread is interrupted.
-           */
-          val checkInterruptSize = 1000;
-          var readRowSize = 0;
-
-          override def hasNext: Boolean = resultRows.hasNext
-
-          override def next(): JList[String] = {
-            val row = resultRows.next()
-            readRowSize += 1;
-            if (readRowSize % checkInterruptSize == 0) {
-              QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.",
-                "Current step: Collecting dataset of push-down.")
-            }
-            row.toSeq.map(rawValueToString(_)).asJava
-          }
-        },
-        resultSize,
-        fieldList
-      )
+      // return result
+      (readPushDownResultRow(resultRows, true), resultSize, fieldList)
     } catch {
       case e: Throwable =>
         if (e.isInstanceOf[InterruptedException]) {
@@ -169,6 +149,29 @@ object SparkSqlClient {
     }
   }
 
+  def readPushDownResultRow(resultRows: util.Iterator[Row], checkInterrupt: Boolean): java.lang.Iterable[JList[String]] = {
+    () =>
+      new java.util.Iterator[JList[String]] {
+        /*
+         * After fetching a batch of 1000, checks whether the query thread is interrupted.
+         */
+        val checkInterruptSize = 1000;
+        var readRowSize = 0;
+
+        override def hasNext: Boolean = resultRows.hasNext
+
+        override def next(): JList[String] = {
+          val row = resultRows.next()
+          readRowSize += 1;
+          if (checkInterrupt && readRowSize % checkInterruptSize == 0) {
+            QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.",
+              "Current step: Collecting dataset of push-down.")
+          }
+          row.toSeq.map(rawValueToString(_)).asJava
+        }
+      }
+  }
+
   private def rawValueToString(value: Any, wrapped: Boolean = false): String = value match {
     case null => null
     case value: Timestamp => DateFormat.castTimestampToString(value.getTime)
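
readPushDownResultRow factors the lazy, interrupt-aware iterator out of executeSql so that other callers (the checkInterrupt flag suggests the async-query path) can stream rows through the same wrapper. The underlying pattern, polling for interruption every N rows while iterating, looks like this as a generic Java sketch; the class and names are illustrative, not from the commit:

    import java.util.Iterator;

    public final class InterruptCheckingIterator<T> implements Iterator<T> {
        private static final int CHECK_INTERRUPT_SIZE = 1000; // same batch size as above
        private final Iterator<T> delegate;
        private long readRowSize = 0;

        public InterruptCheckingIterator(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public T next() {
            T row = delegate.next();
            // the Scala original delegates this check to QueryUtil.checkThreadInterrupted(...)
            if (++readRowSize % CHECK_INTERRUPT_SIZE == 0 && Thread.currentThread().isInterrupted()) {
                throw new IllegalStateException("query thread was interrupted while streaming rows");
            }
            return row;
        }
    }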
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 7ecc408674..8cc3a6ba25 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -20,7 +20,8 @@ package org.apache.kylin.query.runtime.plan
 
 import com.google.common.cache.{Cache, CacheBuilder}
 import io.kyligence.kap.secondstorage.SecondStorageUtil
-import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.exception.NewQueryRefuseException
 import org.apache.kylin.common.util.{HadoopUtil, RandomUtil}
@@ -30,19 +31,22 @@ import org.apache.kylin.metadata.query.{BigQueryThresholdUpdater, StructField}
 import org.apache.kylin.metadata.state.QueryShareStateManager
 import org.apache.kylin.query.engine.RelColumnMetaDataExtractor
 import org.apache.kylin.query.engine.exec.ExecuteResult
+import org.apache.kylin.query.pushdown.SparkSqlClient.readPushDownResultRow
 import org.apache.kylin.query.relnode.OLAPContext
 import org.apache.kylin.query.util.{AsyncQueryUtil, QueryUtil, SparkJobTrace, SparkQueryJobManager}
-import org.apache.poi.xssf.usermodel.XSSFWorkbook
+import org.apache.poi.xssf.usermodel.{XSSFSheet, XSSFWorkbook}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.QueryMetricUtils
 import org.apache.spark.sql.util.SparderTypeUtil
-import org.apache.spark.sql.{DataFrame, SaveMode, SparderEnv}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv}
 
-import java.io.{File, FileOutputStream}
-import java.util
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicLong
+import java.{lang, util}
 import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions.`iterator asScala`
 import scala.collection.mutable
 
 // scalastyle:off
@@ -55,6 +59,10 @@ object ResultPlan extends LogEx {
   val PARTITION_SPLIT_BYTES: Long = KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 // 64MB
   val SPARK_SCHEDULER_POOL: String = "spark.scheduler.pool"
 
+  val QUOTE_CHAR = "\""
+  val END_OF_LINE_SYMBOLS = IOUtils.LINE_SEPARATOR_UNIX
+  val CHECK_WRITE_SIZE = 1000 // flush the CSV writer every 1000 rows
+
   private def collectInternal(df: DataFrame, rowType: RelDataType): (java.lang.Iterable[util.List[String]], Int) = logTime("collectInternal", debug = true) {
     val jobGroup = Thread.currentThread().getName
     val sparkContext = SparderEnv.getSparkSession.sparkContext
@@ -139,20 +147,7 @@ object ResultPlan extends LogEx {
         s"Is TableIndex: ${QueryContext.current().getQueryTagInfo.isTableIndex}")
 
       val resultTypes = rowType.getFieldList.asScala
-      (() => new util.Iterator[util.List[String]] {
-
-        override def hasNext: Boolean = resultRows.hasNext
-
-        override def next(): util.List[String] = {
-          val row = resultRows.next()
-          if (Thread.interrupted()) {
-            throw new InterruptedException
-          }
-          row.toSeq.zip(resultTypes).map {
-            case (value, relField) => SparderTypeUtil.convertToStringWithCalciteType(value, relField.getType)
-          }.asJava
-        }
-      }, resultSize)
+      (readResultRow(resultRows, resultTypes), resultSize)
     } catch {
       case e: Throwable =>
         if (e.isInstanceOf[InterruptedException]) {
@@ -167,6 +162,25 @@ object ResultPlan extends LogEx {
     }
   }
 
+  def readResultRow(resultRows: util.Iterator[Row], resultTypes: mutable.Buffer[RelDataTypeField]): lang.Iterable[util.List[String]] = {
+    () =>
+      new util.Iterator[util.List[String]] {
+
+        override def hasNext: Boolean = resultRows.hasNext
+
+        override def next(): util.List[String] = {
+          val row = resultRows.next()
+          // Unlike the push-down path, this checks (and clears) the interrupt flag on every row
+          if (Thread.interrupted()) {
+            throw new InterruptedException
+          }
+          row.toSeq.zip(resultTypes).map {
+            case (value, relField) => SparderTypeUtil.convertToStringWithCalciteType(value, relField.getType)
+          }.asJava
+        }
+      }
+  }
+
   private def getNormalizedExplain(df: DataFrame): String = {
     df.queryExecution.executedPlan.toString.replaceAll("#\\d+", "#x")
   }
@@ -282,6 +296,8 @@ object ResultPlan extends LogEx {
     QueryContext.currentTrace().endLastSpan()
     val jobTrace = new SparkJobTrace(jobGroup, QueryContext.currentTrace(), QueryContext.current().getQueryId, sparkContext)
     val dateTimeFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
+    val queryId = QueryContext.current().getQueryId
+    val includeHeader = QueryContext.current().getQueryTagInfo.isIncludeHeader
     format match {
       case "json" =>
         val oldColumnNames = df.columns
@@ -302,31 +318,8 @@ object ResultPlan extends LogEx {
           normalizeSchema(df).write.mode(SaveMode.Overwrite).option("encoding", encode).option("charset", "utf-8").parquet(path)
         }
         sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false")
-      case "csv" =>
-        df.write
-          .option("timestampFormat", dateTimeFormat)
-          .option("encoding", encode)
-          .option("dateFormat", "yyyy-MM-dd")
-          .option("charset", "utf-8").mode(SaveMode.Append).csv(path)
-      case "xlsx" => {
-        val queryId = QueryContext.current().getQueryId
-        val file = new File(queryId + ".xlsx")
-        file.createNewFile();
-        val outputStream = new FileOutputStream(file)
-        val workbook = new XSSFWorkbook
-        val sheet = workbook.createSheet("query_result");
-        var num = 0
-        df.collect().foreach(row => {
-          val row1 = sheet.createRow(num)
-          for (i <- 0 until row.length) {
-            row1.createCell(i).setCellValue(row.apply(i).toString)
-          }
-          num = num + 1
-        })
-        workbook.write(outputStream)
-        HadoopUtil.getWorkingFileSystem
-          .copyFromLocalFile(true, true, new Path(file.getPath), new Path(path + "/" + queryId + ".xlsx"))
-      }
+      case "csv" => processCsv(df, format, rowType, path, queryId, includeHeader)
+      case "xlsx" => processXlsx(df, format, rowType, path, queryId, includeHeader)
       case _ =>
         normalizeSchema(df).write.option("timestampFormat", dateTimeFormat).option("encoding", encode)
           .option("charset", "utf-8").mode(SaveMode.Append).parquet(path)
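
The csv and xlsx branches above previously drove everything through a full Spark write
(csv) or df.collect() (xlsx); they now stream rows through the new helpers below. The
essence of the OOM fix, sketched here with stock Spark APIs only (Dataset.toLocalIterator
rather than Kylin's df.toIterator() helper that this commit relies on; streamToCsv is a
hypothetical name):

    import java.io.{FileOutputStream, OutputStreamWriter}
    import java.nio.charset.StandardCharsets
    import org.apache.spark.sql.DataFrame
    import scala.collection.JavaConverters._

    // Sketch: write a DataFrame to a local CSV one row at a time; only the partition
    // currently being fetched is resident on the driver, unlike df.collect().
    def streamToCsv(df: DataFrame, outFile: String, separator: String = ","): Unit = {
      val writer = new OutputStreamWriter(new FileOutputStream(outFile), StandardCharsets.UTF_8)
      try {
        df.toLocalIterator().asScala.foreach { row =>
          val line = (0 until row.length)
            .map(i => Option(row.get(i)).map(_.toString).getOrElse(""))
            .mkString(separator)
          writer.write(line + "\n")
        }
      } finally {
        writer.close()
      }
    }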
@@ -345,11 +338,142 @@ object ResultPlan extends LogEx {
       QueryContext.current().getMetrics.setQueryJobCount(jobCount)
       QueryContext.current().getMetrics.setQueryStageCount(stageCount)
       QueryContext.current().getMetrics.setQueryTaskCount(taskCount)
-      QueryContext.current().getMetrics.setResultRowCount(newExecution.executedPlan.metrics.get("numOutputRows")
+      setResultRowCount(newExecution.executedPlan)
+    }
+  }
+
+  def setResultRowCount(plan: SparkPlan): Unit = {
+    // Fall back to the plan's "numOutputRows" metric only when no writer has already set the count
+    if (QueryContext.current().getMetrics.getResultRowCount == 0) {
+      QueryContext.current().getMetrics.setResultRowCount(plan.metrics.get("numOutputRows")
         .map(_.value).getOrElse(0))
     }
   }
 
+  def processCsv(df: DataFrame, format: String, rowType: RelDataType, path: String, queryId: String, includeHeader: Boolean): Unit = {
+    val file = createTmpFile(queryId, format)
+    val writer = new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8)
+    if (includeHeader) processCsvHeader(writer, rowType)
+    val (iterator, resultRowSize) = df.toIterator()
+    asyncQueryIteratorWriteCsv(iterator, writer, rowType)
+    writer.close()
+    uploadAsyncQueryResult(file, path, queryId, format)
+    setResultRowCount(resultRowSize)
+  }
+
+  def processXlsx(df: DataFrame, format: String, rowType: RelDataType, path: String, queryId: String, includeHeader: Boolean): Unit = {
+    val file = createTmpFile(queryId, format)
+    val outputStream = new FileOutputStream(file)
+    val workbook = new XSSFWorkbook
+    val sheet = workbook.createSheet("query_result")
+    var num = 0
+    if (includeHeader) {
+      processXlsxHeader(sheet, rowType)
+      num += 1
+    }
+    val (iterator, resultRowSize) = df.toIterator()
+    iterator.foreach(row => {
+      val excelRow = sheet.createRow(num)
+      // Guard against null cells, mirroring the CSV path's null handling
+      row.toSeq.zipWithIndex.foreach { case (value, i) =>
+        excelRow.createCell(i).setCellValue(if (value == null) "" else value.toString)
+      }
+      num += 1
+    })
+    workbook.write(outputStream)
+    outputStream.close()
+    uploadAsyncQueryResult(file, path, queryId, format)
+    setResultRowCount(resultRowSize)
+  }
+
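
One caveat: XSSFWorkbook still builds the entire sheet in heap, so a very large XLSX
export can pressure driver memory even with row-by-row iteration. POI's streaming
variant would bound that; a possible alternative (not what this commit uses):

    import org.apache.poi.xssf.streaming.SXSSFWorkbook

    // Sketch: SXSSFWorkbook flushes rows to a temp file once more than the window
    // size are held in memory, bounding heap usage for large exports.
    val workbook = new SXSSFWorkbook(100) // keep at most 100 rows in memory
    val sheet = workbook.createSheet("query_result")
    // ...create rows and cells exactly as with XSSFWorkbook...
    // workbook.dispose() afterwards removes the temp files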
+  private def setResultRowCount(resultRowSize: Int): Unit = {
+    if (!KylinConfig.getInstanceFromEnv.isUTEnv) {
+      QueryContext.current().getMetrics.setResultRowCount(resultRowSize)
+    }
+  }
+
+  def processCsvHeader(writer: OutputStreamWriter, rowType: RelDataType): Unit = {
+    val separator = QueryContext.current().getQueryTagInfo.getSeparator
+    rowType match {
+      case null =>
+        val columnNames = QueryContext.current().getColumnNames.asScala.mkString(separator)
+        writer.write(columnNames + END_OF_LINE_SYMBOLS)
+      case _ =>
+        // mkString is simpler than append-then-deleteCharAt and stays correct if the separator is longer than one character
+        val header = rowType.getFieldList.asScala.map(_.getName).mkString(separator)
+        writer.write(header + END_OF_LINE_SYMBOLS)
+    }
+    writer.flush()
+  }
+
+  def processXlsxHeader(sheet: XSSFSheet, rowType: RelDataType): Unit = {
+    val excelRow = sheet.createRow(0)
+
+    rowType match {
+      case null =>
+        val columnNameArray = QueryContext.current().getColumnNames
+        columnNameArray.asScala.zipWithIndex
+          .foreach(it => excelRow.createCell(it._2).setCellValue(it._1))
+      case _ =>
+        val columnArray = rowType.getFieldList.asScala.map(t => t.getName)
+        columnArray.zipWithIndex.foreach(it => excelRow.createCell(it._2).setCellValue(it._1))
+    }
+  }
+
+  def createTmpFile(queryId: String, format: String): File = {
+    // Use "<queryId>.<format>" so the local temp file's extension matches the uploaded result
+    val file = new File(queryId + "." + format)
+    file.createNewFile()
+    file
+  }
+
+  def uploadAsyncQueryResult(file: File, path: String, queryId: String, format: String): Unit = {
+    // copyFromLocalFile(delSrc = true, overwrite = true, ...) moves the temp file into the working file system
+    HadoopUtil.getWorkingFileSystem
+      .copyFromLocalFile(true, true, new Path(file.getPath), new Path(path + "/" + queryId + "." + format))
+    // Safety net in case the source file was not removed by the copy
+    if (file.exists()) file.delete()
+  }
+
+  def asyncQueryIteratorWriteCsv(resultRows: util.Iterator[Row], outputStream: OutputStreamWriter, rowType: RelDataType): Unit = {
+    var asyncQueryRowSize = 0
+    val separator = QueryContext.current().getQueryTagInfo.getSeparator
+    // rowType is null for push-down results, which carry no Calcite schema
+    val asyncQueryResult = if (rowType != null) {
+      val resultTypes = rowType.getFieldList.asScala
+      readResultRow(resultRows, resultTypes)
+    } else {
+      readPushDownResultRow(resultRows, checkInterrupt = false)
+    }
+
+    asyncQueryResult.forEach(row => {
+      asyncQueryRowSize += 1
+      val builder = new StringBuilder
+
+      for (i <- 0 until row.size()) {
+        val column = if (row.get(i) == null) "" else row.get(i)
+
+        if (i > 0) builder.append(separator)
+
+        val escapedCsv = encodeCell(column, separator)
+        builder.append(escapedCsv)
+      }
+      builder.append(END_OF_LINE_SYMBOLS)
+      outputStream.write(builder.toString())
+      if (asyncQueryRowSize % CHECK_WRITE_SIZE == 0) {
+        outputStream.flush()
+      }
+    })
+    outputStream.flush()
+  }
+
+  // the encode logic is copied from org.supercsv.encoder.DefaultCsvEncoder.encode
+  def encodeCell(cell: String, separator: String): String = {
+    var column = cell
+    var needQuote = column.contains(separator) || column.contains("\r") || column.contains("\n")
+
+    if (column.contains(QUOTE_CHAR)) {
+      needQuote = true
+      column = column.replace(QUOTE_CHAR, QUOTE_CHAR + QUOTE_CHAR)
+    }
+
+    if (needQuote) QUOTE_CHAR + column + QUOTE_CHAR
+    else column
+  }
+
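
The quoting rules above can be spot-checked directly; these hypothetical assertions
(not part of the commit) illustrate the behavior:

    // Hypothetical spot checks, not part of the commit:
    assert(encodeCell("plain", ",") == "plain")                   // nothing special: unquoted
    assert(encodeCell("a,b", ",") == "\"a,b\"")                   // contains separator: quoted
    assert(encodeCell("line1\nline2", ",") == "\"line1\nline2\"") // newline: quoted
    assert(encodeCell("say \"hi\"", ",") == "\"say \"\"hi\"\"\"") // quotes doubled, then wrapped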
   /**
    * Normalize column name by replacing invalid characters with underscore
    * and strips accents