You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/02/27 08:00:59 UTC
[kylin] 19/34: KYLIN-5454 Downloading the async query result may cause OOM
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit db624c3e89e5444a3462893e6c86973f3fb72790
Author: fanshu.kong <17...@qq.com>
AuthorDate: Wed Nov 23 15:11:15 2022 +0800
KYLIN-5454 Downloading the async query result may cause OOM
Co-authored-by: Dorris Zhang <ru...@kyligence.io>
---
.../org/apache/kylin/rest/request/SQLRequest.java | 2 +
.../java/org/apache/kylin/common/QueryContext.java | 1 +
.../common/exception/code/ErrorCodeServer.java | 6 +
.../org/apache/kylin/common/msg/CnMessage.java | 10 -
.../java/org/apache/kylin/common/msg/Message.java | 11 +-
.../resources/kylin_error_msg_conf_cn.properties | 6 +-
.../resources/kylin_error_msg_conf_en.properties | 8 +
.../main/resources/kylin_errorcode_conf.properties | 6 +
.../apache/kylin/query/util/AsyncQueryUtil.java | 9 +-
.../rest/controller/NAsyncQueryController.java | 39 +-
.../rest/controller/NAsyncQueryControllerV2.java | 24 +-
.../rest/controller/NAsyncQueryControllerTest.java | 98 ++--
.../controller/NAsyncQueryControllerV2Test.java | 40 +-
.../kylin/rest/request/AsyncQuerySQLRequestV2.java | 3 +
.../kylin/rest/service/AsyncQueryService.java | 83 +---
.../org/apache/kylin/rest/service/CSVWriter.java | 120 -----
.../apache/kylin/rest/service/XLSXExcelWriter.java | 155 -------
.../kylin/rest/service/AysncQueryServiceTest.java | 496 ++++++++++++---------
.../kylin/query/pushdown/SparkSqlClient.scala | 51 ++-
.../kylin/query/runtime/plan/ResultPlan.scala | 214 +++++++--
20 files changed, 629 insertions(+), 753 deletions(-)
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
index 597ebd52c6..4bc4ce91d8 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
@@ -69,6 +69,8 @@ public class SQLRequest implements Serializable, ProjectInsensitiveRequest, Vali
@JsonProperty("file_name")
private String fileName = "result";
private Integer forcedToTieredStorage; //0:CH->DFS; 1:CH->pushDown; 2:CH->return error
+ @JsonProperty("include_header")
+ private boolean includeHeader;
private Map<String, String> backdoorToggles;
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 551b66d90f..d65396e95b 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -360,6 +360,7 @@ public class QueryContext implements Closeable {
private String fileName;
private String separator;
private boolean isRefused;
+ private boolean includeHeader;
}
@Getter
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
index 54f81183e7..a51a2f54ef 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
@@ -132,6 +132,12 @@ public enum ErrorCodeServer implements ErrorCodeProducer {
USER_GROUP_NOT_EXIST("KE-010043220"),
REPEATED_PARAMETER("KE-010043221"),
+ // 100313xx async query
+ ASYNC_QUERY_RESULT_NOT_FOUND("KE-010031301"),
+ ASYNC_QUERY_PROJECT_NAME_EMPTY("KE-010031302"),
+ ASYNC_QUERY_TIME_FORMAT_ERROR("KE-010031303"),
+ ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY("KE-010031304"),
+
// 400272XX resource group
RESOURCE_GROUP_DISABLE_FAILED("KE-040027201"),
RESOURCE_GROUP_ENABLE_FAILED("KE-040027202"),
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
index 6e099d683b..1f7923622b 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
@@ -423,16 +423,6 @@ public class CnMessage extends Message {
return "当前无法清理文件夹。请确保相关 HDFS 文件可以正常访问。";
}
- @Override
- public String getAsyncQueryTimeFormatError() {
- return "无效的时间格式。请按 “yyyy-MM-dd HH:mm:ss” 格式填写。";
- }
-
- @Override
- public String getAsyncQueryProjectNameEmpty() {
- return "项目名称不能为空。请检查后重试。";
- }
-
@Override
public String getUserNotFound() {
return "找不到用户 '%s'";
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
index 8f349bd31f..805caa5688 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
@@ -60,8 +60,7 @@ public class Message {
private static final String LICENSE_MISMATCH_LICENSE = "The license doesn’t match the current cluster information. Please upload a new license, or contact Kyligence.";
private static final String LICENSE_NOT_EFFECTIVE = "License is not effective yet, please apply for a new license.";
private static final String LICENSE_EXPIRED = "The license has expired. Please upload a new license, or contact Kyligence.";
- private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, only support single `create view`, `drop "
- + "view`, `alter view`, `show create table`";
+ private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, only support single `create view`, `drop view`, `alter view`, `show create table`";
private static final String DDL_VIEW_NAME_ERROR = "View names need to start with KE_";
private static final String DDL_VIEW_NAME_DUPLICATE_ERROR = "Logical View names is duplicate";
private static final String DDL_DROP_ERROR = "Only support drop view";
@@ -522,14 +521,6 @@ public class Message {
return "Can’t clean file folder at the moment. Please ensure that the related file on HDFS could be accessed.";
}
- public String getAsyncQueryTimeFormatError() {
- return "The time format is invalid. Please enter the date in the format “yyyy-MM-dd HH:mm:ss”.";
- }
-
- public String getAsyncQueryProjectNameEmpty() {
- return "The project name can’t be empty. Please check and try again.";
- }
-
public String getUserNotFound() {
return "User '%s' not found.";
}
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
index 2768232b9d..7697a01c8f 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
@@ -134,7 +134,6 @@ KE-010043219=请使用中文、英文、空格命名用户名和公司。
KE-010043220=找不到用户组 “%s”。请检查后重试。
KE-010043221=参数 “%s” 已存在。请检查后重试。
-
## Streaming
KE-010035202=使用解析器 “%s” 解析Topic “%s” 的消息时发生异常,请检查后重试。
KE-010035215=无法正确读取 Kafka 认证文件,请检查后再试。
@@ -160,6 +159,11 @@ KE-010042214=Jar文件 “%s” 不存在。
KE-010042215=解析器 “%s” 已存在。
KE-010042216=Jar文件 “%s” 已存在。
+## 100313xx async query
+KE-010031301=该项目下无法找到该 Query ID 对应的异步查询。请检查后重试。
+KE-010031302=项目名称不能为空。请检查后重试。
+KE-010031303=无效的时间格式。请按 “yyyy-MM-dd HH:mm:ss” 格式填写。
+KE-010031304=在当前版本中,"include_header"参数被移至提交异步查询的API,因此您在下载结果中的"include_header"参数将不起作用。请参考产品手册以了解更多细节。
# System
## 400052XX password
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
index fc75aa0610..6acc8373fc 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
@@ -156,6 +156,14 @@ KE-010042214=Jar "%s" does not exist.
KE-010042215=Parser "%s" already exists.
KE-010042216=Jar "%s" already exists.
+## 100313xx async query
+KE-010031301=Can’t find the query by this query ID in this project. Please check and try again.
+KE-010031302=The project name can’t be empty. Please check and try again.
+KE-010031303=The time format is invalid. Please enter the date in the format “yyyy-MM-dd HH:mm:ss”.
+KE-010031304=Notice: Now we have moved the "include_header" parameter to the Submit Async Query API, so the parameter here doesn't work. Please read the user manual for details.
+
+
+### batch 3
# System
## 400052XX password
KE-040005201=Can't find PASSWORD ENCODER. Please check configuration item kylin.security.user-password-encoder.
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf.properties b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
index 976e342cc9..8fb380af3f 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
@@ -102,6 +102,12 @@ KE-010032221
KE-010031201
KE-010031202
+## 100313xx async query
+KE-010031301
+KE-010031302
+KE-010031303
+KE-010031304
+
## 100102XX computed column
KE-010010201
KE-010010202
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
index b7deb09e12..09e05a4e53 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
@@ -18,6 +18,8 @@
package org.apache.kylin.query.util;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
+
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
@@ -30,10 +32,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
-import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +80,7 @@ public class AsyncQueryUtil {
osw.write(metaString);
}
} else {
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+ throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
}
}
@@ -96,7 +97,7 @@ public class AsyncQueryUtil {
osw.write(separator);
}
} else {
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+ throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
}
}
diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
index 30cdb088fe..3be9d9d753 100644
--- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
+++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
@@ -17,9 +17,12 @@
*/
package org.apache.kylin.rest.controller;
-import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON;
+import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_TIME_FORMAT_ERROR;
import java.io.IOException;
import java.text.ParseException;
@@ -40,16 +43,15 @@ import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.QueryErrorCode;
import org.apache.kylin.common.msg.Message;
import org.apache.kylin.common.msg.MsgPicker;
-import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
import org.apache.kylin.query.util.AsyncQueryUtil;
import org.apache.kylin.rest.exception.ForbiddenException;
+import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
+import org.apache.kylin.rest.response.AsyncQueryResponse;
import org.apache.kylin.rest.response.EnvelopeResponse;
import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.service.AsyncQueryService;
import org.apache.kylin.rest.service.QueryService;
import org.apache.kylin.rest.util.AclEvaluate;
-import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
-import org.apache.kylin.rest.response.AsyncQueryResponse;
-import org.apache.kylin.rest.service.AsyncQueryService;
import org.apache.spark.sql.SparderEnv;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,6 +138,7 @@ public class NAsyncQueryController extends NBasicController {
queryContext.getQueryTagInfo().setFileEncode(encode);
queryContext.getQueryTagInfo().setFileName(sqlRequest.getFileName());
queryContext.getQueryTagInfo().setSeparator(sqlRequest.getSeparator());
+ queryContext.getQueryTagInfo().setIncludeHeader(sqlRequest.isIncludeHeader());
queryContext.setProject(sqlRequest.getProject());
logger.info("Start a new async query with queryId: {}", queryContext.getQueryId());
String queryId = queryContext.getQueryId();
@@ -203,8 +206,8 @@ public class NAsyncQueryController extends NBasicController {
MsgPicker.getMsg().getCleanFolderFail());
}
} catch (ParseException e) {
- logger.error(MsgPicker.getMsg().getAsyncQueryTimeFormatError(), e);
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryTimeFormatError());
+ logger.error(ASYNC_QUERY_TIME_FORMAT_ERROR.getMsg(), e);
+ throw new KylinException(ASYNC_QUERY_TIME_FORMAT_ERROR);
}
}
@@ -216,7 +219,7 @@ public class NAsyncQueryController extends NBasicController {
@RequestParam(value = "project", required = false) String project) throws IOException {
if (project == null) {
if (sqlRequest == null) {
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+ throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
}
project = sqlRequest.getProject();
}
@@ -242,7 +245,7 @@ public class NAsyncQueryController extends NBasicController {
throws IOException {
if (project == null) {
if (sqlRequest == null) {
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+ throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
}
project = sqlRequest.getProject();
}
@@ -283,7 +286,7 @@ public class NAsyncQueryController extends NBasicController {
@RequestParam(value = "project", required = false) String project) throws IOException {
if (project == null) {
if (sqlRequest == null) {
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+ throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
}
project = sqlRequest.getProject();
}
@@ -306,7 +309,7 @@ public class NAsyncQueryController extends NBasicController {
throws IOException {
if (project == null) {
if (sqlRequest == null) {
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+ throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
}
project = sqlRequest.getProject();
}
@@ -323,16 +326,19 @@ public class NAsyncQueryController extends NBasicController {
@GetMapping(value = "/async_query/{query_id:.+}/result_download")
@ResponseBody
public void downloadQueryResult(@PathVariable("query_id") String queryId,
- @RequestParam(value = "include_header", required = false, defaultValue = "false") boolean include_header,
- @RequestParam(value = "includeHeader", required = false, defaultValue = "false") boolean includeHeader,
+ @RequestParam(value = "oldIncludeHeader", required = false) Boolean oldIncludeHeader,
+ @RequestParam(value = "includeHeader", required = false) Boolean includeHeader,
@Valid @RequestBody(required = false) final AsyncQuerySQLRequest sqlRequest, HttpServletResponse response,
@RequestParam(value = "project", required = false) String project) throws IOException {
if (project == null) {
if (sqlRequest == null) {
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+ throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
}
project = sqlRequest.getProject();
}
+ if (oldIncludeHeader != null || includeHeader != null) {
+ throw new KylinException(ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY);
+ }
aclEvaluate.checkProjectQueryPermission(project);
checkProjectName(project);
KylinConfig config = queryService.getConfig();
@@ -356,8 +362,7 @@ public class NAsyncQueryController extends NBasicController {
response.setContentType("application/" + format + ";charset=" + encode);
}
response.setHeader("Content-Disposition", "attachment; filename=\"" + fileName + "." + format + "\"");
- asyncQueryService.retrieveSavedQueryResult(project, queryId, includeHeader || include_header, response, format,
- encode, fileInfo.getSeparator());
+ asyncQueryService.retrieveSavedQueryResult(project, queryId, response, format, encode);
}
@ApiOperation(value = "async query result path", tags = { "QE" })
@@ -368,7 +373,7 @@ public class NAsyncQueryController extends NBasicController {
@RequestParam(value = "project", required = false) String project) throws IOException {
if (project == null) {
if (sqlRequest == null) {
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty());
+ throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
}
project = sqlRequest.getProject();
}
diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java
index 498f154904..190de73c9b 100644
--- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java
+++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java
@@ -19,17 +19,21 @@
package org.apache.kylin.rest.controller;
import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V2_JSON;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
+import java.io.IOException;
+import java.util.List;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.validation.Valid;
+
+import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2;
import org.apache.kylin.rest.response.AsyncQueryResponse;
import org.apache.kylin.rest.response.AsyncQueryResponseV2;
-import org.apache.kylin.rest.service.AsyncQueryService;
-import io.swagger.annotations.ApiOperation;
-import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.common.msg.MsgPicker;
-import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
import org.apache.kylin.rest.response.EnvelopeResponse;
+import org.apache.kylin.rest.service.AsyncQueryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
@@ -41,10 +45,7 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
-import javax.servlet.http.HttpServletResponse;
-import javax.validation.Valid;
-import java.io.IOException;
-import java.util.List;
+import io.swagger.annotations.ApiOperation;
@RestController
@@ -68,6 +69,7 @@ public class NAsyncQueryControllerV2 extends NBasicController {
sqlRequest.setProject(asyncQuerySQLRequest.getProject());
sqlRequest.setSql(asyncQuerySQLRequest.getSql());
sqlRequest.setSeparator(asyncQuerySQLRequest.getSeparator());
+ sqlRequest.setIncludeHeader(asyncQuerySQLRequest.isIncludeHeader());
sqlRequest.setFormat("csv");
sqlRequest.setEncode("utf-8");
sqlRequest.setFileName("result");
@@ -112,7 +114,7 @@ public class NAsyncQueryControllerV2 extends NBasicController {
@GetMapping(value = "/async_query/{query_id:.+}/result_download")
@ResponseBody
public void downloadQueryResult(@PathVariable("query_id") String queryId,
- @RequestParam(value = "includeHeader", required = false, defaultValue = "false") boolean includeHeader,
+ @RequestParam(value = "includeHeader", required = false) Boolean includeHeader,
HttpServletResponse response) throws IOException {
asyncQueryController.downloadQueryResult(queryId, includeHeader, includeHeader, null, response, searchProject(queryId));
}
@@ -120,7 +122,7 @@ public class NAsyncQueryControllerV2 extends NBasicController {
private String searchProject(String queryId) throws IOException {
String project = asyncQueryService.searchQueryResultProject(queryId);
if (project == null) {
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+ throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
}
return project;
}
diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
index 093c5e9c48..65d74a56f0 100644
--- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
+++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
@@ -19,11 +19,14 @@
package org.apache.kylin.rest.controller;
import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
+import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_TIME_FORMAT_ERROR;
import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.FAILED;
import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.MISS;
import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.RUNNING;
import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.SUCCESS;
-import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
import java.io.IOException;
import java.text.ParseException;
@@ -33,10 +36,13 @@ import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
+import org.apache.kylin.rest.response.AsyncQueryResponse;
import org.apache.kylin.rest.response.EnvelopeResponse;
import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.service.AsyncQueryService;
import org.apache.kylin.rest.service.QueryService;
import org.apache.kylin.rest.util.AclEvaluate;
import org.junit.After;
@@ -56,11 +62,6 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
-import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
-import org.apache.kylin.rest.response.AsyncQueryResponse;
-import org.apache.kylin.rest.service.AsyncQueryService;
-
public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
private static final String PROJECT = "default";
@@ -108,6 +109,7 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
asyncQuerySQLRequest.setProject(PROJECT);
asyncQuerySQLRequest.setSql("select PART_DT from KYLIN_SALES limit 500");
asyncQuerySQLRequest.setSeparator(",");
+ asyncQuerySQLRequest.setIncludeHeader(false);
return asyncQuerySQLRequest;
}
@@ -216,8 +218,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
.accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(MockMvcResultMatchers.status().isInternalServerError());
- Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(),
- Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any());
}
@Test
@@ -225,8 +227,7 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
Mockito.doReturn(true).when(asyncQueryService).hasPermission(Mockito.anyString(), Mockito.anyString());
Mockito.doThrow(new IOException()).when(asyncQueryService).getFileInfo(Mockito.anyString(),
Mockito.anyString());
- Mockito.doThrow(new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound()))
- .when(asyncQueryService)
+ Mockito.doThrow(new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND)).when(asyncQueryService)
.checkStatus(Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString());
mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/result_download", "123")
@@ -238,8 +239,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
"Can’t find the query by this query ID in this project. Please check and try again."));
});
- Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(),
- Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any());
}
@Test
@@ -407,11 +408,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query").param("project", PROJECT)
.param("older_than", "2011-11/11 11:11:11")
.accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))).andExpect(result -> {
- Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
- Assert.assertEquals("KE-020040001",
- ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
- .getCodeString());
- Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryTimeFormatError(),
+ Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+ Assert.assertEquals("KE-010031303",
+ ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+ Assert.assertEquals(ASYNC_QUERY_TIME_FORMAT_ERROR.getMsg(),
result.getResolvedException().getMessage());
});
}
@@ -586,8 +586,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
resolvedException.getMessage());
});
- Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(),
- Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any());
}
@Test
@@ -603,8 +603,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
.accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(MockMvcResultMatchers.status().isOk());
- Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(),
- Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any());
}
@Test
@@ -638,11 +638,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/{query_id}", "123")
.contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(result -> {
- Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
- Assert.assertEquals("KE-020040001",
- ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
- .getCodeString());
- Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+ Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+ Assert.assertEquals("KE-010031302",
+ ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+ Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
result.getResolvedException().getMessage());
});
}
@@ -652,11 +651,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/status", "123")
.contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(result -> {
- Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
- Assert.assertEquals("KE-020040001",
- ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
- .getCodeString());
- Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+ Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+ Assert.assertEquals("KE-010031302",
+ ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+ Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
result.getResolvedException().getMessage());
});
}
@@ -666,11 +664,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/file_status", "123")
.contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(result -> {
- Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
- Assert.assertEquals("KE-020040001",
- ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
- .getCodeString());
- Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+ Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+ Assert.assertEquals("KE-010031302",
+ ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+ Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
result.getResolvedException().getMessage());
});
}
@@ -680,11 +677,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/metadata", "123")
.contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(result -> {
- Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
- Assert.assertEquals("KE-020040001",
- ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
- .getCodeString());
- Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+ Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+ Assert.assertEquals("KE-010031302",
+ ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+ Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
result.getResolvedException().getMessage());
});
}
@@ -694,11 +690,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/result_download", "123")
.contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(result -> {
- Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
- Assert.assertEquals("KE-020040001",
- ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
- .getCodeString());
- Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+ Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+ Assert.assertEquals("KE-010031302",
+ ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+ Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
result.getResolvedException().getMessage());
});
}
@@ -708,11 +703,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id}/result_path", "123")
.contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(result -> {
- Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException);
- Assert.assertEquals("KE-020040001",
- ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode()
- .getCodeString());
- Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(),
+ Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+ Assert.assertEquals("KE-010031302",
+ ((KylinException) result.getResolvedException()).getErrorCode().getCodeString());
+ Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(),
result.getResolvedException().getMessage());
});
}
diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java
index e570c0fd3e..d33135cc6f 100644
--- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java
+++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java
@@ -19,22 +19,27 @@
package org.apache.kylin.rest.controller;
import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V2_JSON;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY;
import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.FAILED;
import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.MISS;
import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.RUNNING;
import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.SUCCESS;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
-import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2;
-import org.apache.kylin.rest.service.AsyncQueryService;
+import java.io.IOException;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2;
import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.service.AsyncQueryService;
import org.apache.kylin.rest.service.QueryService;
import org.apache.kylin.rest.util.AclEvaluate;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
@@ -50,9 +55,6 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
-
-import java.io.IOException;
-
public class NAsyncQueryControllerV2Test extends NLocalFileMetadataTestCase {
private static final String PROJECT = "default";
@@ -80,8 +82,8 @@ public class NAsyncQueryControllerV2Test extends NLocalFileMetadataTestCase {
public void setup() throws IOException {
MockitoAnnotations.initMocks(this);
- mockMvc = MockMvcBuilders.standaloneSetup(nAsyncQueryControllerV2).defaultRequest(MockMvcRequestBuilders.get("/"))
- .build();
+ mockMvc = MockMvcBuilders.standaloneSetup(nAsyncQueryControllerV2)
+ .defaultRequest(MockMvcRequestBuilders.get("/")).build();
SecurityContextHolder.getContext().setAuthentication(authentication);
createTestMetadata();
@@ -237,7 +239,27 @@ public class NAsyncQueryControllerV2Test extends NLocalFileMetadataTestCase {
.accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V2_JSON)))
.andExpect(MockMvcResultMatchers.status().isOk());
- Mockito.verify(nAsyncQueryControllerV2).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(), Mockito.any());
+ Mockito.verify(nAsyncQueryControllerV2).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any());
+ }
+
+ @Test
+ public void testDownloadQueryResultNotIncludeHeader() throws Exception {
+ Mockito.doReturn(true).when(asyncQueryService).hasPermission(Mockito.anyString(), Mockito.anyString());
+ AsyncQueryService.FileInfo fileInfo = new AsyncQueryService.FileInfo("csv", "gbk", "result");
+ Mockito.doReturn(fileInfo).when(asyncQueryService).getFileInfo(Mockito.anyString(), Mockito.anyString());
+ Mockito.doReturn(KylinConfig.getInstanceFromEnv()).when(kapQueryService).getConfig();
+
+ mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/result_download", "123")
+ .param("includeHeader", "false").contentType(MediaType.APPLICATION_JSON)
+ .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+ .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V2_JSON)))
+ .andExpect(MockMvcResultMatchers.status().isInternalServerError()).andExpect(result -> {
+ Assert.assertTrue(result.getResolvedException() instanceof KylinException);
+ Assert.assertEquals(ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY.getMsg(),
+ result.getResolvedException().getMessage());
+ });
+
+ Mockito.verify(nAsyncQueryControllerV2).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any());
}
}
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java b/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java
index 5e16dde7b1..eb8e11bd07 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java
@@ -18,6 +18,7 @@
package org.apache.kylin.rest.request;
+import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@@ -35,5 +36,7 @@ public class AsyncQuerySQLRequestV2 implements Serializable, ProjectInsensitiveR
private String separator = ",";
private Integer offset = 0;
private Integer limit = 0;
+ @JsonProperty("include_header")
+ private boolean includeHeader;
}
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java
index 55faca587a..71f8ca7e9a 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java
@@ -18,6 +18,7 @@
package org.apache.kylin.rest.service;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
import static org.apache.kylin.query.util.AsyncQueryUtil.getUserFileName;
import static org.apache.kylin.rest.util.AclPermissionUtil.isAdmin;
@@ -47,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.msg.Message;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.metadata.project.NProjectManager;
@@ -54,10 +56,6 @@ import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
import org.apache.kylin.query.util.AsyncQueryUtil;
import org.apache.kylin.rest.exception.NotFoundException;
-import org.apache.poi.ss.usermodel.Sheet;
-import org.apache.poi.ss.usermodel.Workbook;
-import org.apache.poi.xssf.usermodel.XSSFWorkbook;
-import org.apache.spark.sql.SparderEnv;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.core.context.SecurityContextHolder;
@@ -114,8 +112,8 @@ public class AsyncQueryService extends BasicService {
}
}
- public void retrieveSavedQueryResult(String project, String queryId, boolean includeHeader,
- HttpServletResponse response, String fileFormat, String encode, String separator) throws IOException {
+ public void retrieveSavedQueryResult(String project, String queryId, HttpServletResponse response,
+ String fileFormat, String encode) throws IOException {
checkStatus(queryId, QueryStatus.SUCCESS, project, MsgPicker.getMsg().getQueryResultNotFound());
FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
@@ -126,37 +124,15 @@ public class AsyncQueryService extends BasicService {
}
try (ServletOutputStream outputStream = response.getOutputStream()) {
- String columnNames = null;
- if (includeHeader) {
- columnNames = processHeader(fileSystem, dataPath);
- if (columnNames != null) {
- logger.debug("Query:{}, columnMeta:{}", columnNames, columnNames);
- if (!columnNames.endsWith(IOUtils.LINE_SEPARATOR_UNIX)) {
- columnNames = columnNames + IOUtils.LINE_SEPARATOR_UNIX;
- }
- } else {
- logger.error("Query:{}, no columnMeta found", queryId);
- }
- }
switch (fileFormat) {
case "csv":
- CSVWriter csvWriter = new CSVWriter();
- processCSV(outputStream, dataPath, includeHeader, columnNames, csvWriter, separator);
- break;
- case "json":
- processJSON(outputStream, dataPath, encode);
- break;
case "xlsx":
- if (!includeHeader) {
- processFile(outputStream, dataPath);
- } else {
- XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter();
- processXLSX(outputStream, dataPath, includeHeader, columnNames, xlsxExcelWriter);
- }
- break;
case "parquet":
processFile(outputStream, dataPath);
break;
+ case "json":
+ processJSON(outputStream, dataPath, encode);
+ break;
default:
logger.info("Query:{}, processed", queryId);
}
@@ -281,7 +257,7 @@ public class AsyncQueryService extends BasicService {
public boolean deleteByQueryId(String project, String queryId) throws IOException {
Path resultDir = getAsyncQueryResultDir(project, queryId);
if (queryStatus(project, queryId) == QueryStatus.MISS) {
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+ throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
}
logger.info("clean async query result for query id [{}]", queryId);
return AsyncQueryUtil.getFileSystem().delete(resultDir, true);
@@ -324,7 +300,7 @@ public class AsyncQueryService extends BasicService {
public String asyncQueryResultPath(String project, String queryId) throws IOException {
if (queryStatus(project, queryId) == QueryStatus.MISS) {
- throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+ throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
}
return getAsyncQueryResultDir(project, queryId).toString();
}
@@ -344,28 +320,6 @@ public class AsyncQueryService extends BasicService {
return new Path(KapConfig.getInstanceFromEnv().getAsyncResultBaseDir(project), queryId);
}
- private String processHeader(FileSystem fileSystem, Path dataPath) throws IOException {
-
- FileStatus[] fileStatuses = fileSystem.listStatus(dataPath);
- for (FileStatus header : fileStatuses) {
- if (header.getPath().getName().equals(AsyncQueryUtil.getMetaDataFileName())) {
- try (FSDataInputStream inputStream = fileSystem.open(header.getPath());
- BufferedReader bufferedReader = new BufferedReader(
- new InputStreamReader(inputStream, Charset.defaultCharset()))) {
- return bufferedReader.readLine();
- }
- }
- }
- return null;
- }
-
- private void processCSV(OutputStream outputStream, Path dataPath, boolean includeHeader, String columnNames,
- CSVWriter excelWriter, String separator) throws IOException {
- FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
- FileStatus[] fileStatuses = fileSystem.listStatus(dataPath);
- excelWriter.writeData(fileStatuses, outputStream, columnNames, separator, includeHeader);
- }
-
private void processJSON(OutputStream outputStream, Path dataPath, String encode) throws IOException {
FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
FileStatus[] fileStatuses = fileSystem.listStatus(dataPath);
@@ -395,25 +349,6 @@ public class AsyncQueryService extends BasicService {
}
}
- private void processXLSX(OutputStream outputStream, Path dataPath, boolean includeHeader, String columnNames, XLSXExcelWriter excelWriter)
- throws IOException {
- FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
- FileStatus[] fileStatuses = fileSystem.listStatus(dataPath);
- try (Workbook wb = new XSSFWorkbook()) {
- Sheet sheet = wb.createSheet("query_result");
- // Apply column names
- if (includeHeader && columnNames != null) {
- org.apache.poi.ss.usermodel.Row excelRow = sheet.createRow(0);
- String[] columnNameArray = columnNames.split(SparderEnv.getSeparator());
- for (int i = 0; i < columnNameArray.length; i++) {
- excelRow.createCell(i).setCellValue(columnNameArray[i]);
- }
- }
- excelWriter.writeData(fileStatuses, sheet);
- wb.write(outputStream);
- }
- }
-
public enum QueryStatus {
RUNNING, FAILED, SUCCESS, MISS
}
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/CSVWriter.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/CSVWriter.java
deleted file mode 100644
index a17dcf318c..0000000000
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/CSVWriter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.rest.service;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.spark.sql.SparderEnv;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.collection.JavaConverters;
-
-public class CSVWriter {
-
- private static final Logger logger = LoggerFactory.getLogger("query");
-
- private static final String QUOTE_CHAR = "\"";
- private static final String END_OF_LINE_SYMBOLS = IOUtils.LINE_SEPARATOR_UNIX;
-
- public void writeData(FileStatus[] fileStatuses, OutputStream outputStream,
- String columnNames, String separator, boolean includeHeaders) throws IOException {
-
- try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) {
- if (includeHeaders) {
- writer.write(columnNames.replace(",", separator));
- writer.flush();
- }
- for (FileStatus fileStatus : fileStatuses) {
- if (!fileStatus.getPath().getName().startsWith("_")) {
- if (fileStatus.getPath().getName().endsWith("parquet")) {
- writeDataByParquet(fileStatus, writer, separator);
- } else {
- writeDataByCsv(fileStatus, writer, separator);
- }
- }
- }
-
- writer.flush();
- }
- }
-
- public static void writeCsv(Iterator<List<Object>> rows, Writer writer, String separator) {
- rows.forEachRemaining(row -> {
- StringBuilder builder = new StringBuilder();
-
- for (int i = 0; i < row.size(); i++) {
- Object cell = row.get(i);
- String column = cell == null ? "" : cell.toString();
-
- if (i > 0) {
- builder.append(separator);
- }
-
- final String escapedCsv = encodeCell(column, separator);
- builder.append(escapedCsv);
- }
- builder.append(END_OF_LINE_SYMBOLS); // EOL
- try {
- writer.write(builder.toString());
- } catch (IOException e) {
- logger.error("Failed to download asyncQueryResult csvExcel by parquet", e);
- }
- });
- }
-
- private void writeDataByParquet(FileStatus fileStatus, Writer writer, String separator) {
- List<org.apache.spark.sql.Row> rowList = SparderEnv.getSparkSession().read()
- .parquet(fileStatus.getPath().toString()).collectAsList();
- writeCsv(rowList.stream().map(row -> JavaConverters.seqAsJavaList(row.toSeq())).iterator(), writer, separator);
- }
-
- // the encode logic is copied from org.supercsv.encoder.DefaultCsvEncoder.encode
- private static String encodeCell(String cell, String separator) {
-
- boolean needQuote = cell.contains(separator) || cell.contains("\r") || cell.contains("\n");
-
- if (cell.contains(QUOTE_CHAR)) {
- needQuote = true;
- // escape
- cell = cell.replace(QUOTE_CHAR, QUOTE_CHAR + QUOTE_CHAR);
- }
-
- if (needQuote) {
- return QUOTE_CHAR + cell + QUOTE_CHAR;
- } else {
- return cell;
- }
- }
-
- private void writeDataByCsv(FileStatus fileStatus, Writer writer, String separator) {
- List<org.apache.spark.sql.Row> rowList = SparderEnv.getSparkSession().read()
- .csv(fileStatus.getPath().toString()).collectAsList();
- writeCsv(rowList.stream().map(row -> JavaConverters.seqAsJavaList(row.toSeq())).iterator(), writer, separator);
- }
-
-}
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/XLSXExcelWriter.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/XLSXExcelWriter.java
deleted file mode 100644
index e54678f375..0000000000
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/XLSXExcelWriter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.rest.service;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.query.util.AsyncQueryUtil;
-import org.apache.poi.ss.usermodel.CellType;
-import org.apache.poi.ss.usermodel.Row;
-import org.apache.poi.ss.usermodel.Sheet;
-import org.apache.poi.xssf.usermodel.XSSFCell;
-import org.apache.poi.xssf.usermodel.XSSFRow;
-import org.apache.poi.xssf.usermodel.XSSFSheet;
-import org.apache.poi.xssf.usermodel.XSSFWorkbook;
-import org.apache.spark.sql.SparderEnv;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.clearspring.analytics.util.Lists;
-
-import lombok.val;
-
-public class XLSXExcelWriter {
-
- private static final Logger logger = LoggerFactory.getLogger("query");
-
- public void writeData(FileStatus[] fileStatuses, Sheet sheet) {
- for (FileStatus fileStatus : fileStatuses) {
- if (!fileStatus.getPath().getName().startsWith("_")) {
- if (fileStatus.getPath().getName().endsWith("parquet")) {
- writeDataByParquet(fileStatus, sheet);
- } else if (fileStatus.getPath().getName().endsWith("xlsx")) {
- writeDataByXlsx(fileStatus, sheet);
- } else {
- writeDataByCsv(fileStatus, sheet);
- }
- }
- }
- }
-
- private void writeDataByXlsx(FileStatus f, Sheet sheet) {
- boolean createTempFileStatus = false;
- File file = new File("temp.xlsx");
- try {
- createTempFileStatus = file.createNewFile();
- FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
- fileSystem.copyToLocalFile(f.getPath(), new Path(file.getPath()));
- } catch (Exception e) {
- logger.error("Export excel writeDataByXlsx create exception f:{} createTempFileStatus:{} ",
- f.getPath(), createTempFileStatus, e);
- }
- try (InputStream is = new FileInputStream(file.getAbsolutePath());
- XSSFWorkbook sheets = new XSSFWorkbook(is)) {
- final AtomicInteger offset = new AtomicInteger(sheet.getPhysicalNumberOfRows());
- XSSFSheet sheetAt = sheets.getSheetAt(0);
- for (int i = 0; i < sheetAt.getPhysicalNumberOfRows(); i++) {
- XSSFRow row = sheetAt.getRow(i);
- org.apache.poi.ss.usermodel.Row excelRow = sheet.createRow(offset.get());
- offset.incrementAndGet();
- for (int index = 0; index < row.getPhysicalNumberOfCells(); index++) {
- XSSFCell cell = row.getCell(index);
- excelRow.createCell(index).setCellValue(getString(cell));
- }
- }
- Files.delete(file.toPath());
- } catch (Exception e) {
- logger.error("Export excel writeDataByXlsx handler exception f:{} createTempFileStatus:{} ",
- f.getPath(), createTempFileStatus, e);
- }
- }
-
- private static String getString(XSSFCell xssfCell) {
- if (xssfCell == null) {
- return "";
- }
- if (xssfCell.getCellType() == CellType.NUMERIC) {
- return String.valueOf(xssfCell.getNumericCellValue());
- } else if (xssfCell.getCellType() == CellType.BOOLEAN) {
- return String.valueOf(xssfCell.getBooleanCellValue());
- } else {
- return xssfCell.getStringCellValue();
- }
- }
-
- private void writeDataByParquet(FileStatus fileStatus, Sheet sheet) {
- final AtomicInteger offset = new AtomicInteger(sheet.getPhysicalNumberOfRows());
- List<org.apache.spark.sql.Row> rowList = SparderEnv.getSparkSession().read()
- .parquet(fileStatus.getPath().toString()).collectAsList();
- rowList.stream().forEach(row -> {
- org.apache.poi.ss.usermodel.Row excelRow = sheet.createRow(offset.get());
- offset.incrementAndGet();
- val list = row.toSeq().toList();
- for (int i = 0; i < list.size(); i++) {
- Object cell = list.apply(i);
- String column = cell == null ? "" : cell.toString();
- excelRow.createCell(i).setCellValue(column);
- }
- });
- }
-
- public void writeDataByCsv(FileStatus fileStatus, Sheet sheet) {
- FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
- List<String> rowResults = Lists.newArrayList();
- List<String[]> results = Lists.newArrayList();
- final AtomicInteger offset = new AtomicInteger(sheet.getPhysicalNumberOfRows());
- try (FSDataInputStream inputStream = fileSystem.open(fileStatus.getPath())) {
- BufferedReader bufferedReader = new BufferedReader(
- new InputStreamReader(inputStream, StandardCharsets.UTF_8));
- rowResults.addAll(Lists.newArrayList(bufferedReader.lines().collect(Collectors.toList())));
- for (String row : rowResults) {
- results.add(row.split(SparderEnv.getSeparator()));
- }
- for (int i = 0; i < results.size(); i++) {
- Row row = sheet.createRow(offset.get());
- offset.incrementAndGet();
- String[] rowValues = results.get(i);
- for (int j = 0; j < rowValues.length; j++) {
- row.createCell(j).setCellValue(rowValues[j]);
- }
- }
- } catch (IOException e) {
- logger.error("Failed to download asyncQueryResult xlsxExcel by csv", e);
- }
- }
-}
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java
index 282824ac65..241ffd3d10 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java
@@ -37,10 +37,10 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
-import java.io.StringWriter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -58,10 +58,14 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.metadata.query.QueryMetricsContext;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
+import org.apache.kylin.query.engine.QueryExec;
import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
import org.apache.kylin.query.pushdown.SparkSqlClient;
+import org.apache.kylin.query.runtime.plan.ResultPlan;
import org.apache.kylin.query.util.AsyncQueryUtil;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.poi.ss.usermodel.CellType;
@@ -87,6 +91,7 @@ import org.supercsv.io.ICsvListWriter;
import org.supercsv.prefs.CsvPreference;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import lombok.val;
@@ -163,38 +168,23 @@ public class AysncQueryServiceTest extends ServiceTestBase {
}
@Test
- public void testAsyncQueryWithParquetSpecialCharacters() throws IOException {
+ public void testAsyncQueryAndDownloadCsvResultNotIncludeHeader() throws IOException {
QueryContext queryContext = QueryContext.current();
String queryId = queryContext.getQueryId();
mockMetadata(queryId, true);
queryContext.getQueryTagInfo().setAsyncQuery(true);
- queryContext.getQueryTagInfo().setFileFormat("CSV");
+ queryContext.getQueryTagInfo().setFileFormat("csv");
queryContext.getQueryTagInfo().setFileEncode("utf-8");
- String sql = "select '\\(123\\)','123'";
- queryContext.setProject(PROJECT);
+ queryContext.getQueryTagInfo().setSeparator(",");
+ queryContext.getQueryTagInfo().setIncludeHeader(false);
- ss.sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false");
- SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
-
- await().atMost(60000, TimeUnit.MILLISECONDS).until(
- () -> AsyncQueryService.QueryStatus.SUCCESS.equals(asyncQueryService.queryStatus(PROJECT, queryId)));
- HttpServletResponse response = mock(HttpServletResponse.class);
- ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- when(response.getOutputStream()).thenReturn(servletOutputStream);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- Object[] arguments = invocationOnMock.getArguments();
- baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
- return null;
- }
- }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
+ String sql = "select '123\"','123'";
+ queryContext.setProject(PROJECT);
+ ResultPlan.getResult(ss.sql(sql), null);
+ assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
- SparderEnv.getSparkSession().sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false");
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "csv", encodeDefault, ",");
List<org.apache.spark.sql.Row> rowList = ss.read()
- .parquet(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
+ .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
List<String> result = Lists.newArrayList();
rowList.stream().forEach(row -> {
val list = row.toSeq().toList();
@@ -204,35 +194,35 @@ public class AysncQueryServiceTest extends ServiceTestBase {
result.add(column);
}
});
- assertEquals("(123)" + "123", result.get(0) + result.get(1));
+ assertEquals("123\"\"" + "123", result.get(0) + result.get(1));
+
+ // download asyncQuery result
+ HttpServletResponse response = mock(HttpServletResponse.class);
+ ByteArrayOutputStream baos = mockOutputStream(response);
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+ Assert.assertEquals("\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name()));
}
@Test
- public void testAsyncQueryDownCsvResultByParquet() throws IOException {
+ public void testAsyncQueryAndDownloadCsvResultIncludeHeader() throws IOException, SQLException {
QueryContext queryContext = QueryContext.current();
String queryId = queryContext.getQueryId();
mockMetadata(queryId, true);
queryContext.getQueryTagInfo().setAsyncQuery(true);
queryContext.getQueryTagInfo().setFileFormat("csv");
queryContext.getQueryTagInfo().setFileEncode("utf-8");
+ queryContext.getQueryTagInfo().setSeparator(",");
+ queryContext.getQueryTagInfo().setIncludeHeader(true);
+
String sql = "select '123\"','123'";
queryContext.setProject(PROJECT);
- SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
+
+ new QueryExec(PROJECT, getTestConfig()).executeQuery(sql);
+
assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
- HttpServletResponse response = mock(HttpServletResponse.class);
- ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- when(response.getOutputStream()).thenReturn(servletOutputStream);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- Object[] arguments = invocationOnMock.getArguments();
- baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
- return null;
- }
- }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "csv", encodeDefault, ",");
- List<org.apache.spark.sql.Row> rowList = ss.read().csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
+
+ List<org.apache.spark.sql.Row> rowList = ss.read()
+ .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
List<String> result = Lists.newArrayList();
rowList.stream().forEach(row -> {
val list = row.toSeq().toList();
@@ -242,130 +232,146 @@ public class AysncQueryServiceTest extends ServiceTestBase {
result.add(column);
}
});
- assertEquals("123\"" + "123", result.get(0) + result.get(1));
+ assertEquals("EXPR$0" + "EXPR$1", result.get(0) + result.get(1));
+ assertEquals("123\"\"" + "123", result.get(2) + result.get(3));
+
+ // download asyncQuery result
+ HttpServletResponse response = mock(HttpServletResponse.class);
+ ByteArrayOutputStream baos = mockOutputStream(response);
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+ Assert.assertEquals("EXPR$0,EXPR$1\n\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name()));
}
@Test
- public void testSuccessQueryAndDownloadXlsxWriter() throws IOException {
+ public void testAsyncQueryPushDownAndDownloadCsvResultNotIncludeHeader() throws IOException {
QueryContext queryContext = QueryContext.current();
String queryId = queryContext.getQueryId();
mockMetadata(queryId, true);
queryContext.getQueryTagInfo().setAsyncQuery(true);
- queryContext.getQueryTagInfo().setFileFormat("xlsx");
+ queryContext.getQueryTagInfo().setFileFormat("csv");
queryContext.getQueryTagInfo().setFileEncode("utf-8");
- String sql = "select '123\"' as col1,'123' as col2";
+ queryContext.getQueryTagInfo().setSeparator(",");
+ queryContext.getQueryTagInfo().setIncludeHeader(false);
+
+ String sql = "select '123\"','123'";
queryContext.setProject(PROJECT);
+
SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
- HttpServletResponse response = mock(HttpServletResponse.class);
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
- when(response.getOutputStream()).thenReturn(servletOutputStream);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- Object[] arguments = invocationOnMock.getArguments();
- baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
- return null;
+
+ List<org.apache.spark.sql.Row> rowList = ss.read()
+ .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
+ List<String> result = Lists.newArrayList();
+ rowList.stream().forEach(row -> {
+ val list = row.toSeq().toList();
+ for (int i = 0; i < list.size(); i++) {
+ Object cell = list.apply(i);
+ String column = cell == null ? "" : cell.toString();
+ result.add(column);
}
- }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "xlsx", encodeDefault, ",");
- FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
- FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
- XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter();
- XSSFWorkbook workbook = new XSSFWorkbook();
- XSSFSheet sheet = workbook.createSheet();
- xlsxExcelWriter.writeData(fileStatuses, sheet);
- XSSFRow row = sheet.getRow(0);
- assertEquals("123\",123", row.getCell(0) + "," + row.getCell(1));
- assertEquals("[col1, col2]", QueryContext.current().getColumnNames().toString());
+ });
+ assertEquals("123\"\"" + "123", result.get(0) + result.get(1));
+
+ // download asyncQuery pushDown result
+ HttpServletResponse response = mock(HttpServletResponse.class);
+ ByteArrayOutputStream baos = mockOutputStream(response);
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+ Assert.assertEquals("\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name()));
}
@Test
- public void testSuccessQueryAndDownloadCSV() throws IOException {
+ public void testAsyncQueryPushDownAndDownloadCsvResultIncludeHeader() throws IOException {
QueryContext queryContext = QueryContext.current();
String queryId = queryContext.getQueryId();
mockMetadata(queryId, true);
queryContext.getQueryTagInfo().setAsyncQuery(true);
queryContext.getQueryTagInfo().setFileFormat("csv");
queryContext.getQueryTagInfo().setFileEncode("utf-8");
- String sql = "select '123\"' as col1,'123' as col2";
+ queryContext.getQueryTagInfo().setSeparator(",");
+ queryContext.getQueryTagInfo().setIncludeHeader(true);
+
+ String sql = "select '123\"','123'";
queryContext.setProject(PROJECT);
+
SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
- HttpServletResponse response = mock(HttpServletResponse.class);
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
- when(response.getOutputStream()).thenReturn(servletOutputStream);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- Object[] arguments = invocationOnMock.getArguments();
- baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
- return null;
+
+ List<org.apache.spark.sql.Row> rowList = ss.read()
+ .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
+ List<String> result = Lists.newArrayList();
+ rowList.stream().forEach(row -> {
+ val list = row.toSeq().toList();
+ for (int i = 0; i < list.size(); i++) {
+ Object cell = list.apply(i);
+ String column = cell == null ? "" : cell.toString();
+ result.add(column);
}
- }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, true, response, "xlsx", encodeDefault, ",");
- FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
- FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
- XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter();
- XSSFWorkbook workbook = new XSSFWorkbook();
- XSSFSheet sheet = workbook.createSheet();
- xlsxExcelWriter.writeData(fileStatuses, sheet);
- XSSFRow row = sheet.getRow(0);
- assertEquals("\"123\\\"\",123", row.getCell(0) + "," + row.getCell(1));
- assertEquals("[col1, col2]", QueryContext.current().getColumnNames().toString());
+ });
+ assertEquals("123\"" + "123", result.get(0) + result.get(1));
+ assertEquals("123\"\"" + "123", result.get(2) + result.get(3));
+
+ // download asyncQuery pushDown result
+ HttpServletResponse response = mock(HttpServletResponse.class);
+ ByteArrayOutputStream baos = mockOutputStream(response);
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+ Assert.assertEquals("123\",123\n\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name()));
}
@Test
- public void testSuccessQueryAndDownloadCSVForDateFormat() throws IOException {
+ public void testAsyncQueryAndDownloadCsvResultSpecialSeparator() throws IOException, SQLException {
+ String separator = "\n";
QueryContext queryContext = QueryContext.current();
String queryId = queryContext.getQueryId();
mockMetadata(queryId, true);
queryContext.getQueryTagInfo().setAsyncQuery(true);
queryContext.getQueryTagInfo().setFileFormat("csv");
queryContext.getQueryTagInfo().setFileEncode("utf-8");
- String sql = "select '123\"' as col1,'123' as col2, date'2021-02-01' as col3";
+ queryContext.getQueryTagInfo().setSeparator(separator);
+ queryContext.getQueryTagInfo().setIncludeHeader(false);
+
+ String sql = "select '123\"','123'";
queryContext.setProject(PROJECT);
- SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
+
+ new QueryExec(PROJECT, getTestConfig()).executeQuery(sql);
+
assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
- HttpServletResponse response = mock(HttpServletResponse.class);
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
- when(response.getOutputStream()).thenReturn(servletOutputStream);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- Object[] arguments = invocationOnMock.getArguments();
- baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
- return null;
+
+ List<org.apache.spark.sql.Row> rowList = ss.read()
+ .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
+ List<String> result = Lists.newArrayList();
+ rowList.stream().forEach(row -> {
+ val list = row.toSeq().toList();
+ for (int i = 0; i < list.size(); i++) {
+ Object cell = list.apply(i);
+ String column = cell == null ? "" : cell.toString();
+ result.add(column);
}
- }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, true, response, "xlsx", encodeDefault, ",");
- FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
- FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
- XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter();
- XSSFWorkbook workbook = new XSSFWorkbook();
- XSSFSheet sheet = workbook.createSheet();
- xlsxExcelWriter.writeData(fileStatuses, sheet);
- XSSFRow row = sheet.getRow(0);
- assertEquals("\"123\\\"\",123,2021-02-01", row.getCell(0)
- + "," + row.getCell(1) + "," + row.getCell(2));
- assertEquals("[col1, col2, col3]", QueryContext.current().getColumnNames().toString());
+ });
+ assertEquals("123\"\"" + "123", result.get(0) + result.get(1));
+
+ // download asyncQuery result
+ HttpServletResponse response = mock(HttpServletResponse.class);
+ ByteArrayOutputStream baos = mockOutputStream(response);
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+ Assert.assertEquals("\"123\"\"\"\n" + "123\n", baos.toString(StandardCharsets.UTF_8.name()));
}
@Test
- public void testSuccessQueryAndDownloadCSVNotIncludeHeader() throws IOException {
+ public void testAsyncQueryWithParquetSpecialCharacters() throws IOException {
QueryContext queryContext = QueryContext.current();
String queryId = queryContext.getQueryId();
mockMetadata(queryId, true);
queryContext.getQueryTagInfo().setAsyncQuery(true);
- queryContext.getQueryTagInfo().setFileFormat("csv");
+ queryContext.getQueryTagInfo().setFileFormat("CSV");
queryContext.getQueryTagInfo().setFileEncode("utf-8");
- String sql = "select '123\"','123'";
+ String sql = "select '\\(123\\)','123'";
queryContext.setProject(PROJECT);
+
+ ss.sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false");
SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
- assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
+
+ await().atMost(60000, TimeUnit.MILLISECONDS).until(
+ () -> AsyncQueryService.QueryStatus.SUCCESS.equals(asyncQueryService.queryStatus(PROJECT, queryId)));
HttpServletResponse response = mock(HttpServletResponse.class);
ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -378,9 +384,11 @@ public class AysncQueryServiceTest extends ServiceTestBase {
return null;
}
}).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "csv", encodeDefault, "#");
- List<org.apache.spark.sql.Row> rowList = ss.read().csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
- Assert.assertEquals("\"123\"\"\"#123\n", baos.toString(StandardCharsets.UTF_8.name()));
+
+ SparderEnv.getSparkSession().sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false");
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault);
+ List<org.apache.spark.sql.Row> rowList = ss.read()
+ .parquet(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList();
List<String> result = Lists.newArrayList();
rowList.stream().forEach(row -> {
val list = row.toSeq().toList();
@@ -390,18 +398,19 @@ public class AysncQueryServiceTest extends ServiceTestBase {
result.add(column);
}
});
- assertEquals("123\"" + "123", result.get(0) + result.get(1));
+ assertEquals("(123)" + "123", result.get(0) + result.get(1));
}
@Test
- public void testSuccessQueryAndDownloadJSON() throws IOException {
+ public void testSuccessQueryAndDownloadCSVForDateFormat() throws IOException {
QueryContext queryContext = QueryContext.current();
String queryId = queryContext.getQueryId();
mockMetadata(queryId, true);
queryContext.getQueryTagInfo().setAsyncQuery(true);
- queryContext.getQueryTagInfo().setFileFormat("json");
+ queryContext.getQueryTagInfo().setFileFormat("csv");
queryContext.getQueryTagInfo().setFileEncode("utf-8");
- String sql = "select '123\"' as col1,'123' as col2";
+ queryContext.getQueryTagInfo().setSeparator(",");
+ String sql = "select '123\"' as col1,'123' as col2, date'2021-02-01' as col3";
queryContext.setProject(PROJECT);
SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
@@ -417,25 +426,15 @@ public class AysncQueryServiceTest extends ServiceTestBase {
return null;
}
}).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "json", encodeDefault, ",");
- FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
- FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
- XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter();
- XSSFWorkbook workbook = new XSSFWorkbook();
- XSSFSheet sheet = workbook.createSheet();
- xlsxExcelWriter.writeData(fileStatuses, sheet);
- XSSFRow row = sheet.getRow(0);
- assertEquals("{\"col1\":\"123\\\"\",\"col2\":\"123\"}", row.getCell(0) + "," + row.getCell(1));
- assertEquals("[col1, col2]", QueryContext.current().getColumnNames().toString());
}
@Test
- public void testSuccessQueryAndDownloadXlsxResultByParquet() throws IOException {
+ public void testSuccessQueryAndDownloadJSON() throws IOException {
QueryContext queryContext = QueryContext.current();
String queryId = queryContext.getQueryId();
mockMetadata(queryId, true);
queryContext.getQueryTagInfo().setAsyncQuery(true);
- queryContext.getQueryTagInfo().setFileFormat("xlsx");
+ queryContext.getQueryTagInfo().setFileFormat("json");
queryContext.getQueryTagInfo().setFileEncode("utf-8");
String sql = "select '123\"' as col1,'123' as col2";
queryContext.setProject(PROJECT);
@@ -453,44 +452,63 @@ public class AysncQueryServiceTest extends ServiceTestBase {
return null;
}
}).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "xlsx", encodeDefault, ",");
- FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
+ }
+
+ @Test
+ public void testSuccessQueryAndDownloadXlsxResultNotIncludeHeader() throws IOException {
+ QueryContext queryContext = QueryContext.current();
+ String queryId = queryContext.getQueryId();
+ mockMetadata(queryId, true);
+ queryContext.getQueryTagInfo().setAsyncQuery(true);
+ queryContext.getQueryTagInfo().setFileFormat("xlsx");
+ queryContext.getQueryTagInfo().setFileEncode("utf-8");
+ String sql = "select '123\"' as col1,'123' as col2";
+ queryContext.setProject(PROJECT);
+ SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
+ assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
+ HttpServletResponse response = mock(HttpServletResponse.class);
+ ByteArrayOutputStream outputStream = mockOutputStream(response);
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "xlsx", encodeDefault);
+
File file = new File("result.xlsx");
boolean createTempFileStatus = file.createNewFile();
- ArrayList<String> list = new ArrayList<>();
- FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
- for (FileStatus f : fileStatuses) {
- if (!f.getPath().getName().startsWith("_")) {
- fileSystem.copyToLocalFile(f.getPath(), new Path(file.getPath()));
- try(InputStream is = new FileInputStream(file.getAbsolutePath());
- XSSFWorkbook sheets = new XSSFWorkbook(is)) {
- XSSFSheet sheetAt = sheets.getSheetAt(0);
- for (int i = 0; i < sheetAt.getPhysicalNumberOfRows(); i++) {
- XSSFRow row = sheetAt.getRow(i);
- StringBuilder builder = new StringBuilder();
- for (int index = 0; index < row.getPhysicalNumberOfCells(); index++) {
- XSSFCell cell = row.getCell(index);
- if (index > 0) {
- builder.append(",");
- }
- builder.append(getString(cell));
- }
- list.add(builder.toString());
- }
- }
- }
- }
+ List<String> list = getXlsxResult(queryId, file);
Files.delete(file.toPath());
- logger.info("Temp File status createTempFileStatus:{}",
- createTempFileStatus);
+ logger.info("Temp File status createTempFileStatus:{}", createTempFileStatus);
assertEquals("123\",123", list.get(0));
}
+ @Test
+ public void testSuccessQueryAndDownloadXlsxResultIncludeHeader() throws IOException {
+ QueryContext queryContext = QueryContext.current();
+ String queryId = queryContext.getQueryId();
+ mockMetadata(queryId, true);
+ queryContext.getQueryTagInfo().setAsyncQuery(true);
+ queryContext.getQueryTagInfo().setFileFormat("xlsx");
+ queryContext.getQueryTagInfo().setFileEncode("utf-8");
+ queryContext.getQueryTagInfo().setIncludeHeader(true);
+ String sql = "select '123\"' as col1,'123' as col2";
+ queryContext.setProject(PROJECT);
+ SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT);
+ assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
+ HttpServletResponse response = mock(HttpServletResponse.class);
+ ByteArrayOutputStream outputStream = mockOutputStream(response);
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "xlsx", encodeDefault);
+
+ File file = new File("result.xlsx");
+ boolean createTempFileStatus = file.createNewFile();
+ List<String> list = getXlsxResult(queryId, file);
+ Files.delete(file.toPath());
+ logger.info("Temp File status createTempFileStatus:{}", createTempFileStatus);
+ assertEquals("col1,col2", list.get(0));
+ assertEquals("123\",123", list.get(1));
+ }
+
private static String getString(XSSFCell xssfCell) {
if (xssfCell == null) {
return "";
}
- if (xssfCell.getCellType()== CellType.NUMERIC) {
+ if (xssfCell.getCellType() == CellType.NUMERIC) {
return String.valueOf(xssfCell.getNumericCellValue());
} else if (xssfCell.getCellType() == CellType.BOOLEAN) {
return String.valueOf(xssfCell.getBooleanCellValue());
@@ -519,9 +537,9 @@ public class AysncQueryServiceTest extends ServiceTestBase {
}
}).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, formatDefault, encodeDefault, ",");
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, formatDefault, encodeDefault);
- assertEquals("a1,b1,c1\n" + "a2,b2,c2\n", baos.toString(StandardCharsets.UTF_8.name()));
+ assertEquals("a1,b1,c1\r\n" + "a2,b2,c2\r\n", baos.toString(StandardCharsets.UTF_8.name()));
}
@Test
@@ -542,9 +560,9 @@ public class AysncQueryServiceTest extends ServiceTestBase {
return null;
}).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, true, response, formatDefault, encodeDefault, ",");
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, formatDefault, encodeDefault);
- assertEquals("name,age,city\na1,b1,c1\n" + "a2,b2,c2\n", baos.toString(StandardCharsets.UTF_8.name()));
+ assertEquals("a1,b1,c1\r\n" + "a2,b2,c2\r\n", baos.toString(StandardCharsets.UTF_8.name()));
}
@Test
@@ -565,9 +583,9 @@ public class AysncQueryServiceTest extends ServiceTestBase {
return null;
}).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, formatDefault, encodeDefault, ",");
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, formatDefault, encodeDefault);
- assertEquals("a1,b1,c1\n" + "a2,b2,c2\n", baos.toString(StandardCharsets.UTF_8.name()));
+ assertEquals("a1,b1,c1\r\n" + "a2,b2,c2\r\n", baos.toString(StandardCharsets.UTF_8.name()));
}
@Test
@@ -590,35 +608,12 @@ public class AysncQueryServiceTest extends ServiceTestBase {
}
}).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "json", encodeDefault, ",");
+ asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "json", encodeDefault);
assertEquals("[\"{'column1':'a1', 'column2':'b1'}\",\"{'column1':'a2', 'column2':'b2'}\"]",
baos.toString(StandardCharsets.UTF_8.name()));
}
- @Test
- public void testSuccessQueryAndDownloadXlsxResult() throws IOException, InterruptedException {
- SQLResponse sqlResponse = mock(SQLResponse.class);
- when(sqlResponse.isException()).thenReturn(false);
- String queryId = RandomUtil.randomUUIDStr();
- mockResultFile(queryId, false, true);
- assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
- HttpServletResponse response = mock(HttpServletResponse.class);
- ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- when(response.getOutputStream()).thenReturn(servletOutputStream);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- Object[] arguments = invocationOnMock.getArguments();
- baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
- return null;
- }
- }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
-
- asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "xlsx", encodeDefault, ",");
- }
-
@Test
public void testCleanFolder() throws IOException, InterruptedException {
String queryId = RandomUtil.randomUUIDStr();
@@ -643,7 +638,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
try {
new Path(asyncQueryService.asyncQueryResultPath(PROJECT, queryId));
} catch (Exception e) {
- Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException);
+ Assert.assertTrue(e instanceof KylinException);
Assert.assertEquals("Can’t find the query by this query ID in this project. Please check and try again.",
e.getMessage());
}
@@ -654,7 +649,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
try {
asyncQueryService.deleteByQueryId(PROJECT, "123");
} catch (Exception e) {
- Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException);
+ Assert.assertTrue(e instanceof KylinException);
Assert.assertEquals("Can’t find the query by this query ID in this project. Please check and try again.",
e.getMessage());
}
@@ -678,7 +673,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
try {
new Path(asyncQueryService.asyncQueryResultPath(PROJECT, queryId));
} catch (Exception e) {
- Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException);
+ Assert.assertTrue(e instanceof KylinException);
Assert.assertEquals("Can’t find the query by this query ID in this project. Please check and try again.",
e.getMessage());
}
@@ -808,14 +803,14 @@ public class AysncQueryServiceTest extends ServiceTestBase {
try {
AsyncQueryUtil.saveMetaData(PROJECT, sqlResponse.getColumnMetas(), queryId);
} catch (Exception e) {
- Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException);
- Assert.assertEquals("KE-020040001", ((NAsyncQueryIllegalParamException) e).getErrorCode().getCodeString());
+ Assert.assertTrue(e instanceof KylinException);
+ Assert.assertEquals("KE-010031301", ((KylinException) e).getErrorCode().getCodeString());
}
try {
AsyncQueryUtil.saveFileInfo(PROJECT, formatDefault, encodeDefault, fileNameDefault, queryId, ",");
} catch (Exception e) {
- Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException);
- Assert.assertEquals("KE-020040001", ((NAsyncQueryIllegalParamException) e).getErrorCode().getCodeString());
+ Assert.assertTrue(e instanceof KylinException);
+ Assert.assertEquals("KE-010031301", ((KylinException) e).getErrorCode().getCodeString());
}
}
@@ -902,7 +897,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
Path asyncQueryResultDir = AsyncQueryUtil.getAsyncQueryResultDir(PROJECT, queryId);
fileSystem.delete(new Path(asyncQueryResultDir, AsyncQueryUtil.getFileInfo()));
try (FSDataOutputStream os = fileSystem.create(new Path(asyncQueryResultDir, AsyncQueryUtil.getFileInfo()));
- OutputStreamWriter osw = new OutputStreamWriter(os, Charset.defaultCharset())) {
+ OutputStreamWriter osw = new OutputStreamWriter(os, Charset.defaultCharset())) {
osw.write(formatDefault + "\n");
osw.write(encodeDefault + "\n");
osw.write("foo" + "\n");
@@ -924,6 +919,33 @@ public class AysncQueryServiceTest extends ServiceTestBase {
assertArrayEquals(dataTypes.toArray(), metaData.get(1).toArray());
}
+ @Test
+ public void testAsyncQueryResultRowCount() throws Exception {
+ overwriteSystemProp("kylin.env", "DEV");
+ QueryContext queryContext = QueryContext.current();
+ String queryId = queryContext.getQueryId();
+ mockMetadata(queryId, true);
+ queryContext.getQueryTagInfo().setAsyncQuery(true);
+ queryContext.getQueryTagInfo().setFileFormat("csv");
+ queryContext.getQueryTagInfo().setFileEncode("utf-8");
+ queryContext.getQueryTagInfo().setSeparator(",");
+ queryContext.getQueryTagInfo().setIncludeHeader(false);
+ queryContext.setAclInfo(new QueryContext.AclInfo("ADMIN", Sets.newHashSet("g1"), true));
+
+ String sql = "select '123\"','123'";
+ queryContext.setProject(PROJECT);
+
+ new QueryExec(PROJECT, getTestConfig()).executeQuery(sql);
+
+ assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId));
+
+ QueryMetricsContext.start(queryId, "");
+ Assert.assertTrue(QueryMetricsContext.isStarted());
+ QueryMetricsContext metrics = QueryMetricsContext.collect(queryContext);
+ Assert.assertEquals(1, metrics.getResultRowCount());
+ QueryMetricsContext.reset();
+ }
+
public Path mockResultFile(String queryId, boolean block, boolean needMeta)
throws IOException, InterruptedException {
@@ -939,8 +961,8 @@ public class AysncQueryServiceTest extends ServiceTestBase {
}
try (FSDataOutputStream os = fileSystem.create(new Path(asyncQueryResultDir, "m00")); //
- OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8); //
- ICsvListWriter csvWriter = new CsvListWriter(osw, CsvPreference.STANDARD_PREFERENCE)) {
+ OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8); //
+ ICsvListWriter csvWriter = new CsvListWriter(osw, CsvPreference.STANDARD_PREFERENCE)) {
csvWriter.write(row1);
csvWriter.write(row2);
fileSystem.createNewFile(new Path(asyncQueryResultDir, AsyncQueryUtil.getSuccessFlagFileName()));
@@ -963,7 +985,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
fileSystem.mkdirs(asyncQueryResultDir);
}
try (FSDataOutputStream os = fileSystem.create(new Path(asyncQueryResultDir, "m00")); //
- OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) {
+ OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) {
osw.write(StringEscapeUtils.unescapeJson(row1));
osw.write(StringEscapeUtils.unescapeJson(row2));
fileSystem.createNewFile(new Path(asyncQueryResultDir, AsyncQueryUtil.getSuccessFlagFileName()));
@@ -982,7 +1004,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
}
try (FSDataOutputStream os = fileSystem
.create(new Path(asyncQueryResultDir, AsyncQueryUtil.getMetaDataFileName())); //
- OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
+ OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
String metaString = String.join(",", columnNames) + "\n" + String.join(",", dataTypes);
osw.write(metaString);
if (needMeta) {
@@ -1003,7 +1025,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
}
try (FSDataOutputStream os = fileSystem
.create(new Path(asyncQueryResultDir, AsyncQueryUtil.getMetaDataFileName())); //
- OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
+ OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
osw.write(formatDefault);
} catch (IOException e) {
@@ -1019,7 +1041,7 @@ public class AysncQueryServiceTest extends ServiceTestBase {
}
try (FSDataOutputStream os = fileSystem
.create(new Path(asyncQueryResultDir, AsyncQueryUtil.getMetaDataFileName())); //
- OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
+ OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { //
osw.write(encodeDefault);
} catch (IOException e) {
@@ -1027,17 +1049,49 @@ public class AysncQueryServiceTest extends ServiceTestBase {
}
}
- @Test
- public void testCsvWriter() throws IOException {
- List<List<Object>> rows = Lists.newArrayList(
- Lists.newArrayList(1, 3.12, "foo"),
- Lists.newArrayList(2, 3.123, "fo<>o"),
- Lists.newArrayList(3, 3.124, "fo\ro")
- );
- String expected = "1<>3.12<>foo\n2<>3.123<>\"fo<>o\"\n3<>3.124<>\"fo\ro\"\n";
- try (StringWriter sw = new StringWriter()) {
- CSVWriter.writeCsv(rows.iterator(), sw, "<>");
- assertEquals(expected, sw.toString());
+ public ByteArrayOutputStream mockOutputStream(HttpServletResponse response) throws IOException {
+
+ ServletOutputStream servletOutputStream = mock(ServletOutputStream.class);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ when(response.getOutputStream()).thenReturn(servletOutputStream);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ Object[] arguments = invocationOnMock.getArguments();
+ baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]);
+ return null;
+ }
+ }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt());
+ return baos;
+ }
+
+ public List<String> getXlsxResult(String queryId, File file) throws IOException {
+ FileSystem fileSystem = AsyncQueryUtil.getFileSystem();
+ List<String> list = new ArrayList<>();
+ FileStatus[] fileStatuses = fileSystem
+ .listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()));
+ for (FileStatus f : fileStatuses) {
+ if (f.getPath().getName().startsWith("_")) {
+ continue;
+ }
+ fileSystem.copyToLocalFile(f.getPath(), new Path(file.getPath()));
+ try (InputStream is = new FileInputStream(file.getAbsolutePath());
+ XSSFWorkbook sheets = new XSSFWorkbook(is)) {
+ XSSFSheet sheetAt = sheets.getSheetAt(0);
+ for (int i = 0; i < sheetAt.getPhysicalNumberOfRows(); i++) {
+ XSSFRow row = sheetAt.getRow(i);
+ StringBuilder builder = new StringBuilder();
+ for (int index = 0; index < row.getPhysicalNumberOfCells(); index++) {
+ XSSFCell cell = row.getCell(index);
+ if (index > 0) {
+ builder.append(",");
+ }
+ builder.append(getString(cell));
+ }
+ list.add(builder.toString());
+ }
+ }
}
+ return list;
}
}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 5b6d20c715..b5640d7c29 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -33,10 +33,11 @@ import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.hive.QueryMetricUtils
import org.apache.spark.sql.hive.utils.ResourceDetectUtils
import org.apache.spark.sql.util.SparderTypeUtil
-import org.apache.spark.sql.{DataFrame, SparderEnv, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparderEnv, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import java.sql.Timestamp
+import java.util
import java.util.{UUID, List => JList}
import scala.collection.JavaConverters._
import scala.collection.{immutable, mutable}
@@ -130,29 +131,8 @@ object SparkSqlClient {
QueryContext.current().getMetrics.setQueryJobCount(jobCount)
QueryContext.current().getMetrics.setQueryStageCount(stageCount)
QueryContext.current().getMetrics.setQueryTaskCount(taskCount)
- (
- () => new java.util.Iterator[JList[String]] {
- /*
- * After fetching a batch of 1000, checks whether the query thread is interrupted.
- */
- val checkInterruptSize = 1000;
- var readRowSize = 0;
-
- override def hasNext: Boolean = resultRows.hasNext
-
- override def next(): JList[String] = {
- val row = resultRows.next()
- readRowSize += 1;
- if (readRowSize % checkInterruptSize == 0) {
- QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.",
- "Current step: Collecting dataset of push-down.")
- }
- row.toSeq.map(rawValueToString(_)).asJava
- }
- },
- resultSize,
- fieldList
- )
+ // return result
+ (readPushDownResultRow(resultRows, true), resultSize, fieldList)
} catch {
case e: Throwable =>
if (e.isInstanceOf[InterruptedException]) {
@@ -169,6 +149,29 @@ object SparkSqlClient {
}
}
/**
 * Wraps a Spark row iterator as a lazily-evaluated Iterable of string lists for
 * push-down results. When `checkInterrupt` is set, the query thread's interrupt
 * flag is checked once per batch of 1000 rows so cancellation takes effect.
 */
def readPushDownResultRow(resultRows: util.Iterator[Row], checkInterrupt: Boolean): java.lang.Iterable[JList[String]] = {
  () =>
    new java.util.Iterator[JList[String]] {
      // Size of the batch between interrupt checks.
      val interruptCheckBatch = 1000
      var rowsRead = 0

      override def hasNext: Boolean = resultRows.hasNext

      override def next(): JList[String] = {
        val current = resultRows.next()
        rowsRead += 1
        if (checkInterrupt && rowsRead % interruptCheckBatch == 0) {
          QueryUtil.checkThreadInterrupted(
            "Interrupted at the stage of collecting result in SparkSqlClient.",
            "Current step: Collecting dataset of push-down.")
        }
        current.toSeq.map(rawValueToString(_)).asJava
      }
    }
}
+
private def rawValueToString(value: Any, wrapped: Boolean = false): String = value match {
case null => null
case value: Timestamp => DateFormat.castTimestampToString(value.getTime)
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 7ecc408674..8cc3a6ba25 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -20,7 +20,8 @@ package org.apache.kylin.query.runtime.plan
import com.google.common.cache.{Cache, CacheBuilder}
import io.kyligence.kap.secondstorage.SecondStorageUtil
-import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.Path
import org.apache.kylin.common.exception.NewQueryRefuseException
import org.apache.kylin.common.util.{HadoopUtil, RandomUtil}
@@ -30,19 +31,22 @@ import org.apache.kylin.metadata.query.{BigQueryThresholdUpdater, StructField}
import org.apache.kylin.metadata.state.QueryShareStateManager
import org.apache.kylin.query.engine.RelColumnMetaDataExtractor
import org.apache.kylin.query.engine.exec.ExecuteResult
+import org.apache.kylin.query.pushdown.SparkSqlClient.readPushDownResultRow
import org.apache.kylin.query.relnode.OLAPContext
import org.apache.kylin.query.util.{AsyncQueryUtil, QueryUtil, SparkJobTrace, SparkQueryJobManager}
-import org.apache.poi.xssf.usermodel.XSSFWorkbook
+import org.apache.poi.xssf.usermodel.{XSSFSheet, XSSFWorkbook}
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.QueryMetricUtils
import org.apache.spark.sql.util.SparderTypeUtil
-import org.apache.spark.sql.{DataFrame, SaveMode, SparderEnv}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv}
-import java.io.{File, FileOutputStream}
-import java.util
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicLong
+import java.{lang, util}
import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions.`iterator asScala`
import scala.collection.mutable
// scalastyle:off
@@ -55,6 +59,10 @@ object ResultPlan extends LogEx {
val PARTITION_SPLIT_BYTES: Long = KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 // 64MB
val SPARK_SCHEDULER_POOL: String = "spark.scheduler.pool"
+ val QUOTE_CHAR = "\""
+ val END_OF_LINE_SYMBOLS = IOUtils.LINE_SEPARATOR_UNIX
+ val CHECK_WRITE_SIZE = 1000
+
private def collectInternal(df: DataFrame, rowType: RelDataType): (java.lang.Iterable[util.List[String]], Int) = logTime("collectInternal", debug = true) {
val jobGroup = Thread.currentThread().getName
val sparkContext = SparderEnv.getSparkSession.sparkContext
@@ -139,20 +147,7 @@ object ResultPlan extends LogEx {
s"Is TableIndex: ${QueryContext.current().getQueryTagInfo.isTableIndex}")
val resultTypes = rowType.getFieldList.asScala
- (() => new util.Iterator[util.List[String]] {
-
- override def hasNext: Boolean = resultRows.hasNext
-
- override def next(): util.List[String] = {
- val row = resultRows.next()
- if (Thread.interrupted()) {
- throw new InterruptedException
- }
- row.toSeq.zip(resultTypes).map {
- case (value, relField) => SparderTypeUtil.convertToStringWithCalciteType(value, relField.getType)
- }.asJava
- }
- }, resultSize)
+ (readResultRow(resultRows, resultTypes), resultSize)
} catch {
case e: Throwable =>
if (e.isInstanceOf[InterruptedException]) {
@@ -167,6 +162,25 @@ object ResultPlan extends LogEx {
}
}
+
/**
 * Wraps a Spark row iterator as a lazily-evaluated Iterable of string lists,
 * converting each value with its Calcite field type. Throws InterruptedException
 * as soon as the consuming thread is interrupted.
 */
def readResultRow(resultRows: util.Iterator[Row], resultTypes: mutable.Buffer[RelDataTypeField]): lang.Iterable[util.List[String]] = {
  () =>
    new util.Iterator[util.List[String]] {

      override def hasNext: Boolean = resultRows.hasNext

      override def next(): util.List[String] = {
        val current = resultRows.next()
        // Abort promptly when the query thread has been cancelled.
        if (Thread.interrupted()) {
          throw new InterruptedException
        }
        current.toSeq.zip(resultTypes).map { case (value, field) =>
          SparderTypeUtil.convertToStringWithCalciteType(value, field.getType)
        }.asJava
      }
    }
}
+
private def getNormalizedExplain(df: DataFrame): String = {
df.queryExecution.executedPlan.toString.replaceAll("#\\d+", "#x")
}
@@ -282,6 +296,8 @@ object ResultPlan extends LogEx {
QueryContext.currentTrace().endLastSpan()
val jobTrace = new SparkJobTrace(jobGroup, QueryContext.currentTrace(), QueryContext.current().getQueryId, sparkContext)
val dateTimeFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
+ val queryId = QueryContext.current().getQueryId
+ val includeHeader = QueryContext.current().getQueryTagInfo.isIncludeHeader
format match {
case "json" =>
val oldColumnNames = df.columns
@@ -302,31 +318,8 @@ object ResultPlan extends LogEx {
normalizeSchema(df).write.mode(SaveMode.Overwrite).option("encoding", encode).option("charset", "utf-8").parquet(path)
}
sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false")
- case "csv" =>
- df.write
- .option("timestampFormat", dateTimeFormat)
- .option("encoding", encode)
- .option("dateFormat", "yyyy-MM-dd")
- .option("charset", "utf-8").mode(SaveMode.Append).csv(path)
- case "xlsx" => {
- val queryId = QueryContext.current().getQueryId
- val file = new File(queryId + ".xlsx")
- file.createNewFile();
- val outputStream = new FileOutputStream(file)
- val workbook = new XSSFWorkbook
- val sheet = workbook.createSheet("query_result");
- var num = 0
- df.collect().foreach(row => {
- val row1 = sheet.createRow(num)
- for (i <- 0 until row.length) {
- row1.createCell(i).setCellValue(row.apply(i).toString)
- }
- num = num + 1
- })
- workbook.write(outputStream)
- HadoopUtil.getWorkingFileSystem
- .copyFromLocalFile(true, true, new Path(file.getPath), new Path(path + "/" + queryId + ".xlsx"))
- }
+ case "csv" => processCsv(df, format, rowType, path, queryId, includeHeader)
+ case "xlsx" => processXlsx(df, format, rowType, path, queryId, includeHeader)
case _ =>
normalizeSchema(df).write.option("timestampFormat", dateTimeFormat).option("encoding", encode)
.option("charset", "utf-8").mode(SaveMode.Append).parquet(path)
@@ -345,11 +338,142 @@ object ResultPlan extends LogEx {
QueryContext.current().getMetrics.setQueryJobCount(jobCount)
QueryContext.current().getMetrics.setQueryStageCount(stageCount)
QueryContext.current().getMetrics.setQueryTaskCount(taskCount)
- QueryContext.current().getMetrics.setResultRowCount(newExecution.executedPlan.metrics.get("numOutputRows")
+ setResultRowCount(newExecution.executedPlan)
+ }
+ }
+
/**
 * Falls back to the plan's "numOutputRows" metric when no result row count has
 * been recorded yet (e.g. by the async-query file writers).
 */
def setResultRowCount(plan: SparkPlan): Unit = {
  val metrics = QueryContext.current().getMetrics
  if (metrics.getResultRowCount == 0) {
    val rowCount = plan.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
    metrics.setResultRowCount(rowCount)
  }
}
/**
 * Streams the DataFrame row-by-row into a local UTF-8 CSV file, then uploads it
 * to the async-query result directory. Rows are iterated rather than collected
 * so an arbitrarily large result no longer needs to fit in driver memory.
 *
 * Fix: the OutputStreamWriter was never closed, leaking the file handle and
 * risking an incomplete local file if writing failed; it is now closed in a
 * finally block before the upload.
 */
def processCsv(df: DataFrame, format: String, rowType: RelDataType, path: String, queryId: String, includeHeader: Boolean) = {
  val file = createTmpFile(queryId, format)
  val writer = new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8)
  val resultRowSize =
    try {
      if (includeHeader) processCsvHeader(writer, rowType)
      val (iterator, rowCount) = df.toIterator()
      asyncQueryIteratorWriteCsv(iterator, writer, rowType)
      rowCount
    } finally {
      // Close before uploading so every buffered byte reaches the local file
      // and the handle does not leak on failure.
      writer.close()
    }
  uploadAsyncQueryResult(file, path, queryId, format)
  setResultRowCount(resultRowSize)
}
+
/**
 * Streams the DataFrame into a local XLSX workbook (sheet "query_result"),
 * then uploads it to the async-query result directory.
 *
 * Fixes: (1) the FileOutputStream and XSSFWorkbook were never closed — both are
 * now released in a finally block; (2) null cell values previously hit
 * `null.toString` and threw NPE — they are now written as empty cells.
 */
def processXlsx(df: DataFrame, format: String, rowType: RelDataType, path: String, queryId: String, includeHeader: Boolean) = {
  val file = createTmpFile(queryId, format)
  val workbook = new XSSFWorkbook
  val outputStream = new FileOutputStream(file)
  var resultRowSize = 0
  try {
    val sheet = workbook.createSheet("query_result")
    var rowNum = 0
    if (includeHeader) {
      processXlsxHeader(sheet, rowType)
      rowNum += 1
    }
    val (iterator, rowCount) = df.toIterator()
    resultRowSize = rowCount
    iterator.foreach { row =>
      val excelRow = sheet.createRow(rowNum)
      row.toSeq.zipWithIndex.foreach { case (value, idx) =>
        // Guard against null values: the previous value.toString would NPE.
        excelRow.createCell(idx).setCellValue(if (value == null) "" else value.toString)
      }
      rowNum += 1
    }
    workbook.write(outputStream)
  } finally {
    outputStream.close()
    workbook.close()
  }
  uploadAsyncQueryResult(file, path, queryId, format)
  setResultRowCount(resultRowSize)
}
+
/**
 * Records the number of rows written by the async-query file writers.
 * Skipped in unit-test environments where the metrics context is not wired up.
 */
private def setResultRowCount(resultRowSize: Int) = {
  val underUnitTest = KylinConfig.getInstanceFromEnv.isUTEnv
  if (!underUnitTest) {
    QueryContext.current().getMetrics.setResultRowCount(resultRowSize)
  }
}
+
/**
 * Writes the CSV header line using the separator configured on the query.
 * rowType is null for push-down queries, where only the raw column names from
 * the QueryContext are available.
 *
 * Fix: the previous builder approach prepended the separator to every column
 * and then ran deleteCharAt(0), which strips only ONE character — a corrupt
 * header whenever the configured separator is longer than one character.
 * mkString joins correctly for any separator length.
 */
def processCsvHeader(writer: OutputStreamWriter, rowType: RelDataType): Unit = {
  val separator = QueryContext.current().getQueryTagInfo.getSeparator
  val header = rowType match {
    case null => QueryContext.current().getColumnNames.asScala.mkString(separator)
    case _ => rowType.getFieldList.asScala.map(_.getName).mkString(separator)
  }
  writer.write(header + END_OF_LINE_SYMBOLS)
  writer.flush()
}
+
/**
 * Writes the header row (row 0) of the XLSX result sheet. rowType is null for
 * push-down queries, where column names come from the QueryContext instead.
 */
def processXlsxHeader(sheet: XSSFSheet, rowType: RelDataType): Unit = {
  val headerRow = sheet.createRow(0)
  val columnNames =
    if (rowType == null) QueryContext.current().getColumnNames.asScala
    else rowType.getFieldList.asScala.map(_.getName)
  columnNames.zipWithIndex.foreach { case (name, idx) =>
    headerRow.createCell(idx).setCellValue(name)
  }
}
+
/**
 * Creates the local scratch file the query result is written to before upload.
 *
 * Fix: the name was built as `queryId + format` (e.g. "abc123csv"), missing
 * the dot that the uploaded target name (queryId + "." + format) carries; the
 * local name now matches the final one.
 */
def createTmpFile(queryId: String, format: String): File = {
  val file = new File(queryId + "." + format)
  // A false return means a leftover file from a previous run exists; it will
  // simply be overwritten by the subsequent writer.
  file.createNewFile()
  file
}
+
/**
 * Copies the local result file to the async-query result directory as
 * "<queryId>.<format>" and removes the local copy.
 */
def uploadAsyncQueryResult(file: File, path: String, queryId: String, format: String): Unit = {
  val target = new Path(path + "/" + queryId + "." + format)
  // delSrc = true already removes the local source after the copy; the explicit
  // delete below is a defensive cleanup in case the source survived.
  HadoopUtil.getWorkingFileSystem.copyFromLocalFile(true, true, new Path(file.getPath), target)
  if (file.exists()) {
    file.delete()
  }
}
+
/**
 * Streams rows to the CSV writer one at a time, escaping each cell, so the
 * whole result never has to be materialised in driver memory. Push-down rows
 * (no rowType) and index rows are stringified through their respective readers.
 * The writer is flushed every CHECK_WRITE_SIZE rows and once at the end.
 */
def asyncQueryIteratorWriteCsv(resultRows: util.Iterator[Row], outputStream: OutputStreamWriter, rowType: RelDataType): Unit = {
  val separator = QueryContext.current().getQueryTagInfo.getSeparator
  val rows = if (rowType == null) {
    readPushDownResultRow(resultRows, false)
  } else {
    readResultRow(resultRows, rowType.getFieldList.asScala)
  }

  var written = 0
  rows.forEach { row =>
    written += 1
    val line = new StringBuilder
    var i = 0
    while (i < row.size()) {
      if (i > 0) line.append(separator)
      // Null cells are emitted as empty strings.
      val cell = if (row.get(i) == null) "" else row.get(i)
      line.append(encodeCell(cell, separator))
      i += 1
    }
    line.append(END_OF_LINE_SYMBOLS)
    outputStream.write(line.toString())
    // Periodic flush keeps long results from sitting entirely in the buffer.
    if (written % CHECK_WRITE_SIZE == 0) {
      outputStream.flush()
    }
  }
  outputStream.flush()
}
+
/**
 * Escapes one CSV cell, mirroring org.supercsv.encoder.DefaultCsvEncoder.encode:
 * the cell is wrapped in quotes when it contains the separator, a CR/LF, or a
 * quote character, and embedded quote characters are doubled.
 */
def encodeCell(column1: String, separator: String): String = {
  val hasQuote = column1.contains(QUOTE_CHAR)
  val escaped =
    if (hasQuote) column1.replace(QUOTE_CHAR, QUOTE_CHAR + QUOTE_CHAR)
    else column1
  val needQuote = hasQuote ||
    column1.contains(separator) || column1.contains("\r") || column1.contains("\n")
  if (needQuote) QUOTE_CHAR + escaped + QUOTE_CHAR else escaped
}
+
/**
* Normalize column name by replacing invalid characters with underscore
* and strips accents